Skip to content

Commit

Permalink
Merge pull request #505 from ipfs/fix/495-proxy-add
Browse files Browse the repository at this point in the history
Fix #495: Hijack proxy /add correctly. Disable keep-alives.
  • Loading branch information
hsanjuan committed Aug 20, 2018
2 parents 77df433 + be651da commit 63fa1ba
Show file tree
Hide file tree
Showing 10 changed files with 194 additions and 326 deletions.
1 change: 1 addition & 0 deletions adder/adder.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
ipfsAdder.Wrap = a.params.Wrap
ipfsAdder.Chunker = a.params.Chunker
ipfsAdder.Out = a.output
ipfsAdder.Progress = a.params.Progress

for {
select {
Expand Down
68 changes: 68 additions & 0 deletions adder/adderutils/adderutils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Package adderutils provides some utilities for adding content to cluster.
package adderutils

import (
"context"
"encoding/json"
"mime/multipart"
"net/http"
"sync"

"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/adder/local"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/api"

rpc "github.com/hsanjuan/go-libp2p-gorpc"
cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
)

var logger = logging.Logger("adder")

// AddMultipartHTTPHandler is a helper function to add content
// uploaded using a multipart request.
func AddMultipartHTTPHandler(
ctx context.Context,
rpc *rpc.Client,
params *api.AddParams,
reader *multipart.Reader,
w http.ResponseWriter,
) (*cid.Cid, error) {
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
flusher, flush := w.(http.Flusher)
if params.Shard {
dags = sharding.New(rpc, params.PinOptions, output)
} else {
dags = local.New(rpc, params.PinOptions)
}

enc := json.NewEncoder(w)
// This must be application/json otherwise go-ipfs client
// will break.
w.Header().Add("Content-Type", "application/json")
// Browsers should not cache when streaming content.
w.Header().Add("Cache-Control", "no-cache")
w.WriteHeader(http.StatusOK)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for v := range output {
err := enc.Encode(v)
if err != nil {
logger.Error(err)
}
if flush {
flusher.Flush()
}
}
}()

add := adder.New(dags, params, output)
root, err := add.FromMultipart(ctx, reader)
wg.Wait()
return root, err
}
11 changes: 10 additions & 1 deletion api/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type AddParams struct {
Hidden bool
Wrap bool
Shard bool
Progress bool
}

// DefaultAddParams returns a AddParams object with standard defaults
Expand All @@ -42,6 +43,7 @@ func DefaultAddParams() *AddParams {
Hidden: false,
Wrap: false,
Shard: false,
Progress: false,
PinOptions: PinOptions{
ReplicationFactorMin: 0,
ReplicationFactorMax: 0,
Expand Down Expand Up @@ -108,6 +110,12 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
if err != nil {
return nil, err
}

err = parseBoolParam(query, "progress", &params.Progress)
if err != nil {
return nil, err
}

err = parseIntParam(query, "replication-min", &params.ReplicationFactorMin)
if err != nil {
return nil, err
Expand All @@ -133,7 +141,7 @@ func (p *AddParams) ToQueryString() string {
fmtStr := "replication-min=%d&replication-max=%d&name=%s&"
fmtStr += "shard=%t&shard-size=%d&"
fmtStr += "layout=%s&chunker=%s&raw-leaves=%t&hidden=%t&"
fmtStr += "wrap-with-directory=%t"
fmtStr += "wrap-with-directory=%t&progress=%t"
query := fmt.Sprintf(
fmtStr,
p.ReplicationFactorMin,
Expand All @@ -146,6 +154,7 @@ func (p *AddParams) ToQueryString() string {
p.RawLeaves,
p.Hidden,
p.Wrap,
p.Progress,
)
return query
}
Expand Down
47 changes: 13 additions & 34 deletions api/rest/restapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"sync"
"time"

"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/adder/local"
"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/adder/adderutils"
types "github.com/ipfs/ipfs-cluster/api"

mux "github.com/gorilla/mux"
Expand Down Expand Up @@ -115,7 +113,9 @@ func NewAPIWithHost(cfg *Config, h host.Host) (*API, error) {
IdleTimeout: cfg.IdleTimeout,
Handler: router,
}
s.SetKeepAlivesEnabled(true) // A reminder that this can be changed

// See: https://github.com/ipfs/go-ipfs/issues/5168
s.SetKeepAlivesEnabled(false)

ctx, cancel := context.WithCancel(context.Background())

Expand Down Expand Up @@ -514,36 +514,13 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
return
}

output := make(chan *types.AddedOutput, 200)
var dags adder.ClusterDAGService

if params.Shard {
dags = sharding.New(api.rpcClient, params.PinOptions, output)
} else {
dags = local.New(api.rpcClient, params.PinOptions)
}

enc := json.NewEncoder(w)
w.Header().Add("Content-Type", "application/octet-stream")
w.WriteHeader(http.StatusOK)

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
for v := range output {
err := enc.Encode(v)
if err != nil {
logger.Error(err)
}
}
}()

add := adder.New(dags, params, output)
c, err := add.FromMultipart(api.ctx, reader)
_ = c

wg.Wait()
_, err = adderutils.AddMultipartHTTPHandler(
api.ctx,
api.rpcClient,
params,
reader,
w,
)

if err != nil {
errorResp := types.AddedOutput{
Expand All @@ -552,6 +529,8 @@ func (api *API) addHandler(w http.ResponseWriter, r *http.Request) {
Message: err.Error(),
},
}
enc := json.NewEncoder(w)

if err := enc.Encode(errorResp); err != nil {
logger.Error(err)
}
Expand Down
Loading

0 comments on commit 63fa1ba

Please sign in to comment.