Skip to content

Commit

Permalink
Merge pull request #1377 from ipfs/fix/1360-efficient-pin-status
Browse files Browse the repository at this point in the history
Fix #1360: Efficient pinset status with filters
  • Loading branch information
hsanjuan committed Jul 6, 2021
2 parents 2397d6d + edfcfa3 commit 54c3608
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 104 deletions.
30 changes: 2 additions & 28 deletions api/rest/restapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,30 +891,6 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
}
}

// filterGlobalPinInfos takes a GlobalPinInfo slice and discards
// any item in it which does not carry a PinInfo matching the
// filter (OR-wise).
func filterGlobalPinInfos(globalPinInfos []*types.GlobalPinInfo, filter types.TrackerStatus) []*types.GlobalPinInfo {
if filter == types.TrackerStatusUndefined {
return globalPinInfos
}

var filteredGlobalPinInfos []*types.GlobalPinInfo

for _, globalPinInfo := range globalPinInfos {
for _, pinInfo := range globalPinInfo.PeerMap {
// silenced the error because we should have detected
// earlier if filters were invalid
if pinInfo.Status.Match(filter) {
filteredGlobalPinInfos = append(filteredGlobalPinInfos, globalPinInfo)
break
}
}
}

return filteredGlobalPinInfos
}

func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
Expand All @@ -936,7 +912,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"StatusAllLocal",
struct{}{},
filter,
&pinInfos,
)
if err != nil {
Expand All @@ -950,7 +926,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"StatusAll",
struct{}{},
filter,
&globalPinInfos,
)
if err != nil {
Expand All @@ -959,8 +935,6 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
}
}

globalPinInfos = filterGlobalPinInfos(globalPinInfos, filter)

api.sendResponse(w, autoStatus, nil, globalPinInfos)
}

Expand Down
37 changes: 22 additions & 15 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ const (
// The IPFS daemon is not pinning the item through this cid but it is
// tracked in a cluster dag
TrackerStatusSharded
// The item is in the state and should be pinned, but
// it is however not pinned and not queued/pinning.
TrackerStatusUnexpectedlyUnpinned
)

// Composite TrackerStatus.
Expand All @@ -89,19 +92,21 @@ const (
type TrackerStatus int

var trackerStatusString = map[TrackerStatus]string{
TrackerStatusUndefined: "undefined",
TrackerStatusClusterError: "cluster_error",
TrackerStatusPinError: "pin_error",
TrackerStatusUnpinError: "unpin_error",
TrackerStatusError: "error",
TrackerStatusPinned: "pinned",
TrackerStatusPinning: "pinning",
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
TrackerStatusQueued: "queued",
TrackerStatusUndefined: "undefined",
TrackerStatusClusterError: "cluster_error",
TrackerStatusPinError: "pin_error",
TrackerStatusUnpinError: "unpin_error",
TrackerStatusError: "error",
TrackerStatusPinned: "pinned",
TrackerStatusPinning: "pinning",
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
TrackerStatusQueued: "queued",
TrackerStatusSharded: "sharded",
TrackerStatusUnexpectedlyUnpinned: "unexpectedly_unpinned",
}

// values autofilled in init()
Expand Down Expand Up @@ -130,9 +135,11 @@ func (st TrackerStatus) String() string {

// Match returns true if the tracker status matches the given filter.
// For example TrackerStatusPinError will match TrackerStatusPinError
// and TrackerStatusError
// and TrackerStatusError.
func (st TrackerStatus) Match(filter TrackerStatus) bool {
return filter == 0 || st&filter > 0
return filter == TrackerStatusUndefined ||
st == TrackerStatusUndefined ||
st&filter > 0
}

// MarshalJSON uses the string representation of TrackerStatus for JSON
Expand Down
18 changes: 11 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,21 +1087,21 @@ func (c *Cluster) StateSync(ctx context.Context) error {
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
func (c *Cluster) StatusAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus) ([]*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/StatusAll")
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll")
return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll", filter)
}

// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer.
func (c *Cluster) StatusAllLocal(ctx context.Context) []*api.PinInfo {
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo {
_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.tracker.StatusAll(ctx)
return c.tracker.StatusAll(ctx, filter)
}

// Status returns the GlobalPinInfo for a given Cid as fetched from all
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error)
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal")
return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal", nil)
}

// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
Expand Down Expand Up @@ -1824,10 +1824,14 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
return gpin, nil
}

func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, arg interface{}) ([]*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoSlice")
defer span.End()

if arg == nil {
arg = struct{}{}
}

infos := make([]*api.GlobalPinInfo, 0)
fullMap := make(map[cid.Cid]*api.GlobalPinInfo)

Expand Down Expand Up @@ -1857,7 +1861,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
members,
comp,
method,
struct{}{},
arg,
rpcutil.CopyPinInfoSliceToIfaces(replies),
)

Expand Down
5 changes: 3 additions & 2 deletions ipfscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ type PinTracker interface {
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(context.Context, cid.Cid) error
// StatusAll returns the list of pins with their local status.
StatusAll(context.Context) []*api.PinInfo
// StatusAll returns the list of pins with their local status. Takes a
// filter to specify which statuses to report.
StatusAll(context.Context, api.TrackerStatus) []*api.PinInfo
// Status returns the local status of a given Cid.
Status(context.Context, cid.Cid) *api.PinInfo
// RecoverAll calls Recover() for all pins tracked.
Expand Down
10 changes: 5 additions & 5 deletions ipfscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func TestClustersPin(t *testing.T) {
delay()
}
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll(ctx)
status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
for _, v := range status {
if v.Status != api.TrackerStatusPinned {
t.Errorf("%s should have been pinned but it is %s", v.Cid, v.Status)
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestClustersPin(t *testing.T) {
delay()

funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll(ctx)
status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
for _, v := range status {
t.Errorf("%s should have been unpinned but it is %s", v.Cid, v.Status)
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestClustersStatusAll(t *testing.T) {
pinDelay()
// Global status
f := func(t *testing.T, c *Cluster) {
statuses, err := c.StatusAll(ctx)
statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -910,7 +910,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
return
}

statuses, err := c.StatusAll(ctx)
statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -1194,7 +1194,7 @@ func TestClustersReplicationOverall(t *testing.T) {

f := func(t *testing.T, c *Cluster) {
// confirm that the pintracker state matches the current global state
pinfos := c.tracker.StatusAll(ctx)
pinfos := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
if len(pinfos) != nClusters {
t.Error("Pinfos does not have the expected pins")
}
Expand Down
6 changes: 3 additions & 3 deletions pintracker/pintracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
// in state but not on IPFS
Cid: test.Cid4,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinError,
Status: api.TrackerStatusUnexpectedlyUnpinned,
},
},
},
Expand All @@ -216,7 +216,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(200 * time.Millisecond)
got := tt.args.tracker.StatusAll(context.Background())
got := tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined)
if len(got) != len(tt.want) {
for _, pi := range got {
t.Logf("pinfo: %v", pi)
Expand Down Expand Up @@ -259,7 +259,7 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
tt.args.tracker.StatusAll(context.Background())
tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined)
}
})
}
Expand Down
71 changes: 43 additions & 28 deletions pintracker/stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,24 +271,31 @@ func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error {
}

// StatusAll returns information for all Cids pinned to the local IPFS node.
func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo {
func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll")
defer span.End()

pininfos, err := spt.localStatus(ctx, true)
pininfos, err := spt.localStatus(ctx, true, filter)
if err != nil {
return nil
}

// get all inflight operations from optracker and put them into the
// map, deduplicating any existing items with their inflight operation.
//
// we cannot filter in GetAll, because we are meant to replace items in
// pininfos and set the correct status, as otherwise they will remain in
// PinError.
for _, infop := range spt.optracker.GetAll(ctx) {
pininfos[infop.Cid] = infop
}

var pis []*api.PinInfo
for _, pi := range pininfos {
pis = append(pis, pi)
// Last filter.
if pi.Status.Match(filter) {
pis = append(pis, pi)
}
}
return pis
}
Expand Down Expand Up @@ -382,7 +389,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
defer span.End()

statuses := spt.StatusAll(ctx)
statuses := spt.StatusAll(ctx, api.TrackerStatusUndefined)
resp := make([]*api.PinInfo, 0)
for _, st := range statuses {
r, err := spt.recoverWithPinInfo(ctx, st)
Expand Down Expand Up @@ -412,7 +419,7 @@ func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) {
var err error
switch pi.Status {
case api.TrackerStatusPinError:
case api.TrackerStatusPinError, api.TrackerStatusUnexpectedlyUnpinned:
logger.Infof("Restarting pin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationPin)
case api.TrackerStatusUnpinError:
Expand Down Expand Up @@ -465,11 +472,12 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
return pins, nil
}

// localStatus returns a joint set of consensusState and ipfsStatus
// marking pins which should be meta or remote and leaving any ipfs pins that
// aren't in the consensusState out. If incExtra is true, Remote and Sharded
// pins will be added to the status slice.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid]*api.PinInfo, error) {
// localStatus returns a joint set of consensusState and ipfsStatus marking
// pins which should be meta or remote and leaving any ipfs pins that aren't
// in the consensusState out. If incExtra is true, Remote and Sharded pins
// will be added to the status slice. If a filter is provided, only statuses
// matching the filter will be returned.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.TrackerStatus) (map[cid.Cid]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus")
defer span.End()

Expand All @@ -486,11 +494,14 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid
return nil, err
}

// get statuses from ipfs node first
localpis, err := spt.ipfsStatusAll(ctx)
if err != nil {
logger.Error(err)
return nil, err
var localpis map[cid.Cid]*api.PinInfo
// Only query IPFS if we want to status for pinned items
if filter.Match(api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned) {
localpis, err = spt.ipfsStatusAll(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}

pininfos := make(map[cid.Cid]*api.PinInfo, len(statePins))
Expand All @@ -509,25 +520,29 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid

switch {
case p.Type == api.MetaType:
pinInfo.Status = api.TrackerStatusSharded
if incExtra {
pininfos[p.Cid] = &pinInfo
if !incExtra || !filter.Match(api.TrackerStatusSharded) {
continue
}
pinInfo.Status = api.TrackerStatusSharded
pininfos[p.Cid] = &pinInfo
case p.IsRemotePin(spt.peerID):
pinInfo.Status = api.TrackerStatusRemote
if incExtra {
pininfos[p.Cid] = &pinInfo
if !incExtra || !filter.Match(api.TrackerStatusRemote) {
continue
}
case pinnedInIpfs:
pinInfo.Status = api.TrackerStatusRemote
pininfos[p.Cid] = &pinInfo
case pinnedInIpfs: // always false unless filter matches TrackerStatusPinnned
ipfsInfo.Name = p.Name
pininfos[p.Cid] = ipfsInfo
default:
// report as PIN_ERROR for this peer. this will be
// overwritten if the operation tracker has more info
// for this (an ongoing pinning operation). Otherwise,
// it means something should be pinned and it is not
// known by IPFS. Should be handled to "recover".
pinInfo.Status = api.TrackerStatusPinError
// report as UNEXPECTEDLY_UNPINNED for this peer.
// this will be overwritten if the operation tracker
// has more info for this (an ongoing pinning
// operation). Otherwise, it means something should be
// pinned and it is not known by IPFS. Should be
// handled to "recover".

pinInfo.Status = api.TrackerStatusUnexpectedlyUnpinned
pinInfo.Error = errUnexpectedlyUnpinned.Error()
pininfos[p.Cid] = &pinInfo
}
Expand Down

0 comments on commit 54c3608

Please sign in to comment.