Skip to content

Commit

Permalink
Include Name as GlobalPinInfo key and consolidate redundant keys
Browse files Browse the repository at this point in the history
GlobalPinInfo objects carried redundant information (Cid, Peer) that takes
space and time to serialize.

This has been addressed by having GlobalPinInfo embed PinInfoShort rather than
PinInfo. This new types ommits redundant fields.
  • Loading branch information
hsanjuan committed May 16, 2020
1 parent 4a0c819 commit 055f482
Show file tree
Hide file tree
Showing 11 changed files with 267 additions and 186 deletions.
12 changes: 2 additions & 10 deletions api/rest/client/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,16 +490,12 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob
if time.Now().After(wait.pinStart.Add(5 * time.Second)) { //pinned
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfo{
PeerMap: map[string]*types.PinInfoShort{
peer.Encode(test.PeerID1): {
Cid: in,
Peer: test.PeerID1,
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
},
peer.Encode(test.PeerID2): {
Cid: in,
Peer: test.PeerID2,
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
},
Expand All @@ -508,16 +504,12 @@ func (wait *waitService) Status(ctx context.Context, in cid.Cid, out *types.Glob
} else { // pinning
*out = types.GlobalPinInfo{
Cid: in,
PeerMap: map[string]*types.PinInfo{
PeerMap: map[string]*types.PinInfoShort{
peer.Encode(test.PeerID1): {
Cid: in,
Peer: test.PeerID1,
Status: types.TrackerStatusPinning,
TS: wait.pinStart,
},
peer.Encode(test.PeerID2): {
Cid: in,
Peer: test.PeerID2,
Status: types.TrackerStatusPinned,
TS: wait.pinStart,
},
Expand Down
15 changes: 3 additions & 12 deletions api/rest/restapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -960,7 +960,7 @@ func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
pin.Cid,
&pinInfo,
)
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo))
api.sendResponse(w, autoStatus, err, pinInfo.ToGlobal())
} else {
var pinInfo types.GlobalPinInfo
err := api.rpcClient.CallContext(
Expand Down Expand Up @@ -1019,7 +1019,7 @@ func (api *API) recoverHandler(w http.ResponseWriter, r *http.Request) {
pin.Cid,
&pinInfo,
)
api.sendResponse(w, autoStatus, err, pinInfoToGlobal(&pinInfo))
api.sendResponse(w, autoStatus, err, pinInfo.ToGlobal())
} else {
var pinInfo types.GlobalPinInfo
err := api.rpcClient.CallContext(
Expand Down Expand Up @@ -1127,19 +1127,10 @@ func (api *API) parsePidOrError(w http.ResponseWriter, r *http.Request) peer.ID
return pid
}

func pinInfoToGlobal(pInfo *types.PinInfo) *types.GlobalPinInfo {
return &types.GlobalPinInfo{
Cid: pInfo.Cid,
PeerMap: map[string]*types.PinInfo{
peer.Encode(pInfo.Peer): pInfo,
},
}
}

func pinInfosToGlobal(pInfos []*types.PinInfo) []*types.GlobalPinInfo {
gPInfos := make([]*types.GlobalPinInfo, len(pInfos))
for i, p := range pInfos {
gPInfos[i] = pinInfoToGlobal(p)
gPInfos[i] = p.ToGlobal()
}
return gPInfos
}
Expand Down
45 changes: 38 additions & 7 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,11 +246,12 @@ var ipfsPinStatus2TrackerStatusMap = map[IPFSPinStatus]TrackerStatus{
// GlobalPinInfo contains cluster-wide status information about a tracked Cid,
// indexed by cluster peer.
type GlobalPinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"n"`
// https://github.com/golang/go/issues/28827
// Peer IDs are of string Kind(). We can't use peer IDs here
// as Go ignores TextMarshaler.
PeerMap map[string]*PinInfo `json:"peer_map" codec:"pm,omitempty"`
PeerMap map[string]*PinInfoShort `json:"peer_map" codec:"pm,omitempty"`
}

// String returns the string representation of a GlobalPinInfo.
Expand All @@ -263,17 +264,47 @@ func (gpi *GlobalPinInfo) String() string {
return str
}

// PinInfo holds information about local pins.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"n,omitempty"`
Peer peer.ID `json:"peer" codec:"p,omitempty"`
// Add adds a PinInfo object to a GlobalPinInfo
func (gpi *GlobalPinInfo) Add(pi *PinInfo) {
if !gpi.Cid.Defined() {
gpi.Cid = pi.Cid
gpi.Name = pi.Name
}

if gpi.PeerMap == nil {
gpi.PeerMap = make(map[string]*PinInfoShort)
}

gpi.PeerMap[peer.Encode(pi.Peer)] = &pi.PinInfoShort
}

// PinInfoShort is a subset of PinInfo which is embedded in GlobalPinInfo
// objects and does not carry redundant information as PinInfo would.
type PinInfoShort struct {
PeerName string `json:"peername" codec:"pn,omitempty"`
Status TrackerStatus `json:"status" codec:"st,omitempty"`
TS time.Time `json:"timestamp" codec:"ts,omitempty"`
Error string `json:"error" codec:"e,omitempty"`
}

// PinInfo holds information about local pins. This is used by the Pin
// Trackers.
type PinInfo struct {
Cid cid.Cid `json:"cid" codec:"c"`
Name string `json:"name" codec:"m,omitempty"`
Peer peer.ID `json:"Peer" codec:"p,omitempty"`

PinInfoShort
}

// ToGlobal converts a PinInfo object to a GlobalPinInfo with
// a single peer corresponding to the given PinInfo.
func (pi *PinInfo) ToGlobal() *GlobalPinInfo {
gpi := GlobalPinInfo{}
gpi.Add(pi)
return &gpi
}

// Version holds version information
type Version struct {
Version string `json:"version" codec:"v"`
Expand Down
148 changes: 88 additions & 60 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1646,35 +1646,72 @@ func (c *Cluster) getTrustedPeers(ctx context.Context) ([]peer.ID, error) {
return trustedPeers, nil
}

func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, t time.Time) {
func setTrackerStatus(gpin *api.GlobalPinInfo, h cid.Cid, peers []peer.ID, status api.TrackerStatus, name string, t time.Time) {
for _, p := range peers {
gpin.PeerMap[peer.Encode(p)] = &api.PinInfo{
Cid: h,
Peer: p,
PeerName: p.String(),
Status: status,
TS: t,
}
gpin.Add(&api.PinInfo{
Cid: h,
Name: name,
Peer: p,
PinInfoShort: api.PinInfoShort{
PeerName: p.String(),
Status: status,
TS: t,
},
})
}
}

func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h cid.Cid) (*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoCid")
defer span.End()

gpin := &api.GlobalPinInfo{
Cid: h,
PeerMap: make(map[string]*api.PinInfo),
}
// The object we will return
gpin := &api.GlobalPinInfo{}

// allocated peers, we will contact them through rpc
var dests []peer.ID
// un-allocated peers, we will set remote status
var remote []peer.ID
timeNow := time.Now()

// set dests and remote
// If pin is not part of the pinset, mark it unpinned
pin, err := c.PinGet(ctx, h)
if err != nil && err != state.ErrNotFound {
logger.Error(err)
return nil, err
}

// When NotFound return directly with an unpinned
// status.
if err == state.ErrNotFound {
var members []peer.ID
if c.config.FollowerMode {
members = []peer.ID{c.host.ID()}
} else {
members, err = c.consensus.Peers(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}

setTrackerStatus(
gpin,
h,
members,
api.TrackerStatusUnpinned,
"",
timeNow,
)
return gpin, nil
}

// The pin exists. Set the name.
gpin.Name = pin.Name

// Make the list of peers that will receive the request.
if c.config.FollowerMode {
// during follower mode return status only on self peer
// during follower mode return only local status.
dests = []peer.ID{c.host.ID()}
remote = []peer.ID{}
} else {
Expand All @@ -1684,17 +1721,6 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
return nil, err
}

// If pin is not part of the pinset, mark it unpinned
pin, err := c.PinGet(ctx, h)
if err == state.ErrNotFound {
setTrackerStatus(gpin, h, members, api.TrackerStatusUnpinned, timeNow)
return gpin, nil
}
if err != nil {
logger.Error(err)
return nil, err
}

if len(pin.Allocations) > 0 {
dests = pin.Allocations
remote = peersSubtract(members, dests)
Expand All @@ -1705,7 +1731,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
}

// set status remote on un-allocated peers
setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, timeNow)
setTrackerStatus(gpin, h, remote, api.TrackerStatusRemote, pin.Name, timeNow)

lenDests := len(dests)
replies := make([]*api.PinInfo, lenDests)
Expand All @@ -1726,7 +1752,7 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c

// No error. Parse and continue
if e == nil {
gpin.PeerMap[peer.Encode(dests[i])] = r
gpin.Add(r)
continue
}

Expand All @@ -1737,14 +1763,17 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c

// Deal with error cases (err != nil): wrap errors in PinInfo
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, dests[i], e)
gpin.PeerMap[peer.Encode(dests[i])] = &api.PinInfo{
Cid: h,
Peer: dests[i],
PeerName: dests[i].String(),
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
}
gpin.Add(&api.PinInfo{
Cid: h,
Name: pin.Name,
Peer: dests[i],
PinInfoShort: api.PinInfoShort{
PeerName: dests[i].String(),
Status: api.TrackerStatusClusterError,
TS: timeNow,
Error: e.Error(),
},
})
}

return gpin, nil
Expand Down Expand Up @@ -1784,23 +1813,16 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
rpcutil.CopyPinInfoSliceToIfaces(replies),
)

mergePins := func(pins []*api.PinInfo) {
for _, p := range pins {
if p == nil {
continue
}
item, ok := fullMap[p.Cid]
if !ok {
fullMap[p.Cid] = &api.GlobalPinInfo{
Cid: p.Cid,
PeerMap: map[string]*api.PinInfo{
peer.Encode(p.Peer): p,
},
}
} else {
item.PeerMap[peer.Encode(p.Peer)] = p
}
setPinInfo := func(p *api.PinInfo) {
if p == nil {
return
}
info, ok := fullMap[p.Cid]
if !ok {
info = &api.GlobalPinInfo{}
fullMap[p.Cid] = info
}
info.Add(p)
}

erroredPeers := make(map[peer.ID]string)
Expand All @@ -1812,21 +1834,27 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
}
logger.Errorf("%s: error in broadcast response from %s: %s ", c.id, members[i], e)
erroredPeers[members[i]] = e.Error()
} else {
mergePins(r)
continue
}

for _, pin := range r {
setPinInfo(pin)
}
}

// Merge any errors
for p, msg := range erroredPeers {
for c := range fullMap {
fullMap[c].PeerMap[peer.Encode(p)] = &api.PinInfo{
Cid: c,
Peer: p,
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
}
setPinInfo(&api.PinInfo{
Cid: c,
Name: "",
Peer: p,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusClusterError,
TS: time.Now(),
Error: msg,
},
})
}
}

Expand Down
8 changes: 3 additions & 5 deletions cmd/ipfs-cluster-ctl/formatters.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,17 +142,15 @@ func textFormatPrintID(obj *api.ID) {
func textFormatPrintGPInfo(obj *api.GlobalPinInfo) {
var b strings.Builder

var name string
peers := make([]string, 0, len(obj.PeerMap))
for k, v := range obj.PeerMap {
for k := range obj.PeerMap {
peers = append(peers, k)
name = v.Name // All PinInfos will have the same name
}
sort.Strings(peers)

fmt.Fprintf(&b, "%s", obj.Cid)
if name != "" {
fmt.Fprintf(&b, " | %s", name)
if obj.Name != "" {
fmt.Fprintf(&b, " | %s", obj.Name)
}

b.WriteString(":\n")
Expand Down
2 changes: 1 addition & 1 deletion cmd/ipfs-cluster-follow/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ func printStatusOnline(absPath, clusterName string) error {
}
}
pinInfo := gpi.PeerMap[pid]
printPin(gpi.Cid, pinInfo.Status.String(), pinInfo.Name, pinInfo.Error)
printPin(gpi.Cid, pinInfo.Status.String(), gpi.Name, pinInfo.Error)
}
return nil
}
Expand Down

0 comments on commit 055f482

Please sign in to comment.