Skip to content

Commit

Permalink
ipfsproxy: hijack repo/gc and trigger cluster-wide GC
Browse files Browse the repository at this point in the history
This adds hijacking of the repo/gc endpoint to the proxy to do cluster-wide gc.
  • Loading branch information
kishansagathiya authored and hsanjuan committed Dec 6, 2019
1 parent db35253 commit e1faf12
Show file tree
Hide file tree
Showing 4 changed files with 176 additions and 3 deletions.
61 changes: 61 additions & 0 deletions api/ipfsproxy/ipfsproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ func New(cfg *Config) (*Server, error) {
Path("/repo/stat").
HandlerFunc(proxy.repoStatHandler).
Name("RepoStat")
hijackSubrouter.
Path("/repo/gc").
HandlerFunc(proxy.repoGCHandler).
Name("RepoGC")

// Everything else goes to the IPFS daemon.
router.PathPrefix("/").Handler(reverseProxy)
Expand Down Expand Up @@ -647,6 +651,63 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
return
}

type ipfsRepoGCResp struct {
Key cid.Cid `json:",omitempty"`
Error string `json:",omitempty"`
}

func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
streamErrors := queryValues.Get("stream-errors") == "true"
// ignoring `quiet` since it only affects text output

proxy.setHeaders(w.Header(), r)

w.Header().Set("Trailer", "X-Stream-Error")
var repoGC api.GlobalRepoGC
err := proxy.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"RepoGC",
struct{}{},
&repoGC,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}

w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
var ipfsRepoGC ipfsRepoGCResp
mError := multiError{}
for _, gc := range repoGC.PeerMap {
for _, key := range gc.Keys {
if streamErrors {
ipfsRepoGC = ipfsRepoGCResp{Key: key.Key, Error: key.Error}
} else {
ipfsRepoGC = ipfsRepoGCResp{Key: key.Key}
if key.Error != "" {
mError.add(key.Error)
}
}

// Cluster tags start with small letter, but IPFS tags with capital letter.
if err := enc.Encode(ipfsRepoGC); err != nil {
logger.Error(err)
}
}
}

mErrStr := mError.Error()
if !streamErrors && mErrStr != "" {
w.Header().Set("X-Stream-Error", mErrStr)
}

return
}

// slashHandler returns a handler which converts a /a/b/c/<argument> request
// into an /a/b/c/<argument>?arg=<argument> one. And uses the given origHandler
// for it. Our handlers expect that arguments are passed in the ?arg query
Expand Down
77 changes: 77 additions & 0 deletions api/ipfsproxy/ipfsproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -534,6 +535,82 @@ func TestProxyRepoStat(t *testing.T) {

}

func TestProxyRepoGC(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
defer mock.Close()
defer proxy.Shutdown(ctx)

type testcase struct {
name string
streamErrors bool
}

testcases := []testcase{
testcase{
name: "With streaming errors",
streamErrors: true,
},
testcase{
name: "Without streaming errors",
streamErrors: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
res1, err := http.Post(fmt.Sprintf("%s/repo/gc?stream-errors=%t", proxyURL(proxy), tc.streamErrors), "", nil)
if err != nil {
t.Fatal(err)
}
defer res1.Body.Close()
if res1.StatusCode != http.StatusOK {
t.Error("request should have succeeded")
}

var repoGC []ipfsRepoGCResp
dec := json.NewDecoder(res1.Body)
for {
resp := ipfsRepoGCResp{}

if err := dec.Decode(&resp); err != nil {
if err == io.EOF {
break
}
t.Error(err)
}

repoGC = append(repoGC, resp)
}

if !repoGC[0].Key.Equals(test.Cid1) {
t.Errorf("expected a different cid, expected: %s, found: %s", test.Cid1, repoGC[0].Key)
}

xStreamError, ok := res1.Trailer["X-Stream-Error"]
if !ok {
t.Error("trailer header X-Stream-Error not set")
}
if tc.streamErrors {
if repoGC[4].Error != test.ErrLinkNotFound.Error() {
t.Error("expected a different error")
}
if len(xStreamError) != 0 {
t.Error("expected X-Stream-Error header to be empty")
}
} else {
if repoGC[4].Error != "" {
t.Error("did not expect to stream error")
}

if len(xStreamError) == 0 || xStreamError[0] != (test.ErrLinkNotFound.Error()+";") {
t.Error("expected X-Stream-Error header with link not found error")
}
}
})
}
}

func TestProxyAdd(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
Expand Down
19 changes: 19 additions & 0 deletions api/ipfsproxy/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ipfsproxy

import (
"strings"
)

// MultiError contains the results of multiple errors.
type multiError struct {
err strings.Builder
}

func (e *multiError) add(err string) {
e.err.WriteString(err)
e.err.WriteString("; ")
}

func (e *multiError) Error() string {
return e.err.String()
}
22 changes: 19 additions & 3 deletions test/rpc_api_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc"
)

// ErrBadCid is returned when using ErrorCid. Operations with that CID always
// fail.
var ErrBadCid = errors.New("this is an expected error when using ErrorCid")
var (
// ErrBadCid is returned when using ErrorCid. Operations with that CID always
// fail.
ErrBadCid = errors.New("this is an expected error when using ErrorCid")
// ErrLinkNotFound is error returned when no link is found
ErrLinkNotFound = errors.New("no link by that name")
)

// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns
// a client to it.
Expand Down Expand Up @@ -339,6 +343,18 @@ func (mock *mockCluster) RepoGCLocal(ctx context.Context, in struct{}, out *api.
{
Key: Cid1,
},
{
Key: Cid2,
},
{
Key: Cid3,
},
{
Key: Cid4,
},
{
Error: ErrLinkNotFound.Error(),
},
},
}

Expand Down

0 comments on commit e1faf12

Please sign in to comment.