Skip to content

Commit

Permalink
Fix #1360: Efficient pinset status with filters
Browse files Browse the repository at this point in the history
This commit modifies the pintracker StatusAll call to take a status filter.

This allows skipping a PinLs call to IPFS when checking the status of items that
are queued, pinning, unpinning or in error. Those statuses come directly from
the operation tracker. This should result in a significant performance
increase for those calls, particularly in nodes with several hundred thousand
pins and more, where the call to IPFS is very expensive.

A new TrackerStatusUnexpectedlyUnpinned status has been introduced to
differentiate between pin errors (tracked by the operation tracker) and "lost"
items (which before were pin errors too). This new status is handled by the
Recover() operation as before.
  • Loading branch information
hsanjuan committed Jul 6, 2021
1 parent f7a2e4a commit 1393eeb
Show file tree
Hide file tree
Showing 10 changed files with 129 additions and 102 deletions.
30 changes: 2 additions & 28 deletions api/rest/restapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -891,30 +891,6 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
}
}

// filterGlobalPinInfos returns the subset of globalPinInfos in which at
// least one entry of the PeerMap reports a status matching the filter
// (a single matching peer is enough to keep the item). When the filter
// is TrackerStatusUndefined no filtering happens and the input slice is
// returned as-is. If nothing matches, the result is a nil slice.
func filterGlobalPinInfos(globalPinInfos []*types.GlobalPinInfo, filter types.TrackerStatus) []*types.GlobalPinInfo {
	if filter == types.TrackerStatusUndefined {
		return globalPinInfos
	}

	// anyPeerMatches reports whether some peer in the item's PeerMap
	// carries a status accepted by the filter.
	anyPeerMatches := func(gpi *types.GlobalPinInfo) bool {
		for _, pi := range gpi.PeerMap {
			if pi.Status.Match(filter) {
				return true
			}
		}
		return false
	}

	var kept []*types.GlobalPinInfo
	for _, gpi := range globalPinInfos {
		if anyPeerMatches(gpi) {
			kept = append(kept, gpi)
		}
	}
	return kept
}

func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")
Expand All @@ -936,7 +912,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"StatusAllLocal",
struct{}{},
filter,
&pinInfos,
)
if err != nil {
Expand All @@ -950,7 +926,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
"",
"Cluster",
"StatusAll",
struct{}{},
filter,
&globalPinInfos,
)
if err != nil {
Expand All @@ -959,8 +935,6 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
}
}

globalPinInfos = filterGlobalPinInfos(globalPinInfos, filter)

api.sendResponse(w, autoStatus, nil, globalPinInfos)
}

Expand Down
37 changes: 22 additions & 15 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ const (
// The IPFS daemon is not pinning the item through this cid but it is
// tracked in a cluster dag
TrackerStatusSharded
// The item is in the state and should be pinned, but
// it is however not pinned and not queued/pinning.
TrackerStatusUnexpectedlyUnpinned
)

// Composite TrackerStatus.
Expand All @@ -89,19 +92,21 @@ const (
type TrackerStatus int

var trackerStatusString = map[TrackerStatus]string{
TrackerStatusUndefined: "undefined",
TrackerStatusClusterError: "cluster_error",
TrackerStatusPinError: "pin_error",
TrackerStatusUnpinError: "unpin_error",
TrackerStatusError: "error",
TrackerStatusPinned: "pinned",
TrackerStatusPinning: "pinning",
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
TrackerStatusQueued: "queued",
TrackerStatusUndefined: "undefined",
TrackerStatusClusterError: "cluster_error",
TrackerStatusPinError: "pin_error",
TrackerStatusUnpinError: "unpin_error",
TrackerStatusError: "error",
TrackerStatusPinned: "pinned",
TrackerStatusPinning: "pinning",
TrackerStatusUnpinning: "unpinning",
TrackerStatusUnpinned: "unpinned",
TrackerStatusRemote: "remote",
TrackerStatusPinQueued: "pin_queued",
TrackerStatusUnpinQueued: "unpin_queued",
TrackerStatusQueued: "queued",
TrackerStatusSharded: "sharded",
TrackerStatusUnexpectedlyUnpinned: "unexpectedly_unpinned",
}

// values autofilled in init()
Expand Down Expand Up @@ -130,9 +135,11 @@ func (st TrackerStatus) String() string {

// Match returns true if the tracker status matches the given filter.
// For example TrackerStatusPinError will match TrackerStatusPinError
// and TrackerStatusError
// and TrackerStatusError.
func (st TrackerStatus) Match(filter TrackerStatus) bool {
return filter == 0 || st&filter > 0
return filter == TrackerStatusUndefined ||
st == TrackerStatusUndefined ||
st&filter > 0
}

// MarshalJSON uses the string representation of TrackerStatus for JSON
Expand Down
18 changes: 11 additions & 7 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1087,21 +1087,21 @@ func (c *Cluster) StateSync(ctx context.Context) error {
// StatusAll returns the GlobalPinInfo for all tracked Cids in all peers.
// If an error happens, the slice will contain as much information as
// could be fetched from other peers.
func (c *Cluster) StatusAll(ctx context.Context) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus) ([]*api.GlobalPinInfo, error) {
_, span := trace.StartSpan(ctx, "cluster/StatusAll")
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll")
return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll", filter)
}

// StatusAllLocal returns the PinInfo for all the tracked Cids in this peer.
func (c *Cluster) StatusAllLocal(ctx context.Context) []*api.PinInfo {
func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo {
_, span := trace.StartSpan(ctx, "cluster/StatusAllLocal")
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.tracker.StatusAll(ctx)
return c.tracker.StatusAll(ctx, filter)
}

// Status returns the GlobalPinInfo for a given Cid as fetched from all
Expand Down Expand Up @@ -1157,7 +1157,7 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error)
defer span.End()
ctx = trace.NewContext(c.ctx, span)

return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal")
return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal", nil)
}

// RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked
Expand Down Expand Up @@ -1824,10 +1824,14 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c
return gpin, nil
}

func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ([]*api.GlobalPinInfo, error) {
func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, arg interface{}) ([]*api.GlobalPinInfo, error) {
ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoSlice")
defer span.End()

if arg == nil {
arg = struct{}{}
}

infos := make([]*api.GlobalPinInfo, 0)
fullMap := make(map[cid.Cid]*api.GlobalPinInfo)

Expand Down Expand Up @@ -1857,7 +1861,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) (
members,
comp,
method,
struct{}{},
arg,
rpcutil.CopyPinInfoSliceToIfaces(replies),
)

Expand Down
5 changes: 3 additions & 2 deletions ipfscluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ type PinTracker interface {
// Untrack tells the tracker that a Cid is to be forgotten. The tracker
// may perform an IPFS unpin operation.
Untrack(context.Context, cid.Cid) error
// StatusAll returns the list of pins with their local status.
StatusAll(context.Context) []*api.PinInfo
// StatusAll returns the list of pins with their local status. Takes a
// filter to specify which statuses to report.
StatusAll(context.Context, api.TrackerStatus) []*api.PinInfo
// Status returns the local status of a given Cid.
Status(context.Context, cid.Cid) *api.PinInfo
// RecoverAll calls Recover() for all pins tracked.
Expand Down
10 changes: 5 additions & 5 deletions ipfscluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,7 +655,7 @@ func TestClustersPin(t *testing.T) {
delay()
}
fpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll(ctx)
status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
for _, v := range status {
if v.Status != api.TrackerStatusPinned {
t.Errorf("%s should have been pinned but it is %s", v.Cid, v.Status)
Expand Down Expand Up @@ -704,7 +704,7 @@ func TestClustersPin(t *testing.T) {
delay()

funpinned := func(t *testing.T, c *Cluster) {
status := c.tracker.StatusAll(ctx)
status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
for _, v := range status {
t.Errorf("%s should have been unpinned but it is %s", v.Cid, v.Status)
}
Expand Down Expand Up @@ -848,7 +848,7 @@ func TestClustersStatusAll(t *testing.T) {
pinDelay()
// Global status
f := func(t *testing.T, c *Cluster) {
statuses, err := c.StatusAll(ctx)
statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -910,7 +910,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) {
return
}

statuses, err := c.StatusAll(ctx)
statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined)
if err != nil {
t.Error(err)
}
Expand Down Expand Up @@ -1194,7 +1194,7 @@ func TestClustersReplicationOverall(t *testing.T) {

f := func(t *testing.T, c *Cluster) {
// confirm that the pintracker state matches the current global state
pinfos := c.tracker.StatusAll(ctx)
pinfos := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined)
if len(pinfos) != nClusters {
t.Error("Pinfos does not have the expected pins")
}
Expand Down
6 changes: 3 additions & 3 deletions pintracker/pintracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
// in state but not on IPFS
Cid: test.Cid4,
PinInfoShort: api.PinInfoShort{
Status: api.TrackerStatusPinError,
Status: api.TrackerStatusUnexpectedlyUnpinned,
},
},
},
Expand All @@ -216,7 +216,7 @@ func TestPinTracker_StatusAll(t *testing.T) {
t.Errorf("PinTracker.Track() error = %v", err)
}
time.Sleep(200 * time.Millisecond)
got := tt.args.tracker.StatusAll(context.Background())
got := tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined)
if len(got) != len(tt.want) {
for _, pi := range got {
t.Logf("pinfo: %v", pi)
Expand Down Expand Up @@ -259,7 +259,7 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) {
b.Run(tt.name, func(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
tt.args.tracker.StatusAll(context.Background())
tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined)
}
})
}
Expand Down
71 changes: 43 additions & 28 deletions pintracker/stateless/stateless.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,24 +271,31 @@ func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error {
}

// StatusAll returns information for all Cids pinned to the local IPFS node.
func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo {
func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll")
defer span.End()

pininfos, err := spt.localStatus(ctx, true)
pininfos, err := spt.localStatus(ctx, true, filter)
if err != nil {
return nil
}

// get all inflight operations from optracker and put them into the
// map, deduplicating any existing items with their inflight operation.
//
// we cannot filter in GetAll, because we are meant to replace items in
// pininfos and set the correct status, as otherwise they will remain in
// PinError.
for _, infop := range spt.optracker.GetAll(ctx) {
pininfos[infop.Cid] = infop
}

var pis []*api.PinInfo
for _, pi := range pininfos {
pis = append(pis, pi)
// Last filter.
if pi.Status.Match(filter) {
pis = append(pis, pi)
}
}
return pis
}
Expand Down Expand Up @@ -382,7 +389,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll")
defer span.End()

statuses := spt.StatusAll(ctx)
statuses := spt.StatusAll(ctx, api.TrackerStatusUndefined)
resp := make([]*api.PinInfo, 0)
for _, st := range statuses {
r, err := spt.recoverWithPinInfo(ctx, st)
Expand Down Expand Up @@ -412,7 +419,7 @@ func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error
func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) {
var err error
switch pi.Status {
case api.TrackerStatusPinError:
case api.TrackerStatusPinError, api.TrackerStatusUnexpectedlyUnpinned:
logger.Infof("Restarting pin operation for %s", pi.Cid)
err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationPin)
case api.TrackerStatusUnpinError:
Expand Down Expand Up @@ -465,11 +472,12 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo
return pins, nil
}

// localStatus returns a joint set of consensusState and ipfsStatus
// marking pins which should be meta or remote and leaving any ipfs pins that
// aren't in the consensusState out. If incExtra is true, Remote and Sharded
// pins will be added to the status slice.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid]*api.PinInfo, error) {
// localStatus returns a joint set of consensusState and ipfsStatus marking
// pins which should be meta or remote and leaving any ipfs pins that aren't
// in the consensusState out. If incExtra is true, Remote and Sharded pins
// will be added to the status slice. If a filter is provided, only statuses
// matching the filter will be returned.
func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.TrackerStatus) (map[cid.Cid]*api.PinInfo, error) {
ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus")
defer span.End()

Expand All @@ -486,11 +494,14 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid
return nil, err
}

// get statuses from ipfs node first
localpis, err := spt.ipfsStatusAll(ctx)
if err != nil {
logger.Error(err)
return nil, err
var localpis map[cid.Cid]*api.PinInfo
// Only query IPFS if we want the status of pinned items
if filter.Match(api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned) {
localpis, err = spt.ipfsStatusAll(ctx)
if err != nil {
logger.Error(err)
return nil, err
}
}

pininfos := make(map[cid.Cid]*api.PinInfo, len(statePins))
Expand All @@ -509,25 +520,29 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid

switch {
case p.Type == api.MetaType:
pinInfo.Status = api.TrackerStatusSharded
if incExtra {
pininfos[p.Cid] = &pinInfo
if !incExtra || !filter.Match(api.TrackerStatusSharded) {
continue
}
pinInfo.Status = api.TrackerStatusSharded
pininfos[p.Cid] = &pinInfo
case p.IsRemotePin(spt.peerID):
pinInfo.Status = api.TrackerStatusRemote
if incExtra {
pininfos[p.Cid] = &pinInfo
if !incExtra || !filter.Match(api.TrackerStatusRemote) {
continue
}
case pinnedInIpfs:
pinInfo.Status = api.TrackerStatusRemote
pininfos[p.Cid] = &pinInfo
case pinnedInIpfs: // always false unless filter matches TrackerStatusPinnned
ipfsInfo.Name = p.Name
pininfos[p.Cid] = ipfsInfo
default:
// report as PIN_ERROR for this peer. this will be
// overwritten if the operation tracker has more info
// for this (an ongoing pinning operation). Otherwise,
// it means something should be pinned and it is not
// known by IPFS. Should be handled to "recover".
pinInfo.Status = api.TrackerStatusPinError
// report as UNEXPECTEDLY_UNPINNED for this peer.
// this will be overwritten if the operation tracker
// has more info for this (an ongoing pinning
// operation). Otherwise, it means something should be
// pinned and it is not known by IPFS. Should be
// handled to "recover".

pinInfo.Status = api.TrackerStatusUnexpectedlyUnpinned
pinInfo.Error = errUnexpectedlyUnpinned.Error()
pininfos[p.Cid] = &pinInfo
}
Expand Down

0 comments on commit 1393eeb

Please sign in to comment.