Skip to content

Commit

Permalink
Merge pull request #633 from ipfs/feat/632-stream-channels
Browse files Browse the repository at this point in the history
Feat/632: support stream-channels query parameter when adding
  • Loading branch information
hsanjuan committed Jan 8, 2019
2 parents b44c7fb + c51ff00 commit 595db7a
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 70 deletions.
95 changes: 70 additions & 25 deletions adder/adderutils/adderutils.go
Expand Up @@ -34,58 +34,103 @@ func AddMultipartHTTPHandler(
) (cid.Cid, error) {
var dags adder.ClusterDAGService
output := make(chan *api.AddedOutput, 200)
flusher, flush := w.(http.Flusher)

if params.Shard {
dags = sharding.New(rpc, params.PinOptions, output)
} else {
dags = local.New(rpc, params.PinOptions)
}

enc := json.NewEncoder(w)
if outputTransform == nil {
outputTransform = func(in *api.AddedOutput) interface{} { return in }
}

// This must be application/json otherwise go-ipfs client
// will break.
w.Header().Set("Content-Type", "application/json")
// Browsers should not cache when streaming content.
// Browsers should not cache these responses.
w.Header().Set("Cache-Control", "no-cache")
// Custom header which breaks js-ipfs-api if not set
// https://github.com/ipfs-shipyard/ipfs-companion/issues/600
w.Header().Set("X-Chunked-Output", "1")

// Used by go-ipfs to signal errors half-way through the stream.
w.Header().Set("Trailer", "X-Stream-Error")

// We need to ask the clients to close the connection
// (no keep-alive) of things break badly when adding.
// https://github.com/ipfs/go-ipfs-cmds/pull/116
w.Header().Set("Connection", "close")
w.WriteHeader(http.StatusOK)

if outputTransform == nil {
outputTransform = func(in *api.AddedOutput) interface{} { return in }
var wg sync.WaitGroup
if !params.StreamChannels {
// in this case we buffer responses in memory and
// return them as a valid JSON array.
wg.Add(1)
var bufOutput []interface{} // a slice of transformed AddedOutput
go func() {
defer wg.Done()
bufOutput = buildOutput(output, outputTransform)
}()

enc := json.NewEncoder(w)
add := adder.New(dags, params, output)
root, err := add.FromMultipart(ctx, reader)
if err != nil { // Send an error
logger.Error(err)
w.WriteHeader(http.StatusInternalServerError)
errorResp := api.Error{
Code: http.StatusInternalServerError,
Message: err.Error(),
}

if err := enc.Encode(errorResp); err != nil {
logger.Error(err)
}
wg.Wait()
return root, err
}
wg.Wait()
w.WriteHeader(http.StatusOK)
enc.Encode(bufOutput)
return root, err
}

var wg sync.WaitGroup
// handle stream-adding. This should be the default.

// https://github.com/ipfs-shipyard/ipfs-companion/issues/600
w.Header().Set("X-Chunked-Output", "1")
// Used by go-ipfs to signal errors half-way through the stream.
w.Header().Set("Trailer", "X-Stream-Error")
w.WriteHeader(http.StatusOK)
wg.Add(1)
go func() {
defer wg.Done()
for v := range output {
err := enc.Encode(outputTransform(v))
if err != nil {
logger.Error(err)
break
}
if flush {
flusher.Flush()
}
}
streamOutput(w, output, outputTransform)
}()

add := adder.New(dags, params, output)
root, err := add.FromMultipart(ctx, reader)
if err != nil {
logger.Error(err)
// Set trailer with error
w.Header().Set("X-Stream-Error", err.Error())
}
wg.Wait()
return root, err
}

func streamOutput(w http.ResponseWriter, output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) {
flusher, flush := w.(http.Flusher)
enc := json.NewEncoder(w)
for v := range output {
err := enc.Encode(transform(v))
if err != nil {
logger.Error(err)
break
}
if flush {
flusher.Flush()
}
}
}

func buildOutput(output chan *api.AddedOutput, transform func(*api.AddedOutput) interface{}) []interface{} {
var finalOutput []interface{}
for v := range output {
finalOutput = append(finalOutput, transform(v))
}
return finalOutput
}
53 changes: 31 additions & 22 deletions api/add.go
Expand Up @@ -24,31 +24,33 @@ type AddedOutput struct {
type AddParams struct {
PinOptions

Recursive bool
Layout string
Chunker string
RawLeaves bool
Hidden bool
Wrap bool
Shard bool
Progress bool
CidVersion int
HashFun string
Recursive bool
Layout string
Chunker string
RawLeaves bool
Hidden bool
Wrap bool
Shard bool
Progress bool
CidVersion int
HashFun string
StreamChannels bool
}

// DefaultAddParams returns a AddParams object with standard defaults
func DefaultAddParams() *AddParams {
return &AddParams{
Recursive: false,
Layout: "", // corresponds to balanced layout
Chunker: "size-262144",
RawLeaves: false,
Hidden: false,
Wrap: false,
Shard: false,
Progress: false,
CidVersion: 0,
HashFun: "sha2-256",
Recursive: false,
Layout: "", // corresponds to balanced layout
Chunker: "size-262144",
RawLeaves: false,
Hidden: false,
Wrap: false,
Shard: false,
Progress: false,
CidVersion: 0,
HashFun: "sha2-256",
StreamChannels: true,
PinOptions: PinOptions{
ReplicationFactorMin: 0,
ReplicationFactorMax: 0,
Expand Down Expand Up @@ -90,7 +92,7 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
case "trickle", "balanced", "":
// nothing
default:
return nil, errors.New("parameter trickle invalid")
return nil, errors.New("layout parameter invalid")
}
params.Layout = layout

Expand Down Expand Up @@ -153,6 +155,11 @@ func AddParamsFromQuery(query url.Values) (*AddParams, error) {
params.ShardSize = shardSize
}

err = parseBoolParam(query, "stream-channels", &params.StreamChannels)
if err != nil {
return nil, err
}

return params, nil
}

Expand All @@ -173,6 +180,7 @@ func (p *AddParams) ToQueryString() string {
query.Set("progress", fmt.Sprintf("%t", p.Progress))
query.Set("cid-version", fmt.Sprintf("%d", p.CidVersion))
query.Set("hash", p.HashFun)
query.Set("stream-channels", fmt.Sprintf("%t", p.StreamChannels))
return query.Encode()
}

Expand All @@ -190,5 +198,6 @@ func (p *AddParams) Equals(p2 *AddParams) bool {
p.Hidden == p2.Hidden &&
p.Wrap == p2.Wrap &&
p.CidVersion == p2.CidVersion &&
p.HashFun == p2.HashFun
p.HashFun == p2.HashFun &&
p.StreamChannels == p2.StreamChannels
}
3 changes: 3 additions & 0 deletions api/rest/client/methods.go
Expand Up @@ -428,6 +428,9 @@ func (c *defaultClient) AddMultiFile(

headers := make(map[string]string)
headers["Content-Type"] = "multipart/form-data; boundary=" + multiFileR.Boundary()

// This method must run with StreamChannels set.
params.StreamChannels = true
queryStr := params.ToQueryString()

// our handler decodes an AddedOutput and puts it
Expand Down
11 changes: 6 additions & 5 deletions api/rest/client/methods_test.go
Expand Up @@ -466,11 +466,12 @@ func TestAddMultiFile(t *testing.T) {
Name: "test something",
ShardSize: 1024,
},
Shard: false,
Layout: "",
Chunker: "",
RawLeaves: false,
Hidden: false,
Shard: false,
Layout: "",
Chunker: "",
RawLeaves: false,
Hidden: false,
StreamChannels: true,
}

out := make(chan *types.AddedOutput, 1)
Expand Down
55 changes: 50 additions & 5 deletions api/rest/restapi_test.go
Expand Up @@ -200,10 +200,14 @@ func makeGet(t *testing.T, rest *API, url string, resp interface{}) {
}

func makePost(t *testing.T, rest *API, url string, body []byte, resp interface{}) {
makePostWithContentType(t, rest, url, body, "application/json", resp)
}

func makePostWithContentType(t *testing.T, rest *API, url string, body []byte, contentType string, resp interface{}) {
h := makeHost(t, rest)
defer h.Close()
c := httpClient(t, h, isHTTPS(url))
httpResp, err := c.Post(url, "application/json", bytes.NewReader(body))
httpResp, err := c.Post(url, contentType, bytes.NewReader(body))
processResp(t, httpResp, err, resp)
checkHeaders(t, rest, url, httpResp.Header)
}
Expand Down Expand Up @@ -381,21 +385,26 @@ func TestAPIAddFileEndpointLocal(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

// This writes generates the testing files and
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()

tf := func(t *testing.T, url urlF) {
fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1"
fmtStr1 := "/add?shard=false&repl_min=-1&repl_max=-1&stream-channels=true"
localURL := url(rest) + fmtStr1
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
resp := api.AddedOutput{}
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
makeStreamingPost(t, rest, localURL, body, mpContentType, &resp)

// resp will contain the last object from the streaming
if resp.Cid != test.ShardingDirBalancedRootCID {
t.Error("Bad Cid after adding: ", resp.Cid)
}
}

testBothEndpoints(t, tf)
Expand All @@ -408,7 +417,7 @@ func TestAPIAddFileEndpointShard(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

// This writes generates the testing files and
// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
Expand All @@ -420,14 +429,50 @@ func TestAPIAddFileEndpointShard(t *testing.T) {
defer closer.Close()
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
resp := api.AddedOutput{}
fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1"
fmtStr1 := "/add?shard=true&repl_min=-1&repl_max=-1&stream-channels=true"
shardURL := url(rest) + fmtStr1
makeStreamingPost(t, rest, shardURL, body, mpContentType, &resp)
}

testBothEndpoints(t, tf)
}

func TestAPIAddFileEndpoint_StreamChannelsFalse(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown()

sth := test.NewShardingTestHelper()
defer sth.Clean(t)

// This generates the testing files and
// writes them to disk.
// This is necessary here because we run tests
// in parallel, and otherwise a write-race might happen.
_, closer := sth.GetTreeMultiReader(t)
closer.Close()

tf := func(t *testing.T, url urlF) {
body, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
fullBody, err := ioutil.ReadAll(body)
if err != nil {
t.Fatal(err)
}
mpContentType := "multipart/form-data; boundary=" + body.Boundary()
resp := []api.AddedOutput{}
fmtStr1 := "/add?shard=false&repl_min=-1&repl_max=-1&stream-channels=false"
shardURL := url(rest) + fmtStr1

makePostWithContentType(t, rest, shardURL, fullBody, mpContentType, &resp)
lastHash := resp[len(resp)-1]
if lastHash.Cid != test.ShardingDirBalancedRootCID {
t.Error("Bad Cid after adding: ", lastHash.Cid)
}
}

testBothEndpoints(t, tf)
}

func TestAPIPeerRemoveEndpoint(t *testing.T) {
rest := testAPI(t)
defer rest.Shutdown()
Expand Down
24 changes: 11 additions & 13 deletions cmd/ipfs-cluster-ctl/main.go
Expand Up @@ -391,31 +391,29 @@ If you prefer faster adding, add directly to the local IPFS and trigger a
wg.Add(1)
go func() {
defer wg.Done()
var last string
var last *api.AddedOutput
for v := range out {
// Print everything when doing json
if c.GlobalString("encoding") != "text" {
formatResponse(c, *v, nil)
continue
}

// Print last hash only
if c.Bool("quieter") {
last = v.Cid
last = v
continue
}

// Print hashes only
if c.Bool("quiet") {
if c.Bool("quiet") && c.GlobalString("encoding") == "text" {
fmt.Println(v.Cid)
continue
}

// Format normal text representation of AddedOutput
// Print things normally otherwise
// "quiet" does not apply for json
formatResponse(c, *v, nil)
}
if last != "" {
fmt.Println(last)
if last != nil { // "quieter"
if c.GlobalString("encoding") == "text" {
fmt.Println(last.Cid)
} else {
formatResponse(c, *last, nil)
}
}
}()

Expand Down

0 comments on commit 595db7a

Please sign in to comment.