Skip to content

Commit

Permalink
Address issues from self-review
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Hector Sanjuan <hector@protocol.ai>
  • Loading branch information
hsanjuan committed Feb 27, 2019
1 parent 6447ea5 commit c4b18cd
Show file tree
Hide file tree
Showing 10 changed files with 133 additions and 68 deletions.
2 changes: 1 addition & 1 deletion adder/adder.go
Expand Up @@ -94,7 +94,7 @@ func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (cid.Cid
// FromFiles adds content from a files.Directory. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.Directory) (cid.Cid, error) {
logger.Error("adding from files")
logger.Debug("adding from files")
a.setContext(ctx)

if a.ctx.Err() != nil { // don't allow running twice
Expand Down
7 changes: 2 additions & 5 deletions api/ipfsproxy/ipfsproxy.go
Expand Up @@ -300,15 +300,11 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
return
}

var rpcArg interface{} = api.PinCid(c)
if op == "Unpin" {
rpcArg = c
}
err = proxy.rpcClient.Call(
"",
"Cluster",
op,
rpcArg,
api.PinCid(c),
&struct{}{},
)
if err != nil {
Expand Down Expand Up @@ -485,6 +481,7 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
repoStats := make([]*api.IPFSRepoStat, len(peers), len(peers))
repoStatsIfaces := make([]interface{}, len(repoStats), len(repoStats))
for i := range repoStats {
repoStats[i] = &api.IPFSRepoStat{}
repoStatsIfaces[i] = repoStats[i]
}

Expand Down
2 changes: 1 addition & 1 deletion api/rest/restapi.go
Expand Up @@ -697,7 +697,7 @@ func (api *API) unpinHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"Unpin",
pin.Cid,
pin,
&struct{}{},
)
api.sendResponse(w, http.StatusAccepted, err, nil)
Expand Down
6 changes: 0 additions & 6 deletions api/rest/restapi_test.go
Expand Up @@ -781,12 +781,6 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
resp[0].Cid.String() != test.TestCid1 ||
resp[1].PeerMap[peer.IDB58Encode(test.TestPeerID1)].Status.String() != "pinning" {
t.Errorf("unexpected statusAll resp")
for _, gpi := range resp {
t.Errorf("%s:\n", gpi.Cid)
for k, v := range gpi.PeerMap {
t.Errorf("%s: %+v\n", k, v)
}
}
}

// Test local=true
Expand Down
50 changes: 34 additions & 16 deletions api/types.go
Expand Up @@ -241,7 +241,7 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
// indexed by cluster peer.
type GlobalPinInfo struct {
Cid cid.Cid `json:"cid" codec:"c,omitempty"`
Cid cid.Cid `json:"cid" codec:"c"`
// https://github.com/golang/go/issues/28827
// Peer IDs are of string Kind(). We can't use peer IDs here
// as Go ignores TextMarshaler.
Expand All @@ -250,7 +250,7 @@ type GlobalPinInfo struct {

// PinInfo holds information about local pins.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c,omitempty"`
Cid cid.Cid `json:"cid" codec:"c"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
PeerName string `json:"peername" codec:"pn,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
Expand All @@ -260,21 +260,21 @@ type PinInfo struct {

// Version holds version information
type Version struct {
Version string `json:"Version" codec:"v,omitempty"`
Version string `json:"Version" codec:"v"`
}

// ConnectGraph holds information about the connectivity of the cluster
// To read, traverse the keys of ClusterLinks. Each such id is one of
// the peers of the "ClusterID" peer running the query. ClusterLinks[id]
// in turn lists the ids that peer "id" sees itself connected to. It is
// possible that id is a peer of ClusterID, but ClusterID can not reach id
// over rpc, in which case ClusterLinks[id] == [], as id's view of its
// connectivity can not be retrieved.
// ConnectGraph holds information about the connectivity of the cluster To
// read, traverse the keys of ClusterLinks. Each such id is one of the peers
// of the "ClusterID" peer running the query. ClusterLinks[id] in turn lists
// the ids that peer "id" sees itself connected to. It is possible that id is
// a peer of ClusterID, but ClusterID can not reach id over rpc, in which case
// ClusterLinks[id] == [], as id's view of its connectivity can not be
// retrieved.
//
// Iff there was an error reading the IPFSID of the peer then id will not be a
// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS
// then id will be a key of IPFSLinks. In the event of a SwarmPeers error
// IPFSLinks[id] == [].
// Iff there was an error reading the IPFSID of the peer then id will not be a
// key of ClustertoIPFS or IPFSLinks. Finally iff id is a key of ClustertoIPFS
// then id will be a key of IPFSLinks. In the event of a SwarmPeers error
// IPFSLinks[id] == [].
type ConnectGraph struct {
ClusterID peer.ID
// ipfs to ipfs links
Expand All @@ -285,39 +285,51 @@ type ConnectGraph struct {
ClustertoIPFS map[string]peer.ID `json:"cluster_to_ipfs" codec:"ci,omitempty"`
}

// Multiaddr is a utility type wrapping a Multiaddress
// Multiaddr is a concrete type to wrap a Multiaddress so that it knows how to
// serialize and deserialize itself.
type Multiaddr struct {
multiaddr.Multiaddr
}

// NewMultiaddr returns a cluster Multiaddr wrapper creating the
// multiaddr.Multiaddr with the given string.
func NewMultiaddr(mstr string) (Multiaddr, error) {
m, err := multiaddr.NewMultiaddr(mstr)
return Multiaddr{Multiaddr: m}, err
}

// NewMultiaddrWithValue returns a new cluster Multiaddr wrapper using the
// given multiaddr.Multiaddr.
func NewMultiaddrWithValue(ma multiaddr.Multiaddr) Multiaddr {
return Multiaddr{Multiaddr: ma}
}

// MarshalJSON returns a JSON-formatted multiaddress.
func (maddr Multiaddr) MarshalJSON() ([]byte, error) {
return maddr.Multiaddr.MarshalJSON()
}

// UnmarshalJSON parses a cluster Multiaddr from the JSON representation.
func (maddr *Multiaddr) UnmarshalJSON(data []byte) error {
maddr.Multiaddr, _ = multiaddr.NewMultiaddr("")
return maddr.Multiaddr.UnmarshalJSON(data)
}

// MarshalBinary returs the bytes of the wrapped multiaddress.
func (maddr Multiaddr) MarshalBinary() ([]byte, error) {
return maddr.Multiaddr.MarshalBinary()
}

// UnmarshalBinary casts some bytes as a multiaddress wraps it with
// the given cluster Multiaddr.
func (maddr *Multiaddr) UnmarshalBinary(data []byte) error {
datacopy := make([]byte, len(data)) // This is super important
copy(datacopy, data)
maddr.Multiaddr, _ = multiaddr.NewMultiaddr("")
return maddr.Multiaddr.UnmarshalBinary(datacopy)
}

// Value returns the wrapped multiaddr.Multiaddr.
func (maddr Multiaddr) Value() multiaddr.Multiaddr {
return maddr.Multiaddr
}
Expand Down Expand Up @@ -701,7 +713,13 @@ func (pin *Pin) Equals(pin2 *Pin) bool {
return false
}

if pin.Reference != pin2.Reference {
if pin.Reference != nil && pin2.Reference == nil ||
pin.Reference == nil && pin2.Reference != nil {
return false
}

if pin.Reference != nil && pin2.Reference != nil &&
!pin.Reference.Equals(*pin2.Reference) {
return false
}

Expand Down
5 changes: 4 additions & 1 deletion cluster.go
Expand Up @@ -1378,7 +1378,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api
members, err := c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return []*api.GlobalPinInfo{}, err
return nil, err
}
lenMembers := len(members)

Expand All @@ -1398,6 +1398,9 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, method string) ([]*api

mergePins := func(pins []*api.PinInfo) {
for _, p := range pins {
if p == nil {
continue
}
item, ok := fullMap[p.Cid]
if !ok {
fullMap[p.Cid] = &api.GlobalPinInfo{
Expand Down
2 changes: 1 addition & 1 deletion consensus/raft/consensus_test.go
Expand Up @@ -95,7 +95,7 @@ func TestConsensusPin(t *testing.T) {
t.Error("the operation did not make it to the log:", err)
}

time.Sleep(5000 * time.Millisecond)
time.Sleep(250 * time.Millisecond)
st, err := cc.State(ctx)
if err != nil {
t.Fatal("error getting state:", err)
Expand Down
10 changes: 5 additions & 5 deletions consensus/raft/log_op.go
Expand Up @@ -26,10 +26,10 @@ type LogOpType int
// It implements the consensus.Op interface and it is used by the
// Consensus component.
type LogOp struct {
SpanCtx trace.SpanContext `codec:"sctx,omitempty"`
TagCtx []byte `codec:"tctx,omitempty"`
Cid *api.Pin `codec:"p,omitempty"`
Type LogOpType `codec:"t,omitempty"`
SpanCtx trace.SpanContext `codec:"s,omitempty"`
TagCtx []byte `codec:"t,omitempty"`
Cid *api.Pin `codec:"c,omitempty"`
Type LogOpType `codec:"p,omitempty"`
consensus *Consensus `codec:-`
tracing bool `codec:-`
}
Expand Down Expand Up @@ -91,7 +91,7 @@ func (op *LogOp) ApplyTo(cstate consensus.State) (consensus.State, error) {
"",
"Cluster",
"Untrack",
pin.Cid,
pin,
&struct{}{},
nil,
)
Expand Down

0 comments on commit c4b18cd

Please sign in to comment.