Skip to content

Commit

Permalink
Merge pull request #484 from ipfs/feat/sharding-v1
Browse files Browse the repository at this point in the history
Sharding: support adding content directly to IPFS Cluster
  • Loading branch information
hsanjuan committed Aug 14, 2018
2 parents 2675bf8 + df768b0 commit 5057947
Show file tree
Hide file tree
Showing 65 changed files with 4,988 additions and 462 deletions.
7 changes: 5 additions & 2 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ checks:
threshold: 500
method-complexity:
config:
threshold: 12
threshold: 15
method-lines:
config:
threshold: 80
similar-code:
enabled: false
return-statements:
config:
threshold: 10
threshold: 10
argument-count:
config:
threshold: 6

engines:
fixme:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ sharness/trash*

raftFolderFromTest*
peerstore
shardTesting

# Compiled Object files, Static and Dynamic libs (Shared Objects)
*.o
Expand Down
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ all: service ctl
clean: rwundo clean_sharness
$(MAKE) -C ipfs-cluster-service clean
$(MAKE) -C ipfs-cluster-ctl clean
@rm -rf ./test/testingData

gx-clean: clean
@rm -f $(deptools)/*
Expand Down Expand Up @@ -78,7 +79,7 @@ test_problem: deps

$(sharness):
@echo "Downloading sharness"
@wget -q -O sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
@curl -L -s -o sharness/lib/sharness.tar.gz http://github.com/chriscool/sharness/archive/master.tar.gz
@cd sharness/lib; tar -zxf sharness.tar.gz; cd ../..
@mv sharness/lib/sharness-master sharness/lib/sharness
@rm sharness/lib/sharness.tar.gz
Expand Down
154 changes: 154 additions & 0 deletions adder/adder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
package adder

import (
"context"
"io"
"mime/multipart"

"github.com/ipfs/ipfs-cluster/adder/ipfsadd"
"github.com/ipfs/ipfs-cluster/api"

cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-cmdkit/files"
ipld "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log"
)

var logger = logging.Logger("adder")

// ClusterDAGService is an implementation of ipld.DAGService plus a Finalize
// method. ClusterDAGServices can be used to provide Adders with a different
// add implementation.
type ClusterDAGService interface {
ipld.DAGService
// Finalize receives the IPFS content root CID as
// returned by the ipfs adder.
Finalize(ctx context.Context, ipfsRoot *cid.Cid) (*cid.Cid, error)
}

// Adder is used to add content to IPFS Cluster using an implementation of
// ClusterDAGService.
type Adder struct {
ctx context.Context
cancel context.CancelFunc

dgs ClusterDAGService

params *api.AddParams

// AddedOutput updates are placed on this channel
// whenever a block is processed. They contain information
// about the block, the CID, the Name etc. and are mostly
// meant to be streamed back to the user.
output chan *api.AddedOutput
}

// New returns a new Adder with the given ClusterDAGService, add options and a
// channel to send updates during the adding process.
//
// An Adder may only be used once.
func New(ds ClusterDAGService, p *api.AddParams, out chan *api.AddedOutput) *Adder {
// Discard all progress update output as the caller has not provided
// a channel for them to listen on.
if out == nil {
out = make(chan *api.AddedOutput, 100)
go func() {
for range out {
}
}()
}

return &Adder{
dgs: ds,
params: p,
output: out,
}
}

func (a *Adder) setContext(ctx context.Context) {
if a.ctx == nil { // only allows first context
ctxc, cancel := context.WithCancel(ctx)
a.ctx = ctxc
a.cancel = cancel
}
}

// FromMultipart adds content from a multipart.Reader. The adder will
// no longer be usable after calling this method.
func (a *Adder) FromMultipart(ctx context.Context, r *multipart.Reader) (*cid.Cid, error) {
logger.Debugf("adding from multipart with params: %+v", a.params)

f := &files.MultipartFile{
Mediatype: "multipart/form-data",
Reader: r,
}
defer f.Close()
return a.FromFiles(ctx, f)
}

// FromFiles adds content from a files.File. The adder will no longer
// be usable after calling this method.
func (a *Adder) FromFiles(ctx context.Context, f files.File) (*cid.Cid, error) {
logger.Debugf("adding from files")
a.setContext(ctx)

if a.ctx.Err() != nil { // don't allow running twice
return nil, a.ctx.Err()
}

defer a.cancel()
defer close(a.output)

ipfsAdder, err := ipfsadd.NewAdder(a.ctx, a.dgs)
if err != nil {
logger.Error(err)
return nil, err
}

ipfsAdder.Hidden = a.params.Hidden
ipfsAdder.Trickle = a.params.Layout == "trickle"
ipfsAdder.RawLeaves = a.params.RawLeaves
ipfsAdder.Wrap = a.params.Wrap
ipfsAdder.Chunker = a.params.Chunker
ipfsAdder.Out = a.output

for {
select {
case <-a.ctx.Done():
return nil, a.ctx.Err()
default:
err := addFile(f, ipfsAdder)
if err == io.EOF {
goto FINALIZE
}
if err != nil {
logger.Error("error adding to cluster: ", err)
return nil, err
}
}
}

FINALIZE:
adderRoot, err := ipfsAdder.Finalize()
if err != nil {
return nil, err
}
clusterRoot, err := a.dgs.Finalize(a.ctx, adderRoot.Cid())
if err != nil {
logger.Error("error finalizing adder:", err)
return nil, err
}
logger.Infof("%s successfully added to cluster", clusterRoot)
return clusterRoot, nil
}

func addFile(fs files.File, ipfsAdder *ipfsadd.Adder) error {
f, err := fs.NextFile()
if err != nil {
return err
}
defer f.Close()

logger.Debugf("ipfsAdder AddFile(%s)", f.FullPath())
return ipfsAdder.AddFile(f)
}
133 changes: 133 additions & 0 deletions adder/adder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package adder

import (
"context"
"mime/multipart"
"sync"
"testing"
"time"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"

cid "github.com/ipfs/go-cid"
ipld "github.com/ipfs/go-ipld-format"
)

type mockCDAGServ struct {
BaseDAGService
resultCids map[string]struct{}
}

func (dag *mockCDAGServ) Add(ctx context.Context, node ipld.Node) error {
dag.resultCids[node.Cid().String()] = struct{}{}
return nil
}

func (dag *mockCDAGServ) AddMany(ctx context.Context, nodes []ipld.Node) error {
for _, node := range nodes {
err := dag.Add(ctx, node)
if err != nil {
return err
}
}
return nil
}

func (dag *mockCDAGServ) Finalize(ctx context.Context, root *cid.Cid) (*cid.Cid, error) {
return root, nil
}

func TestAdder(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

mr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mr, mr.Boundary())
p := api.DefaultAddParams()
expectedCids := test.ShardingDirCids[:]

dags := &mockCDAGServ{
resultCids: make(map[string]struct{}),
}

adder := New(dags, p, nil)

root, err := adder.FromMultipart(context.Background(), r)
if err != nil {
t.Fatal(err)
}

if root.String() != test.ShardingDirBalancedRootCID {
t.Error("expected the right content root")
}

if len(expectedCids) != len(dags.resultCids) {
t.Fatal("unexpected number of blocks imported")
}

for _, c := range expectedCids {
_, ok := dags.resultCids[c]
if !ok {
t.Fatal("unexpected block emitted:", c)
}
}
}

func TestAdder_DoubleStart(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

f := sth.GetTreeSerialFile(t)
p := api.DefaultAddParams()

dags := &mockCDAGServ{
resultCids: make(map[string]struct{}),
}

adder := New(dags, p, nil)
_, err := adder.FromFiles(context.Background(), f)
f.Close()
if err != nil {
t.Fatal(err)
}

f = sth.GetTreeSerialFile(t)
_, err = adder.FromFiles(context.Background(), f)
f.Close()
if err == nil {
t.Fatal("expected an error: cannot run importer twice")
}
}

func TestAdder_ContextCancelled(t *testing.T) {
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

mr, closer := sth.GetRandFileMultiReader(t, 50000) // 50 MB
defer closer.Close()
r := multipart.NewReader(mr, mr.Boundary())

p := api.DefaultAddParams()

dags := &mockCDAGServ{
resultCids: make(map[string]struct{}),
}

ctx, cancel := context.WithCancel(context.Background())
adder := New(dags, p, nil)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := adder.FromMultipart(ctx, r)
if err == nil {
t.Error("expected a context cancelled error")
}
t.Log(err)
}()
time.Sleep(200 * time.Millisecond)
cancel()
wg.Wait()
}
Loading

0 comments on commit 5057947

Please sign in to comment.