Skip to content

Commit

Permalink
Merge pull request #767 from ipfs/feat/user-allocs-type
Browse files Browse the repository at this point in the history
Types: make UserAllocations []peer.ID instead of string
  • Loading branch information
hsanjuan committed May 2, 2019
2 parents dfaa141 + f140bdb commit 7e700e2
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 29 deletions.
144 changes: 133 additions & 11 deletions api/ipfsproxy/ipfsproxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ func New(cfg *Config) (*Server, error) {
Path("/pin/ls").
HandlerFunc(proxy.pinLsHandler).
Name("PinLs")
hijackSubrouter.
Path("/pin/update").
HandlerFunc(proxy.pinUpdateHandler).
Name("PinUpdate")
hijackSubrouter.
Path("/add").
HandlerFunc(proxy.addHandler).
Expand Down Expand Up @@ -284,10 +288,14 @@ func (proxy *Server) run() {
}

// ipfsErrorResponder writes an http error response just like IPFS would.
func ipfsErrorResponder(w http.ResponseWriter, errMsg string) {
func ipfsErrorResponder(w http.ResponseWriter, errMsg string, code int) {
res := ipfsError{errMsg}
resBytes, _ := json.Marshal(res)
w.WriteHeader(http.StatusInternalServerError)
if code > 0 {
w.WriteHeader(code)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
w.Write(resBytes)
return
}
Expand All @@ -298,7 +306,7 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
arg := r.URL.Query().Get("arg")
p, err := path.ParsePath(arg)
if err != nil {
ipfsErrorResponder(w, "Error parsing IPFS Path: "+err.Error())
ipfsErrorResponder(w, "Error parsing IPFS Path: "+err.Error(), -1)
return
}

Expand All @@ -312,7 +320,7 @@ func (proxy *Server) pinOpHandler(op string, w http.ResponseWriter, r *http.Requ
&pin,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
ipfsErrorResponder(w, err.Error(), -1)
return
}

Expand Down Expand Up @@ -343,7 +351,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
if arg != "" {
c, err := cid.Decode(arg)
if err != nil {
ipfsErrorResponder(w, err.Error())
ipfsErrorResponder(w, err.Error(), -1)
return
}
var pin api.Pin
Expand All @@ -355,7 +363,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
&pin,
)
if err != nil {
ipfsErrorResponder(w, fmt.Sprintf("Error: path '%s' is not pinned", arg))
ipfsErrorResponder(w, fmt.Sprintf("Error: path '%s' is not pinned", arg), -1)
return
}
pinLs.Keys[pin.Cid.String()] = ipfsPinType{
Expand All @@ -371,7 +379,7 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
&pins,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
ipfsErrorResponder(w, err.Error(), -1)
return
}

Expand All @@ -387,18 +395,132 @@ func (proxy *Server) pinLsHandler(w http.ResponseWriter, r *http.Request) {
w.Write(resBytes)
}

func (proxy *Server) pinUpdateHandler(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "ipfsproxy/pinUpdateHandler")
defer span.End()

proxy.setHeaders(w.Header(), r)

// Check that we have enough arguments and mimic ipfs response when not
q := r.URL.Query()
args := q["arg"]
if len(args) == 0 {
ipfsErrorResponder(w, "argument \"from-path\" is required", http.StatusBadRequest)
return
}
if len(args) == 1 {
ipfsErrorResponder(w, "argument \"to-path\" is required", http.StatusBadRequest)
return
}

unpin := !(q.Get("unpin") == "false")
from := args[0]
to := args[1]

// Parse paths (we will need to resolve them)
pFrom, err := path.ParsePath(from)
if err != nil {
ipfsErrorResponder(w, "error parsing \"from-path\" argument: "+err.Error(), -1)
return
}

pTo, err := path.ParsePath(to)
if err != nil {
ipfsErrorResponder(w, "error parsing \"to-path\" argument: "+err.Error(), -1)
return
}

// Resolve the FROM argument
var fromCid cid.Cid
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"IPFSResolve",
pFrom.String(),
&fromCid,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}

// Get existing FROM pin, and send error if not present.
var fromPin api.Pin
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"PinGet",
fromCid,
&fromPin,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}

// Prepare to pin the TO argument with the options from the FROM pin
// and the allocations of the FROM pin.
toPath := &api.PinPath{
Path: pTo.String(),
PinOptions: fromPin.PinOptions,
}
toPath.PinOptions.UserAllocations = fromPin.Allocations

// Pin the TO pin.
var toPin api.Pin
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"PinPath",
toPath,
&toPin,
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}

// If unpin != "false", unpin the FROM argument
// (it was already resolved).
if unpin {
err = proxy.rpcClient.CallContext(
ctx,
"",
"Cluster",
"Unpin",
&fromPin,
&struct{}{},
)
if err != nil {
ipfsErrorResponder(w, err.Error(), -1)
return
}
}

res := ipfsPinOpResp{
Pins: []string{fromCid.String(), toPin.Cid.String()},
}
resBytes, _ := json.Marshal(res)
w.WriteHeader(http.StatusOK)
w.Write(resBytes)
return
}

func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
proxy.setHeaders(w.Header(), r)

reader, err := r.MultipartReader()
if err != nil {
ipfsErrorResponder(w, "error reading request: "+err.Error())
ipfsErrorResponder(w, "error reading request: "+err.Error(), -1)
return
}

q := r.URL.Query()
if q.Get("only-hash") == "true" {
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster")
ipfsErrorResponder(w, "only-hash is not supported when adding to cluster", -1)
}

unpin := q.Get("pin") == "false"
Expand All @@ -407,7 +529,7 @@ func (proxy *Server) addHandler(w http.ResponseWriter, r *http.Request) {
// /add params. We can parse most of them directly from the query.
params, err := api.AddParamsFromQuery(q)
if err != nil {
ipfsErrorResponder(w, "error parsing options:"+err.Error())
ipfsErrorResponder(w, "error parsing options:"+err.Error(), -1)
return
}
trickle := q.Get("trickle")
Expand Down Expand Up @@ -475,7 +597,7 @@ func (proxy *Server) repoStatHandler(w http.ResponseWriter, r *http.Request) {
&peers,
)
if err != nil {
ipfsErrorResponder(w, err.Error())
ipfsErrorResponder(w, err.Error(), -1)
return
}

Expand Down
102 changes: 102 additions & 0 deletions api/ipfsproxy/ipfsproxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,108 @@ func TestIPFSProxyUnpin(t *testing.T) {
}
}

func TestIPFSProxyPinUpdate(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
defer mock.Close()
defer proxy.Shutdown(ctx)

t.Run("pin/update bad args", func(t *testing.T) {
res, err := http.Post(fmt.Sprintf("%s/pin/update", proxyURL(proxy)), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}

defer res.Body.Close()
if res.StatusCode != http.StatusBadRequest {
t.Error("request should not be successful with a no arguments")
}

res2, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s", proxyURL(proxy), test.PathIPFS1), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}

defer res2.Body.Close()
if res2.StatusCode != http.StatusBadRequest {
t.Error("request should not be successful with a single argument")
}
})

t.Run("pin/update", func(t *testing.T) {
res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.PathIPFS1, test.PathIPFS2), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}

defer res.Body.Close()

var resp ipfsPinOpResp
resBytes, _ := ioutil.ReadAll(res.Body)
err = json.Unmarshal(resBytes, &resp)
if err != nil {
t.Fatal(err)
}
if len(resp.Pins) != 2 ||
resp.Pins[0] != test.Cid2.String() ||
resp.Pins[1] != test.CidResolved.String() { // always resolve to the same
t.Errorf("bad response: %s", string(resBytes))
}
})

t.Run("pin/update check unpin happens", func(t *testing.T) {
// passing an errorCid to unpin should return an error
// when unpinning.

res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s", proxyURL(proxy), test.ErrorCid, test.PathIPFS2), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}

defer res.Body.Close()
if res.StatusCode != http.StatusInternalServerError {
t.Fatal("request should error")
}

resBytes, _ := ioutil.ReadAll(res.Body)
var respErr ipfsError
err = json.Unmarshal(resBytes, &respErr)
if err != nil {
t.Fatal(err)
}

if respErr.Message != test.ErrBadCid.Error() {
t.Error("expected a bad cid error:", respErr.Message)
}
})

t.Run("pin/update check pin happens", func(t *testing.T) {
// passing an errorCid to pin, with unpin=false should return
// an error when pinning

res, err := http.Post(fmt.Sprintf("%s/pin/update?arg=%s&arg=%s&unpin=false", proxyURL(proxy), test.Cid1, test.ErrorCid), "", nil)
if err != nil {
t.Fatal("request should complete: ", err)
}

defer res.Body.Close()
if res.StatusCode != http.StatusInternalServerError {
t.Fatal("request should error")
}

resBytes, _ := ioutil.ReadAll(res.Body)
var respErr ipfsError
err = json.Unmarshal(resBytes, &respErr)
if err != nil {
t.Fatal(err)
}

if respErr.Message != test.ErrBadCid.Error() {
t.Error("expected a bad cid error:", respErr.Message)
}
})
}

func TestIPFSProxyPinLs(t *testing.T) {
ctx := context.Background()
proxy, mock := testIPFSProxy(t)
Expand Down
5 changes: 1 addition & 4 deletions api/rest/client/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,7 @@ func TestPinPath(t *testing.T) {
ReplicationFactorMin: 6,
ReplicationFactorMax: 7,
Name: "hello there",
UserAllocations: []string{
"QmWPKsvv9VCXmnmX4YGNaYUmB4MbwKyyLsVDYxTQXkNdxt",
"QmWPKsvv9VCVTomX4YbNaTUmJ4MbwgyyVsVDtxXQXkNdxt",
},
UserAllocations: []peer.ID{test.PeerID1, test.PeerID2},
}

testF := func(t *testing.T, c Client) {
Expand Down
2 changes: 1 addition & 1 deletion api/rest/restapi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ var testPinOpts = api.PinOptions{
ReplicationFactorMax: 7,
ReplicationFactorMin: 6,
Name: "hello there",
UserAllocations: []string{"QmWPKsvv9VCXmnmX4YGNaYUmB4MbwKyyLsVDYxTQXkNdxt", "QmWPKsvv9VCVTomX4YbNaTUmJ4MbwgyyVsVDtxXQXkNdxt"},
UserAllocations: []peer.ID{test.PeerID1, test.PeerID2},
}

var pathTestCases = []pathCase{
Expand Down
12 changes: 5 additions & 7 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,7 @@ type PinOptions struct {
ReplicationFactorMax int `json:"replication_factor_max" codec:"rx,omitempty"`
Name string `json:"name" codec:"n,omitempty"`
ShardSize uint64 `json:"shard_size" codec:"s,omitempty"`
UserAllocations []string `json:"user_allocations" codec:"ua,omitempty"`
UserAllocations []peer.ID `json:"user_allocations" codec:"ua,omitempty"`
Metadata map[string]string `json:"metadata" codec:"m,omitempty"`
}

Expand Down Expand Up @@ -484,10 +484,8 @@ func (po *PinOptions) Equals(po2 *PinOptions) bool {
}

// avoid side effects in the original objects
allocs1 := make([]string, lenAllocs1, lenAllocs1)
allocs2 := make([]string, lenAllocs2, lenAllocs2)
copy(allocs1, po.UserAllocations)
copy(allocs2, po2.UserAllocations)
allocs1 := PeersToStrings(po.UserAllocations)
allocs2 := PeersToStrings(po2.UserAllocations)
sort.Strings(allocs1)
sort.Strings(allocs2)
if strings.Join(allocs1, ",") != strings.Join(allocs2, ",") {
Expand All @@ -510,7 +508,7 @@ func (po *PinOptions) ToQuery() string {
q.Set("replication-max", fmt.Sprintf("%d", po.ReplicationFactorMax))
q.Set("name", po.Name)
q.Set("shard-size", fmt.Sprintf("%d", po.ShardSize))
q.Set("user-allocations", strings.Join(po.UserAllocations, ","))
q.Set("user-allocations", strings.Join(PeersToStrings(po.UserAllocations), ","))
for k, v := range po.Metadata {
if k == "" {
continue
Expand Down Expand Up @@ -542,7 +540,7 @@ func (po *PinOptions) FromQuery(q url.Values) {
}

if allocs := q.Get("user-allocations"); allocs != "" {
po.UserAllocations = strings.Split(allocs, ",")
po.UserAllocations = StringsToPeers(strings.Split(allocs, ","))
}

po.Metadata = make(map[string]string)
Expand Down
Loading

0 comments on commit 7e700e2

Please sign in to comment.