Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

repo gc in ipfs proxy #961

Merged
merged 8 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
61 changes: 61 additions & 0 deletions api/ipfsproxy/ipfsproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ func New(cfg *Config) (*Server, error) {
Path("/repo/stat").
HandlerFunc(proxy.repoStatHandler).
Name("RepoStat")
hijackSubrouter.
Path("/repo/gc").
HandlerFunc(proxy.repoGCHandler).
Name("RepoGC")

// Everything else goes to the IPFS daemon.
router.PathPrefix("/").Handler(reverseProxy)
Expand Down Expand Up @@ -647,6 +651,63 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
return
}

type ipfsRepoGCResp struct {
Key cid.Cid `json:",omitempty"`
Error string `json:",omitempty"`
}

func (proxy *Server) repoGCHandler(w http.ResponseWriter, r *http.Request) {
queryValues := r.URL.Query()
streamErrors := queryValues.Get("stream-errors") == "true"
// ignoring `quiet` since it only affects text output

proxy.setHeaders(w.Header(), r)

w.Header().Set("Trailer", "X-Stream-Error")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We will need to add this to the add handler, I think. It's a bug (new PR).

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding to all requests.

var repoGC api.GlobalRepoGC
err := proxy.rpcClient.CallContext(
r.Context(),
"",
"Cluster",
"RepoGC",
struct{}{},
&repoGC,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}

w.WriteHeader(http.StatusOK)
enc := json.NewEncoder(w)
var ipfsRepoGC ipfsRepoGCResp
mError := multiError{}
for _, gc := range repoGC.PeerMap {
for _, key := range gc.Keys {
if streamErrors {
ipfsRepoGC = ipfsRepoGCResp{Key: key.Key, Error: key.Error}
} else {
ipfsRepoGC = ipfsRepoGCResp{Key: key.Key}
if key.Error != "" {
mError.add(key.Error)
}
}

// Cluster tags start with small letter, but IPFS tags with capital letter.
if err := enc.Encode(ipfsRepoGC); err != nil {
logger.Error(err)
}
}
}

mErrStr := mError.Error()
if !streamErrors && mErrStr != "" {
w.Header().Set("X-Stream-Error", mErrStr)
}

return
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}

// slashHandler returns a handler which converts a /a/b/c/<argument> request
// into an /a/b/c/<argument>?arg=<argument> one. And uses the given origHandler
// for it. Our handlers expect that arguments are passed in the ?arg query
Expand Down
77 changes: 77 additions & 0 deletions api/ipfsproxy/ipfsproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
Expand Down Expand Up @@ -534,6 +535,82 @@ func TestProxyRepoStat(t *testing.T) {

}

func TestProxyRepoGC(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
defer mock.Close()
defer proxy.Shutdown(ctx)

type testcase struct {
name string
streamErrors bool
}

testcases := []testcase{
testcase{
name: "With streaming errors",
streamErrors: true,
},
testcase{
name: "Without streaming errors",
streamErrors: false,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
res1, err := http.Post(fmt.Sprintf("%s/repo/gc?stream-errors=%t", proxyURL(proxy), tc.streamErrors), "", nil)
if err != nil {
t.Fatal(err)
}
defer res1.Body.Close()
if res1.StatusCode != http.StatusOK {
t.Error("request should have succeeded")
}

var repoGC []ipfsRepoGCResp
dec := json.NewDecoder(res1.Body)
for {
resp := ipfsRepoGCResp{}

if err := dec.Decode(&resp); err != nil {
if err == io.EOF {
break
}
t.Error(err)
}

repoGC = append(repoGC, resp)
}

if !repoGC[0].Key.Equals(test.Cid1) {
t.Errorf("expected a different cid, expected: %s, found: %s", test.Cid1, repoGC[0].Key)
}

xStreamError, ok := res1.Trailer["X-Stream-Error"]
if !ok {
t.Error("trailer header X-Stream-Error not set")
}
if tc.streamErrors {
if repoGC[4].Error != test.ErrLinkNotFound.Error() {
t.Error("expected a different error")
}
if len(xStreamError) != 0 {
t.Error("expected X-Stream-Error header to be empty")
}
} else {
if repoGC[4].Error != "" {
t.Error("did not expect to stream error")
}

if len(xStreamError) == 0 || xStreamError[0] != (test.ErrLinkNotFound.Error()+";") {
t.Error("expected X-Stream-Error header with link not found error")
}
}
})
}
}

func TestProxyAdd(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
Expand Down
19 changes: 19 additions & 0 deletions api/ipfsproxy/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package ipfsproxy

import (
"strings"
)

// MultiError contains the results of multiple errors.
type multiError struct {
err strings.Builder
}

func (e *multiError) add(err string) {
e.err.WriteString(err)
e.err.WriteString("; ")
}

func (e *multiError) Error() string {
return e.err.String()
}
22 changes: 19 additions & 3 deletions test/rpc_api_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@ import (
rpc "github.com/libp2p/go-libp2p-gorpc"
)

// ErrBadCid is returned when using ErrorCid. Operations with that CID always
// fail.
var ErrBadCid = errors.New("this is an expected error when using ErrorCid")
var (
// ErrBadCid is returned when using ErrorCid. Operations with that CID always
// fail.
ErrBadCid = errors.New("this is an expected error when using ErrorCid")
// ErrLinkNotFound is error returned when no link is found
ErrLinkNotFound = errors.New("no link by that name")
)

// NewMockRPCClient creates a mock ipfs-cluster RPC server and returns
// a client to it.
Expand Down Expand Up @@ -339,6 +343,18 @@ func (mock *mockCluster) RepoGCLocal(ctx context.Context, in struct{}, out *api.
{
Key: Cid1,
},
{
Key: Cid2,
},
{
Key: Cid3,
},
{
Key: Cid4,
},
{
Error: ErrLinkNotFound.Error(),
},
},
}

Expand Down