
Commit

Merge branch 'master' of github.com:ipfs/ipfs-cluster into fix/api-improvements
Kishan Mohanbhai Sagathiya committed Aug 14, 2019
2 parents 06729de + dc2e73d commit 229953f
Showing 26 changed files with 355 additions and 172 deletions.
1 change: 1 addition & 0 deletions Dockerfile
@@ -37,6 +37,7 @@ MAINTAINER Hector Sanjuan <hector@protocol.ai>
ENV GOPATH /go
ENV SRC_PATH /go/src/github.com/ipfs/ipfs-cluster
ENV IPFS_CLUSTER_PATH /data/ipfs-cluster
ENV IPFS_CLUSTER_CONSENSUS crdt

EXPOSE 9094
EXPOSE 9095
5 changes: 3 additions & 2 deletions Dockerfile-bundle
@@ -1,4 +1,4 @@
FROM golang:1.9-stretch AS builder
FROM golang:1.12-stretch AS builder
MAINTAINER Hector Sanjuan <hector@protocol.ai>

# This dockerfile builds cluster and runs it along with go-ipfs.
@@ -22,6 +22,7 @@ MAINTAINER Hector Sanjuan <hector@protocol.ai>
ENV GOPATH /go
ENV SRC_PATH /go/src/github.com/ipfs/ipfs-cluster
ENV IPFS_CLUSTER_PATH /data/ipfs-cluster
ENV IPFS_CLUSTER_CONSENSUS crdt

EXPOSE 9094
EXPOSE 9095
@@ -38,4 +39,4 @@ VOLUME $IPFS_CLUSTER_PATH
ENTRYPOINT ["/sbin/tini", "--", "/usr/local/bin/start-daemons.sh"]

# Defaults for ipfs-cluster-service go here
CMD ["daemon", "--upgrade"]
CMD ["daemon"]
7 changes: 5 additions & 2 deletions Dockerfile-test
@@ -27,14 +27,17 @@ MAINTAINER Hector Sanjuan <hector@protocol.ai>
ENV GOPATH /go
ENV SRC_PATH /go/src/github.com/ipfs/ipfs-cluster
ENV IPFS_CLUSTER_PATH /data/ipfs-cluster
ENV IPFS_CLUSTER_CONSENSUS crdt
ENV IPFS_CLUSTER_RESTAPI_HTTPLISTENMULTIADDRESS /ip4/0.0.0.0/tcp/9094
ENV IPFS_CLUSTER_IPFSPROXY_LISTENMULTIADDRESS /ip4/0.0.0.0/tcp/9095

EXPOSE 9094
EXPOSE 9095
EXPOSE 9096

COPY --from=builder $GOPATH/bin/ipfs-cluster-service /usr/local/bin/ipfs-cluster-service
COPY --from=builder $GOPATH/bin/ipfs-cluster-ctl /usr/local/bin/ipfs-cluster-ctl
COPY --from=builder $SRC_PATH/docker/test-entrypoint.sh /usr/local/bin/start-daemons.sh
COPY --from=builder $SRC_PATH/docker/test-entrypoint.sh /usr/local/bin/test-entrypoint.sh
COPY --from=builder $SRC_PATH/docker/random-stopper.sh /usr/local/bin/random-stopper.sh
COPY --from=builder $SRC_PATH/docker/random-killer.sh /usr/local/bin/random-killer.sh
COPY --from=builder $SRC_PATH/docker/wait-killer-stopper.sh /usr/local/bin/wait-killer-stopper.sh
@@ -55,7 +58,7 @@ RUN mkdir -p $IPFS_CLUSTER_PATH && \
USER ipfs

VOLUME $IPFS_CLUSTER_PATH
ENTRYPOINT ["/usr/local/bin/start-daemons.sh"]
ENTRYPOINT ["/usr/local/bin/test-entrypoint.sh"]

# Defaults would go here
CMD ["daemon"]
6 changes: 3 additions & 3 deletions Makefile
@@ -65,9 +65,9 @@ docker-compose:
mkdir -p compose/ipfs0 compose/ipfs1 compose/cluster0 compose/cluster1
chmod -R 0777 compose
CLUSTER_SECRET=$(shell od -vN 32 -An -tx1 /dev/urandom | tr -d ' \n') docker-compose up -d
sleep 20
docker exec cluster0 ipfs-cluster-ctl peers ls | grep -o "Sees 1 other peers" | uniq -c | grep 2
docker exec cluster1 ipfs-cluster-ctl peers ls | grep -o "Sees 1 other peers" | uniq -c | grep 2
sleep 35
docker exec cluster0 ipfs-cluster-ctl peers ls | grep -o "Sees 2 other peers" | uniq -c | grep 3
docker exec cluster1 ipfs-cluster-ctl peers ls | grep -o "Sees 2 other peers" | uniq -c | grep 3
docker-compose down

prcheck: check service ctl test
105 changes: 105 additions & 0 deletions add_test.go
@@ -5,11 +5,15 @@ package ipfscluster
import (
"context"
"mime/multipart"
"sync"
"testing"
"time"

files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/ipfs-cluster/adder"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
peer "github.com/libp2p/go-libp2p-core/peer"
)

func TestAdd(t *testing.T) {
@@ -102,3 +106,104 @@ func TestAddPeerDown(t *testing.T) {
runF(t, clusters, f)
})
}

func TestAddOnePeerFails(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

waitForLeaderAndMetrics(t, clusters)

t.Run("local", func(t *testing.T) {
params := api.DefaultAddParams()
params.Shard = false
params.Name = "testlocal"
lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
defer closer.Close()

mr := files.NewMultiFileReader(lg, true)
r := multipart.NewReader(mr, mr.Boundary())

var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := clusters[0].AddFile(r, params)
if err != nil {
t.Fatal(err)
}
}()

// Disconnect 1 cluster (the last). Things should keep working.
// Important that we close the hosts, otherwise the RPC
// Servers keep working along with BlockPuts.
time.Sleep(100 * time.Millisecond)
c := clusters[nClusters-1]
c.Shutdown(context.Background())
c.dht.Close()
c.host.Close()
wg.Wait()
})
}

func TestAddAllPeersFail(t *testing.T) {
ctx := context.Background()
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

waitForLeaderAndMetrics(t, clusters)

t.Run("local", func(t *testing.T) {
// Prevent added content from being allocated to cluster 0
// as it is already going to have something.
_, err := clusters[0].Pin(ctx, test.Cid1, api.PinOptions{
ReplicationFactorMin: 1,
ReplicationFactorMax: 1,
UserAllocations: []peer.ID{clusters[0].host.ID()},
})
if err != nil {
t.Fatal(err)
}

ttlDelay()

params := api.DefaultAddParams()
params.Shard = false
params.Name = "testlocal"
// Allocate to every peer except 0 (which already has a pin)
params.PinOptions.ReplicationFactorMax = nClusters - 1
params.PinOptions.ReplicationFactorMin = nClusters - 1

lg, closer := sth.GetRandFileReader(t, 100000) // 100 MB
defer closer.Close()
mr := files.NewMultiFileReader(lg, true)
r := multipart.NewReader(mr, mr.Boundary())

// var cid cid.Cid
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
_, err := clusters[0].AddFile(r, params)
if err != adder.ErrBlockAdder {
t.Fatal("expected ErrBlockAdder. Got: ", err)
}
}()

time.Sleep(100 * time.Millisecond)

// Shutdown all clusters except 0 to see the right error.
// Important that we shut down the hosts, otherwise
// the RPC Servers keep working along with BlockPuts.
// Note that this kills raft.
runF(t, clusters[1:], func(t *testing.T, c *Cluster) {
c.Shutdown(ctx)
c.dht.Close()
c.host.Close()
})
wg.Wait()
})
}
1 change: 1 addition & 0 deletions adder/adder_test.go
@@ -137,6 +137,7 @@ func TestAdder_ContextCancelled(t *testing.T) {
}
t.Log(err)
}()
// adder.FromMultipart will finish if we sleep any longer
time.Sleep(100 * time.Millisecond)
cancel()
wg.Wait()
15 changes: 4 additions & 11 deletions adder/local/dag_service.go
@@ -30,6 +30,8 @@ type DAGService struct {

dests []peer.ID
pinOpts api.PinOptions

ba *adder.BlockAdder
}

// New returns a new Adder with the given rpc Client. The client is used
@@ -50,19 +52,10 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
return err
}
dgs.dests = dests
dgs.ba = adder.NewBlockAdder(dgs.rpcClient, dests)
}

size, err := node.Size()
if err != nil {
return err
}
nodeSerial := &api.NodeWithMeta{
Cid: node.Cid(),
Data: node.RawData(),
CumSize: size,
}

return adder.PutBlock(ctx, dgs.rpcClient, nodeSerial, dgs.dests)
return dgs.ba.Add(ctx, node)
}

// Finalize pins the last Cid added to this DAGService.
35 changes: 0 additions & 35 deletions adder/sharding/dag.go
@@ -20,16 +20,11 @@ import (
"context"
"fmt"

"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/rpcutil"

blocks "github.com/ipfs/go-block-format"
cid "github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
ipld "github.com/ipfs/go-ipld-format"
dag "github.com/ipfs/go-merkledag"
peer "github.com/libp2p/go-libp2p-core/peer"
rpc "github.com/libp2p/go-libp2p-gorpc"
mh "github.com/multiformats/go-multihash"
)

@@ -123,36 +118,6 @@ func makeDAG(ctx context.Context, dagObj map[string]cid.Cid) ([]ipld.Node, error
return nodes, nil
}

func putDAG(ctx context.Context, rpcC *rpc.Client, nodes []ipld.Node, dests []peer.ID) error {
for _, n := range nodes {
//logger.Debugf("The dag cbor Node Links: %+v", n.Links())
b := &api.NodeWithMeta{
Cid: n.Cid(), // Tests depend on this.
Data: n.RawData(),
Format: "cbor",
}
//logger.Debugf("Here is the serialized ipld: %x", b.Data)

ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(dests))
defer rpcutil.MultiCancel(cancels)

logger.Debugf("DAG block put %s", n.Cid())
errs := rpcC.MultiCall(
ctxs,
dests,
"IPFSConnector",
"BlockPut",
b,
rpcutil.RPCDiscardReplies(len(dests)),
)

if err := rpcutil.CheckErrs(errs); err != nil {
return err
}
}
return nil
}

// TODO: decide whether this is worth including. Is precision important for
// most usecases? Is being a little over the shard size a serious problem?
// Is precision worth the cost to maintain complex accounting for metadata
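
The removed putDAG helper above captures the RPC pattern that the new adder.BlockAdder replaces throughout this commit: wrap each IPLD node in an api.NodeWithMeta and multicast it to the destination peers via IPFSConnector.BlockPut. The following is a rough sketch only, based on the calls visible in this diff rather than on the actual adder package (which may, for instance, drop failing destinations instead of aborting on the first error):

// Sketch of a BlockAdder as implied by this diff. Illustrative only: the
// real type in the adder package may behave differently on per-peer errors.
package adder

import (
	"context"

	"github.com/ipfs/ipfs-cluster/api"
	"github.com/ipfs/ipfs-cluster/rpcutil"

	ipld "github.com/ipfs/go-ipld-format"
	peer "github.com/libp2p/go-libp2p-core/peer"
	rpc "github.com/libp2p/go-libp2p-gorpc"
)

// BlockAdder binds an RPC client to a fixed set of destination peers so
// callers no longer pass rpcClient and dests on every block put.
type BlockAdder struct {
	rpcClient *rpc.Client
	dests     []peer.ID
}

// NewBlockAdder returns a BlockAdder that sends blocks to dests.
func NewBlockAdder(rpcClient *rpc.Client, dests []peer.ID) *BlockAdder {
	return &BlockAdder{rpcClient: rpcClient, dests: dests}
}

// Add serializes a node and multicasts it with IPFSConnector.BlockPut,
// the same RPC the removed putDAG/PutBlock helpers used.
func (ba *BlockAdder) Add(ctx context.Context, node ipld.Node) error {
	size, err := node.Size()
	if err != nil {
		return err
	}
	b := &api.NodeWithMeta{
		Cid:     node.Cid(),
		Data:    node.RawData(),
		CumSize: size,
	}

	ctxs, cancels := rpcutil.CtxsWithCancel(ctx, len(ba.dests))
	defer rpcutil.MultiCancel(cancels)

	errs := ba.rpcClient.MultiCall(
		ctxs,
		ba.dests,
		"IPFSConnector",
		"BlockPut",
		b,
		rpcutil.RPCDiscardReplies(len(ba.dests)),
	)
	return rpcutil.CheckErrs(errs)
}

// AddMany adds several nodes in sequence.
func (ba *BlockAdder) AddMany(ctx context.Context, nodes []ipld.Node) error {
	for _, node := range nodes {
		if err := ba.Add(ctx, node); err != nil {
			return err
		}
	}
	return nil
}

With a type of this shape, local/dag_service.go, sharding/dag_service.go and sharding/shard.go only need to construct one BlockAdder per destination set and call Add or AddMany per node, instead of threading the RPC client and destination list through every block put, as the hunks above and below show.
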
29 changes: 11 additions & 18 deletions adder/sharding/dag_service.go
@@ -71,17 +71,7 @@ func (dgs *DAGService) Add(ctx context.Context, node ipld.Node) error {
return nil
}

size, err := node.Size()
if err != nil {
return err
}
nodeSerial := &api.NodeWithMeta{
Cid: node.Cid(),
Data: node.RawData(),
CumSize: size,
}

return dgs.ingestBlock(ctx, nodeSerial)
return dgs.ingestBlock(ctx, node)
}

// Finalize finishes sharding, creates the cluster DAG and pins it along
@@ -102,7 +92,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
}

// PutDAG to ourselves
err = putDAG(ctx, dgs.rpcClient, clusterDAGNodes, []peer.ID{""})
err = adder.NewBlockAdder(dgs.rpcClient, []peer.ID{""}).AddMany(ctx, clusterDAGNodes)
if err != nil {
return dataRoot, err
}
@@ -164,7 +154,7 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,

// ingests a block into the current shard. If it gets full, it
// Flushes the shard and retries with a new one.
func (dgs *DAGService) ingestBlock(ctx context.Context, n *api.NodeWithMeta) error {
func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) error {
shard := dgs.currentShard

// if we have no currentShard, create one
@@ -178,17 +168,20 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n ipld.Node) err
dgs.currentShard = shard
}

logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid, len(dgs.shards), dgs.pinOpts.Name)
logger.Debugf("ingesting block %s in shard %d (%s)", n.Cid(), len(dgs.shards), dgs.pinOpts.Name)

// this is not the same as n.Size()
size := uint64(len(n.RawData()))

// add the block to it if it fits and return
if shard.Size()+n.Size() < shard.Limit() {
shard.AddLink(ctx, n.Cid, n.Size())
return adder.PutBlock(ctx, dgs.rpcClient, n, shard.Allocations())
if shard.Size()+size < shard.Limit() {
shard.AddLink(ctx, n.Cid(), size)
return dgs.currentShard.ba.Add(ctx, n)
}

logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d",
len(dgs.shards),
n.Size(),
size,
shard.Size(),
shard.Limit(),
)
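
The size accounting in ingestBlock changes meaningfully here: for a merkledag node, n.Size() is the cumulative size of the block plus everything reachable through its links, while len(n.RawData()) is only the serialized size of this one block, which is what the shard's byte limit compares against in the new code. A small standalone Go sketch (not part of the commit) showing the difference:

package main

import (
	"fmt"

	dag "github.com/ipfs/go-merkledag"
)

func main() {
	// Build a tiny two-node DAG: parent links to child.
	child := dag.NodeWithData([]byte("some child data"))
	parent := dag.NodeWithData([]byte("parent"))
	if err := parent.AddNodeLink("child", child); err != nil {
		panic(err)
	}

	// Cumulative size: this block plus the sizes recorded in its links.
	cum, err := parent.Size()
	if err != nil {
		panic(err)
	}
	// Raw size: just this block's serialized bytes, as used for shard accounting.
	raw := uint64(len(parent.RawData()))

	fmt.Println("node.Size() (cumulative):", cum)
	fmt.Println("len(node.RawData()):     ", raw)
}
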
4 changes: 3 additions & 1 deletion adder/sharding/shard.go
@@ -21,6 +21,7 @@ type shard struct {
rpc *rpc.Client
allocations []peer.ID
pinOptions api.PinOptions
ba *adder.BlockAdder
// dagNode represents a node with links and will be converted
// to Cbor.
dagNode map[string]cid.Cid
@@ -50,6 +51,7 @@ func newShard(ctx context.Context, rpc *rpc.Client, opts api.PinOptions) (*shard
rpc: rpc,
allocations: allocs,
pinOptions: opts,
ba: adder.NewBlockAdder(rpc, allocs),
dagNode: make(map[string]cid.Cid),
currentSize: 0,
sizeLimit: opts.ShardSize,
@@ -82,7 +84,7 @@ func (sh *shard) Flush(ctx context.Context, shardN int, prev cid.Cid) (cid.Cid,
return cid.Undef, err
}

err = putDAG(ctx, sh.rpc, nodes, sh.allocations)
err = sh.ba.AddMany(ctx, nodes)
if err != nil {
return cid.Undef, err
}
