Skip to content

Commit

Permalink
Status filters for ipfs-cluster-ctl status
Browse files Browse the repository at this point in the history
Added filter option to `ipfs-cluster-ctl status`

When the --filter is passed, it will only fetch the peer information
where status of the pin matches with the filter value.
Valid filter values are tracker status types(i.e., "pinned",
"pin_error", "unpinning" etc), an alias of tracker status type (i.e.,
"queued" or "error"), comma separated list of tracker status type
and/or it aliases(i.e., "error,pinning")

On passing invalid filter value no status information will be shown

In particular, the filter would remove elements from []GlobalPinInfo
when none of the peers in GlobalPinInfo match the filter. If one peer
in the GlobalPinInfo matches the filter, the whole object is returned,
including the information for the other peers which may or not match it.

filter option works on statusAll("GET /pins"). For fetching pin status
for a CID("GET /pins/<cid>"), filter option would have no effect

License: MIT
Signed-off-by: Kishan Mohanbhai Sagathiya <kishansagathiya@gmail.com>
  • Loading branch information
kishansagathiya committed Dec 20, 2018
1 parent ccde6eb commit 0d2fb75
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 91 deletions.
5 changes: 1 addition & 4 deletions api/rest/client/client.go
Expand Up @@ -70,7 +70,7 @@ type Client interface {
// is fetched from all cluster peers.
Status(ci cid.Cid, local bool) (api.GlobalPinInfo, error)
// StatusAll gathers Status() for all tracked items.
StatusAll(local bool) ([]api.GlobalPinInfo, error)
StatusAll(filter string, local bool) ([]api.GlobalPinInfo, error)

// Sync makes sure the state of a Cid corresponds to the state reported
// by the ipfs daemon, and returns it. If local is true, this operation
Expand Down Expand Up @@ -155,9 +155,6 @@ type Config struct {

// LogLevel defines the verbosity of the logging facility
LogLevel string

// List of filters for limiting results from ipfs-cluster-ctl
Filter string
}

// DefaultClient provides methods to interact with the ipfs-cluster API. Use
Expand Down
8 changes: 6 additions & 2 deletions api/rest/client/methods.go
Expand Up @@ -136,9 +136,13 @@ func (c *defaultClient) Status(ci cid.Cid, local bool) (api.GlobalPinInfo, error
}

// StatusAll gathers Status() for all tracked items.
func (c *defaultClient) StatusAll(local bool) ([]api.GlobalPinInfo, error) {
// If valid filter value is provided, it would fetch only those status information
// where status is matching the filter value
// Valid filter values are tracker status type, an alias of tracker status type
// (queued or error), comma separated list of track status type and/or it aliases
func (c *defaultClient) StatusAll(filter string, local bool) ([]api.GlobalPinInfo, error) {
var gpis []api.GlobalPinInfoSerial
err := c.do("GET", fmt.Sprintf("/pins?local=%t&filter=%s", local, c.config.Filter), nil, nil, &gpis)
err := c.do("GET", fmt.Sprintf("/pins?local=%t&filter=%s", local, filter), nil, nil, &gpis)
result := make([]api.GlobalPinInfo, len(gpis))
for i, p := range gpis {
result[i] = p.ToGlobalPinInfo()
Expand Down
2 changes: 1 addition & 1 deletion api/rest/client/methods_test.go
Expand Up @@ -219,7 +219,7 @@ func TestStatusAll(t *testing.T) {
defer shutdown(api)

testF := func(t *testing.T, c Client) {
pins, err := c.StatusAll(false)
pins, err := c.StatusAll("", false)
if err != nil {
t.Fatal(err)
}
Expand Down
89 changes: 39 additions & 50 deletions api/rest/restapi.go
Expand Up @@ -669,85 +669,74 @@ func (api *API) allocationHandler(w http.ResponseWriter, r *http.Request) {
}
}

// defined groups / aliases for filtering
var filterGroups = map[string][]string{
"error": {"error", "pin_error", "unpin_error", "cluster_error"},
"queued": {"queued", "pin_queued", "unpin_queued"},
}

func parseAllFilters(rawFilters string) map[string]bool {
// given the initial string of filters, return a slice of all individual strings
// to match on. This includes replacing alises
rawFilterList := strings.Split(strings.ToLower(rawFilters), ",")
var filterList = map[string]bool{}

for _, rawFilter := range rawFilterList {
if aliasSlice, exists := filterGroups[rawFilter]; exists {
for _, alias := range aliasSlice {
filterList[alias] = true
}
} else {
filterList[rawFilter] = true
func match(filters []string, status string) bool {
for _, filter := range filters {
if status == filter {
return true
}
}
return filterList

}

func filterStatusAll(filter string, pinInfos []types.GlobalPinInfoSerial) []types.GlobalPinInfoSerial {
if filter == "" {
return pinInfos
}
filterList := parseAllFilters(filter)
var filtered []types.GlobalPinInfoSerial
NEXTPI:
for _, pi := range pinInfos {
for _, entry := range pi.PeerMap {
if _, exists := filterList[entry.Status]; exists {
filtered = append(filtered, pi)
continue NEXTPI
if filter == "queued" || filter == "error" {
if strings.Contains(status, filter) {
return true
}
}
}
return filtered
return false
}

func filterStatus(filter string, pinInfos []types.PinInfoSerial) []types.PinInfoSerial {
func globalPinInfosByStatus(filter string, globalPinInfos []types.GlobalPinInfoSerial) []types.GlobalPinInfoSerial {
if filter == "" {
return pinInfos
return globalPinInfos
}
filterList := parseAllFilters(filter)
for i := len(pinInfos) - 1; i >= 0; i-- {
if _, exists := filterList[pinInfos[i].Status]; !exists {
pinInfos = append(pinInfos[:i], pinInfos[i+1:]...)
filters := strings.Split(strings.ToLower(filter), ",")

var filteredGlobalPinInfos []types.GlobalPinInfoSerial

for _, globalPinInfo := range globalPinInfos {
for _, pinInfo := range globalPinInfo.PeerMap {
if match(filters, pinInfo.Status) {
filteredGlobalPinInfos = append(filteredGlobalPinInfos, globalPinInfo)
break
}
}
}
return pinInfos

return filteredGlobalPinInfos
}

func (api *API) statusAllHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
local := queryValues.Get("local")

var globalPinInfos []types.GlobalPinInfoSerial

if local == "true" {
var pinInfos []types.PinInfoSerial

err := api.rpcClient.Call("",
"Cluster",
"StatusAllLocal",
struct{}{},
&pinInfos)
pinInfos = filterStatus(queryValues.Get("filter"), pinInfos)
api.sendResponse(w, autoStatus, err, pinInfosToGlobal(pinInfos))
if err != nil {
api.sendResponse(w, autoStatus, err, nil)
return
}
globalPinInfos = pinInfosToGlobal(pinInfos)
} else {
var pinInfos []types.GlobalPinInfoSerial
err := api.rpcClient.Call("",
"Cluster",
"StatusAll",
struct{}{},
&pinInfos)
pinInfos = filterStatusAll(queryValues.Get("filter"), pinInfos)
api.sendResponse(w, autoStatus, err, pinInfos)
&globalPinInfos)
if err != nil {
api.sendResponse(w, autoStatus, err, nil)
return
}
}

globalPinInfos = globalPinInfosByStatus(queryValues.Get("filter"), globalPinInfos)

api.sendResponse(w, autoStatus, nil, globalPinInfos)
}

func (api *API) statusHandler(w http.ResponseWriter, r *http.Request) {
Expand Down
55 changes: 32 additions & 23 deletions api/rest/restapi_test.go
Expand Up @@ -573,7 +573,38 @@ func TestAPIStatusAllEndpoint(t *testing.T) {
var resp2 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?local=true", &resp2)
if len(resp2) != 2 {
t.Errorf("unexpected statusAll+local resp:\n %+v", resp)
t.Errorf("unexpected statusAll+local resp:\n %+v", resp2)
}

// Test with filter
var resp3 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?filter=queue", &resp3)
if len(resp3) != 0 {
t.Errorf("unexpected statusAll+filter=queue resp:\n %+v", resp3)
}

var resp4 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?filter=pinned", &resp4)
if len(resp4) != 1 {
t.Errorf("unexpected statusAll+filter=pinned resp:\n %+v", resp4)
}

var resp5 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?filter=pin_error", &resp5)
if len(resp5) != 1 {
t.Errorf("unexpected statusAll+filter=pin_error resp:\n %+v", resp5)
}

var resp6 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?filter=error", &resp6)
if len(resp6) != 1 {
t.Errorf("unexpected statusAll+filter=error resp:\n %+v", resp6)
}

var resp7 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?filter=error,pinned", &resp7)
if len(resp7) != 2 {
t.Errorf("unexpected statusAll+filter=error,pinned resp:\n %+v", resp7)
}
}

Expand Down Expand Up @@ -618,28 +649,6 @@ func TestAPIStatusEndpoint(t *testing.T) {
testBothEndpoints(t, tf)
}

func TestAPIStatusAllFilterEndpoint(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown()

tf := func(t *testing.T, url urlF) {
var resp []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?filter=error", &resp)
if len(resp) != 1 {
t.Errorf("unexpected statusAll resp:\n %+v", resp)
}

// Test local=true
var resp2 []api.GlobalPinInfoSerial
makeGet(t, rest, url(rest)+"/pins?local=true&filter=error", &resp2)
if len(resp2) != 1 {
t.Errorf("unexpected statusAll+local resp:\n %+v \n %d", resp2, len(resp2))
}
}

testBothEndpoints(t, tf)
}

func TestAPISyncAllEndpoint(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown()
Expand Down
26 changes: 15 additions & 11 deletions cmd/ipfs-cluster-ctl/main.go
Expand Up @@ -98,11 +98,6 @@ func main() {
Value: "",
Usage: "cluster secret (32 byte pnet-key) as needed. Only when using the LibP2P endpoint",
},
cli.StringFlag{
Name: "filter",
Value: "",
Usage: "filter for the 'status' command, may be one of status flags or a comma separated list",
},
cli.BoolFlag{
Name: "https, s",
Usage: "use https to connect to the API",
Expand Down Expand Up @@ -177,11 +172,6 @@ requires authorization. implies --https, which you can disable with --force-http
checkErr("", errors.New("unsupported encoding"))
}

filter := c.String("filter")
if filter != "" {
cfg.Filter = filter
}

globalClient, err = client.NewDefaultClient(cfg)
checkErr("creating API client", err)
return nil
Expand Down Expand Up @@ -624,10 +614,24 @@ with "sync".
When the --local flag is passed, it will only fetch the status from the
contacted cluster peer. By default, status will be fetched from all peers.
When the --filter is passed, it will only fetch the peer information
where status of the pin matches with the filter value.
Valid filter values are tracker status types("pinned", "pin_error", "unpinning" etc),
an alias of tracker status type (queued or error), comma separated list of
tracker status type and/or it aliases ("error,pinning")
On passing invalid filter value no status information will be shown
List of tracker status types
https://github.com/ipfs/ipfs-cluster/blob/319c41cbf195b0453b8d1987991280d3121bac93/api/types.go#L66
`,
ArgsUsage: "[CID]",
Flags: []cli.Flag{
localFlag(),
cli.StringFlag{
Name: "filter",
Usage: "Comma seperated list of type tracker status and their aliases",
},
},
Action: func(c *cli.Context) error {
cidStr := c.Args().First()
Expand All @@ -637,7 +641,7 @@ contacted cluster peer. By default, status will be fetched from all peers.
resp, cerr := globalClient.Status(ci, c.Bool("local"))
formatResponse(c, resp, cerr)
} else {
resp, cerr := globalClient.StatusAll(c.Bool("local"))
resp, cerr := globalClient.StatusAll(c.String("filter"), c.Bool("local"))
formatResponse(c, resp, cerr)
}
return nil
Expand Down

0 comments on commit 0d2fb75

Please sign in to comment.