From 1393eeb7d4284a348e04d5e6f64c470d9dfe44e4 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Sun, 4 Jul 2021 23:40:15 +0200 Subject: [PATCH] Fix #1360: Efficient pinset status with filters This commit modifies the pintracker StatusAll call to take a status filter. This allows to skip a PinLs call to ipfs when checking status for items that are queued, pinning, unpinning or in error. Those status come directly from the operation tracker. This should result in a significant performance increase for those calls, particularly in nodes with several hundred thousand pins and more, where the call to IPFS is very expensive. A new TrackerStatusUnexpectedlyUnpinned status has been introduce to differentiate between pin errors (tracked by the operation tracker) and "lost" items (which before were pin errors too). This new status is handled by the Recover() operation as before. --- api/rest/restapi.go | 30 +---------- api/types.go | 37 ++++++++------ cluster.go | 18 ++++--- ipfscluster.go | 5 +- ipfscluster_test.go | 10 ++-- pintracker/pintracker_test.go | 6 +-- pintracker/stateless/stateless.go | 71 ++++++++++++++++---------- pintracker/stateless/stateless_test.go | 4 +- rpc_api.go | 12 ++--- test/rpc_api_mock.go | 38 +++++++++++--- 10 files changed, 129 insertions(+), 102 deletions(-) diff --git a/api/rest/restapi.go b/api/rest/restapi.go index 3c52ff348..cb299294e 100644 --- a/api/rest/restapi.go +++ b/api/rest/restapi.go @@ -891,30 +891,6 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) { } } -// filterGlobalPinInfos takes a GlobalPinInfo slice and discards -// any item in it which does not carry a PinInfo matching the -// filter (OR-wise). -func filterGlobalPinInfos(globalPinInfos []*types.GlobalPinInfo, filter types.TrackerStatus) []*types.GlobalPinInfo { - if filter == types.TrackerStatusUndefined { - return globalPinInfos - } - - var filteredGlobalPinInfos []*types.GlobalPinInfo - - for _, globalPinInfo := range globalPinInfos { - for _, pinInfo := range globalPinInfo.PeerMap { - // silenced the error because we should have detected - // earlier if filters were invalid - if pinInfo.Status.Match(filter) { - filteredGlobalPinInfos = append(filteredGlobalPinInfos, globalPinInfo) - break - } - } - } - - return filteredGlobalPinInfos -} - func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) { queryValues := r.URL.Query() local := queryValues.Get("local") @@ -936,7 +912,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) { "", "Cluster", "StatusAllLocal", - struct{}{}, + filter, &pinInfos, ) if err != nil { @@ -950,7 +926,7 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) { "", "Cluster", "StatusAll", - struct{}{}, + filter, &globalPinInfos, ) if err != nil { @@ -959,8 +935,6 @@ func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) { } } - globalPinInfos = filterGlobalPinInfos(globalPinInfos, filter) - api.sendResponse(w, autoStatus, nil, globalPinInfos) } diff --git a/api/types.go b/api/types.go index 4de6b091e..ad565b9b5 100644 --- a/api/types.go +++ b/api/types.go @@ -77,6 +77,9 @@ const ( // The IPFS daemon is not pinning the item through this cid but it is // tracked in a cluster dag TrackerStatusSharded + // The item is in the state and should be pinned, but + // it is however not pinned and not queued/pinning. + TrackerStatusUnexpectedlyUnpinned ) // Composite TrackerStatus. @@ -89,19 +92,21 @@ const ( type TrackerStatus int var trackerStatusString = map[TrackerStatus]string{ - TrackerStatusUndefined: "undefined", - TrackerStatusClusterError: "cluster_error", - TrackerStatusPinError: "pin_error", - TrackerStatusUnpinError: "unpin_error", - TrackerStatusError: "error", - TrackerStatusPinned: "pinned", - TrackerStatusPinning: "pinning", - TrackerStatusUnpinning: "unpinning", - TrackerStatusUnpinned: "unpinned", - TrackerStatusRemote: "remote", - TrackerStatusPinQueued: "pin_queued", - TrackerStatusUnpinQueued: "unpin_queued", - TrackerStatusQueued: "queued", + TrackerStatusUndefined: "undefined", + TrackerStatusClusterError: "cluster_error", + TrackerStatusPinError: "pin_error", + TrackerStatusUnpinError: "unpin_error", + TrackerStatusError: "error", + TrackerStatusPinned: "pinned", + TrackerStatusPinning: "pinning", + TrackerStatusUnpinning: "unpinning", + TrackerStatusUnpinned: "unpinned", + TrackerStatusRemote: "remote", + TrackerStatusPinQueued: "pin_queued", + TrackerStatusUnpinQueued: "unpin_queued", + TrackerStatusQueued: "queued", + TrackerStatusSharded: "sharded", + TrackerStatusUnexpectedlyUnpinned: "unexpectedly_unpinned", } // values autofilled in init() @@ -130,9 +135,11 @@ func (st TrackerStatus) String() string { // Match returns true if the tracker status matches the given filter. // For example TrackerStatusPinError will match TrackerStatusPinError -// and TrackerStatusError +// and TrackerStatusError. func (st TrackerStatus) Match(filter TrackerStatus) bool { - return filter == 0 || st&filter > 0 + return filter == TrackerStatusUndefined || + st == TrackerStatusUndefined || + st&filter > 0 } // MarshalJSON uses the string representation of TrackerStatus for JSON diff --git a/cluster.go b/cluster.go index 59c237cb1..67c566541 100644 --- a/cluster.go +++ b/cluster.go @@ -1087,21 +1087,21 @@ func (c *Cluster) StateSync(ctx context.Context) error { // StatusAll returns the GlobalPinInfo for all tracked Cids in all peers. // If an error happens, the slice will contain as much information as // could be fetched from other peers. -func (c *Cluster) StatusAll(ctx context.Context) ([]*api.GlobalPinInfo, error) { +func (c *Cluster) StatusAll(ctx context.Context, filter api.TrackerStatus) ([]*api.GlobalPinInfo, error) { _, span := trace.StartSpan(ctx, "cluster/StatusAll") defer span.End() ctx = trace.NewContext(c.ctx, span) - return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll") + return c.globalPinInfoSlice(ctx, "PinTracker", "StatusAll", filter) } // StatusAllLocal returns the PinInfo for all the tracked Cids in this peer. -func (c *Cluster) StatusAllLocal(ctx context.Context) []*api.PinInfo { +func (c *Cluster) StatusAllLocal(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo { _, span := trace.StartSpan(ctx, "cluster/StatusAllLocal") defer span.End() ctx = trace.NewContext(c.ctx, span) - return c.tracker.StatusAll(ctx) + return c.tracker.StatusAll(ctx, filter) } // Status returns the GlobalPinInfo for a given Cid as fetched from all @@ -1157,7 +1157,7 @@ func (c *Cluster) RecoverAll(ctx context.Context) ([]*api.GlobalPinInfo, error) defer span.End() ctx = trace.NewContext(c.ctx, span) - return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal") + return c.globalPinInfoSlice(ctx, "Cluster", "RecoverAllLocal", nil) } // RecoverAllLocal triggers a RecoverLocal operation for all Cids tracked @@ -1824,10 +1824,14 @@ func (c *Cluster) globalPinInfoCid(ctx context.Context, comp, method string, h c return gpin, nil } -func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ([]*api.GlobalPinInfo, error) { +func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string, arg interface{}) ([]*api.GlobalPinInfo, error) { ctx, span := trace.StartSpan(ctx, "cluster/globalPinInfoSlice") defer span.End() + if arg == nil { + arg = struct{}{} + } + infos := make([]*api.GlobalPinInfo, 0) fullMap := make(map[cid.Cid]*api.GlobalPinInfo) @@ -1857,7 +1861,7 @@ func (c *Cluster) globalPinInfoSlice(ctx context.Context, comp, method string) ( members, comp, method, - struct{}{}, + arg, rpcutil.CopyPinInfoSliceToIfaces(replies), ) diff --git a/ipfscluster.go b/ipfscluster.go index 92af0cd9b..e09027deb 100644 --- a/ipfscluster.go +++ b/ipfscluster.go @@ -119,8 +119,9 @@ type PinTracker interface { // Untrack tells the tracker that a Cid is to be forgotten. The tracker // may perform an IPFS unpin operation. Untrack(context.Context, cid.Cid) error - // StatusAll returns the list of pins with their local status. - StatusAll(context.Context) []*api.PinInfo + // StatusAll returns the list of pins with their local status. Takes a + // filter to specify which statuses to report. + StatusAll(context.Context, api.TrackerStatus) []*api.PinInfo // Status returns the local status of a given Cid. Status(context.Context, cid.Cid) *api.PinInfo // RecoverAll calls Recover() for all pins tracked. diff --git a/ipfscluster_test.go b/ipfscluster_test.go index 1217c9a38..42a93233d 100644 --- a/ipfscluster_test.go +++ b/ipfscluster_test.go @@ -655,7 +655,7 @@ func TestClustersPin(t *testing.T) { delay() } fpinned := func(t *testing.T, c *Cluster) { - status := c.tracker.StatusAll(ctx) + status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined) for _, v := range status { if v.Status != api.TrackerStatusPinned { t.Errorf("%s should have been pinned but it is %s", v.Cid, v.Status) @@ -704,7 +704,7 @@ func TestClustersPin(t *testing.T) { delay() funpinned := func(t *testing.T, c *Cluster) { - status := c.tracker.StatusAll(ctx) + status := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined) for _, v := range status { t.Errorf("%s should have been unpinned but it is %s", v.Cid, v.Status) } @@ -848,7 +848,7 @@ func TestClustersStatusAll(t *testing.T) { pinDelay() // Global status f := func(t *testing.T, c *Cluster) { - statuses, err := c.StatusAll(ctx) + statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined) if err != nil { t.Error(err) } @@ -910,7 +910,7 @@ func TestClustersStatusAllWithErrors(t *testing.T) { return } - statuses, err := c.StatusAll(ctx) + statuses, err := c.StatusAll(ctx, api.TrackerStatusUndefined) if err != nil { t.Error(err) } @@ -1194,7 +1194,7 @@ func TestClustersReplicationOverall(t *testing.T) { f := func(t *testing.T, c *Cluster) { // confirm that the pintracker state matches the current global state - pinfos := c.tracker.StatusAll(ctx) + pinfos := c.tracker.StatusAll(ctx, api.TrackerStatusUndefined) if len(pinfos) != nClusters { t.Error("Pinfos does not have the expected pins") } diff --git a/pintracker/pintracker_test.go b/pintracker/pintracker_test.go index c04ae5e77..9dd3e2937 100644 --- a/pintracker/pintracker_test.go +++ b/pintracker/pintracker_test.go @@ -204,7 +204,7 @@ func TestPinTracker_StatusAll(t *testing.T) { // in state but not on IPFS Cid: test.Cid4, PinInfoShort: api.PinInfoShort{ - Status: api.TrackerStatusPinError, + Status: api.TrackerStatusUnexpectedlyUnpinned, }, }, }, @@ -216,7 +216,7 @@ func TestPinTracker_StatusAll(t *testing.T) { t.Errorf("PinTracker.Track() error = %v", err) } time.Sleep(200 * time.Millisecond) - got := tt.args.tracker.StatusAll(context.Background()) + got := tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined) if len(got) != len(tt.want) { for _, pi := range got { t.Logf("pinfo: %v", pi) @@ -259,7 +259,7 @@ func BenchmarkPinTracker_StatusAll(b *testing.B) { b.Run(tt.name, func(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { - tt.args.tracker.StatusAll(context.Background()) + tt.args.tracker.StatusAll(context.Background(), api.TrackerStatusUndefined) } }) } diff --git a/pintracker/stateless/stateless.go b/pintracker/stateless/stateless.go index b76203ebb..d40acfa42 100644 --- a/pintracker/stateless/stateless.go +++ b/pintracker/stateless/stateless.go @@ -271,24 +271,31 @@ func (spt *Tracker) Untrack(ctx context.Context, c cid.Cid) error { } // StatusAll returns information for all Cids pinned to the local IPFS node. -func (spt *Tracker) StatusAll(ctx context.Context) []*api.PinInfo { +func (spt *Tracker) StatusAll(ctx context.Context, filter api.TrackerStatus) []*api.PinInfo { ctx, span := trace.StartSpan(ctx, "tracker/stateless/StatusAll") defer span.End() - pininfos, err := spt.localStatus(ctx, true) + pininfos, err := spt.localStatus(ctx, true, filter) if err != nil { return nil } // get all inflight operations from optracker and put them into the // map, deduplicating any existing items with their inflight operation. + // + // we cannot filter in GetAll, because we are meant to replace items in + // pininfos and set the correct status, as otherwise they will remain in + // PinError. for _, infop := range spt.optracker.GetAll(ctx) { pininfos[infop.Cid] = infop } var pis []*api.PinInfo for _, pi := range pininfos { - pis = append(pis, pi) + // Last filter. + if pi.Status.Match(filter) { + pis = append(pis, pi) + } } return pis } @@ -382,7 +389,7 @@ func (spt *Tracker) RecoverAll(ctx context.Context) ([]*api.PinInfo, error) { ctx, span := trace.StartSpan(ctx, "tracker/stateless/RecoverAll") defer span.End() - statuses := spt.StatusAll(ctx) + statuses := spt.StatusAll(ctx, api.TrackerStatusUndefined) resp := make([]*api.PinInfo, 0) for _, st := range statuses { r, err := spt.recoverWithPinInfo(ctx, st) @@ -412,7 +419,7 @@ func (spt *Tracker) Recover(ctx context.Context, c cid.Cid) (*api.PinInfo, error func (spt *Tracker) recoverWithPinInfo(ctx context.Context, pi *api.PinInfo) (*api.PinInfo, error) { var err error switch pi.Status { - case api.TrackerStatusPinError: + case api.TrackerStatusPinError, api.TrackerStatusUnexpectedlyUnpinned: logger.Infof("Restarting pin operation for %s", pi.Cid) err = spt.enqueue(ctx, api.PinCid(pi.Cid), optracker.OperationPin) case api.TrackerStatusUnpinError: @@ -465,11 +472,12 @@ func (spt *Tracker) ipfsStatusAll(ctx context.Context) (map[cid.Cid]*api.PinInfo return pins, nil } -// localStatus returns a joint set of consensusState and ipfsStatus -// marking pins which should be meta or remote and leaving any ipfs pins that -// aren't in the consensusState out. If incExtra is true, Remote and Sharded -// pins will be added to the status slice. -func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid]*api.PinInfo, error) { +// localStatus returns a joint set of consensusState and ipfsStatus marking +// pins which should be meta or remote and leaving any ipfs pins that aren't +// in the consensusState out. If incExtra is true, Remote and Sharded pins +// will be added to the status slice. If a filter is provided, only statuses +// matching the filter will be returned. +func (spt *Tracker) localStatus(ctx context.Context, incExtra bool, filter api.TrackerStatus) (map[cid.Cid]*api.PinInfo, error) { ctx, span := trace.StartSpan(ctx, "tracker/stateless/localStatus") defer span.End() @@ -486,11 +494,14 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid return nil, err } - // get statuses from ipfs node first - localpis, err := spt.ipfsStatusAll(ctx) - if err != nil { - logger.Error(err) - return nil, err + var localpis map[cid.Cid]*api.PinInfo + // Only query IPFS if we want to status for pinned items + if filter.Match(api.TrackerStatusPinned | api.TrackerStatusUnexpectedlyUnpinned) { + localpis, err = spt.ipfsStatusAll(ctx) + if err != nil { + logger.Error(err) + return nil, err + } } pininfos := make(map[cid.Cid]*api.PinInfo, len(statePins)) @@ -509,25 +520,29 @@ func (spt *Tracker) localStatus(ctx context.Context, incExtra bool) (map[cid.Cid switch { case p.Type == api.MetaType: - pinInfo.Status = api.TrackerStatusSharded - if incExtra { - pininfos[p.Cid] = &pinInfo + if !incExtra || !filter.Match(api.TrackerStatusSharded) { + continue } + pinInfo.Status = api.TrackerStatusSharded + pininfos[p.Cid] = &pinInfo case p.IsRemotePin(spt.peerID): - pinInfo.Status = api.TrackerStatusRemote - if incExtra { - pininfos[p.Cid] = &pinInfo + if !incExtra || !filter.Match(api.TrackerStatusRemote) { + continue } - case pinnedInIpfs: + pinInfo.Status = api.TrackerStatusRemote + pininfos[p.Cid] = &pinInfo + case pinnedInIpfs: // always false unless filter matches TrackerStatusPinnned ipfsInfo.Name = p.Name pininfos[p.Cid] = ipfsInfo default: - // report as PIN_ERROR for this peer. this will be - // overwritten if the operation tracker has more info - // for this (an ongoing pinning operation). Otherwise, - // it means something should be pinned and it is not - // known by IPFS. Should be handled to "recover". - pinInfo.Status = api.TrackerStatusPinError + // report as UNEXPECTEDLY_UNPINNED for this peer. + // this will be overwritten if the operation tracker + // has more info for this (an ongoing pinning + // operation). Otherwise, it means something should be + // pinned and it is not known by IPFS. Should be + // handled to "recover". + + pinInfo.Status = api.TrackerStatusUnexpectedlyUnpinned pinInfo.Error = errUnexpectedlyUnpinned.Error() pininfos[p.Cid] = &pinInfo } diff --git a/pintracker/stateless/stateless_test.go b/pintracker/stateless/stateless_test.go index 6925ca19a..0d639a1da 100644 --- a/pintracker/stateless/stateless_test.go +++ b/pintracker/stateless/stateless_test.go @@ -397,7 +397,7 @@ func TestStatusAll(t *testing.T) { // * A slow CID pinning // * Cid1 is pinned // * Cid4 should be in PinError (it's in the state but not on IPFS) - stAll := spt.StatusAll(ctx) + stAll := spt.StatusAll(ctx, api.TrackerStatusUndefined) if len(stAll) != 3 { t.Errorf("wrong status length. Expected 3, got: %d", len(stAll)) } @@ -471,6 +471,6 @@ func BenchmarkTracker_localStatus(b *testing.B) { ctx := context.Background() b.ResetTimer() for i := 0; i < b.N; i++ { - tracker.localStatus(ctx, true) + tracker.localStatus(ctx, true, api.TrackerStatusUndefined) } } diff --git a/rpc_api.go b/rpc_api.go index f148c6685..f472446d9 100644 --- a/rpc_api.go +++ b/rpc_api.go @@ -263,8 +263,8 @@ func (rpcapi *ClusterRPCAPI) Join(ctx context.Context, in api.Multiaddr, out *st } // StatusAll runs Cluster.StatusAll(). -func (rpcapi *ClusterRPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - pinfos, err := rpcapi.c.StatusAll(ctx) +func (rpcapi *ClusterRPCAPI) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.GlobalPinInfo) error { + pinfos, err := rpcapi.c.StatusAll(ctx, in) if err != nil { return err } @@ -273,8 +273,8 @@ func (rpcapi *ClusterRPCAPI) StatusAll(ctx context.Context, in struct{}, out *[] } // StatusAllLocal runs Cluster.StatusAllLocal(). -func (rpcapi *ClusterRPCAPI) StatusAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - pinfos := rpcapi.c.StatusAllLocal(ctx) +func (rpcapi *ClusterRPCAPI) StatusAllLocal(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error { + pinfos := rpcapi.c.StatusAllLocal(ctx, in) *out = pinfos return nil } @@ -452,10 +452,10 @@ func (rpcapi *PinTrackerRPCAPI) Untrack(ctx context.Context, in *api.Pin, out *s } // StatusAll runs PinTracker.StatusAll(). -func (rpcapi *PinTrackerRPCAPI) StatusAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { +func (rpcapi *PinTrackerRPCAPI) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error { ctx, span := trace.StartSpan(ctx, "rpc/tracker/StatusAll") defer span.End() - *out = rpcapi.tracker.StatusAll(ctx) + *out = rpcapi.tracker.StatusAll(ctx, in) return nil } diff --git a/test/rpc_api_mock.go b/test/rpc_api_mock.go index e11415b62..188773c38 100644 --- a/test/rpc_api_mock.go +++ b/test/rpc_api_mock.go @@ -219,9 +219,9 @@ func (mock *mockCluster) ConnectGraph(ctx context.Context, in struct{}, out *api return nil } -func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { +func (mock *mockCluster) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.GlobalPinInfo) error { pid := peer.Encode(PeerID1) - *out = []*api.GlobalPinInfo{ + gPinInfos := []*api.GlobalPinInfo{ { Cid: Cid1, PeerMap: map[string]*api.PinInfoShort{ @@ -250,10 +250,28 @@ func (mock *mockCluster) StatusAll(ctx context.Context, in struct{}, out *[]*api }, }, } + // If there is no filter match, we will not return that status and we + // will not have an entry for that peer in the peerMap. In turn, when + // a single peer, we will not have an entry for the cid at all. + for _, gpi := range gPinInfos { + for id, pi := range gpi.PeerMap { + if !in.Match(pi.Status) { + delete(gpi.PeerMap, id) + } + } + } + filtered := make([]*api.GlobalPinInfo, 0, len(gPinInfos)) + for _, gpi := range gPinInfos { + if len(gpi.PeerMap) > 0 { + filtered = append(filtered, gpi) + } + } + *out = filtered + return nil } -func (mock *mockCluster) StatusAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { +func (mock *mockCluster) StatusAllLocal(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error { return (&mockPinTracker{}).StatusAll(ctx, in, out) } @@ -278,7 +296,7 @@ func (mock *mockCluster) StatusLocal(ctx context.Context, in cid.Cid, out *api.P } func (mock *mockCluster) RecoverAll(ctx context.Context, in struct{}, out *[]*api.GlobalPinInfo) error { - return mock.StatusAll(ctx, in, out) + return mock.StatusAll(ctx, api.TrackerStatusUndefined, out) } func (mock *mockCluster) RecoverAllLocal(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { @@ -367,8 +385,8 @@ func (mock *mockPinTracker) Untrack(ctx context.Context, in *api.Pin, out *struc return nil } -func (mock *mockPinTracker) StatusAll(ctx context.Context, in struct{}, out *[]*api.PinInfo) error { - *out = []*api.PinInfo{ +func (mock *mockPinTracker) StatusAll(ctx context.Context, in api.TrackerStatus, out *[]*api.PinInfo) error { + pinInfos := []*api.PinInfo{ { Cid: Cid1, Peer: PeerID1, @@ -386,6 +404,14 @@ func (mock *mockPinTracker) StatusAll(ctx context.Context, in struct{}, out *[]* }, }, } + filtered := make([]*api.PinInfo, 0, len(pinInfos)) + for _, pi := range pinInfos { + if in.Match(pi.Status) { + filtered = append(filtered, pi) + } + } + + *out = filtered return nil }