From 4946aed228a795001767122ab3a6ad4a83390806 Mon Sep 17 00:00:00 2001
From: Hector Sanjuan
Date: Fri, 7 Dec 2018 11:22:53 +0100
Subject: [PATCH] Sharding: Add some tests for sharding codepaths

License: MIT
Signed-off-by: Hector Sanjuan
---
 add_test.go                   | 155 ++++++++++++++++++++++++++++++++++
 adder/sharding/dag_service.go |  13 +--
 api/types.go                  |   3 +-
 test/sharding.go              |  40 +++++++--
 4 files changed, 200 insertions(+), 11 deletions(-)

diff --git a/add_test.go b/add_test.go
index 39002e824..5d9373c15 100644
--- a/add_test.go
+++ b/add_test.go
@@ -6,6 +6,7 @@ import (
 	"mime/multipart"
 	"testing"
 
+	"github.com/ipfs/ipfs-cluster/adder/sharding"
 	"github.com/ipfs/ipfs-cluster/api"
 	"github.com/ipfs/ipfs-cluster/test"
 )
@@ -44,9 +45,163 @@ func TestAdd(t *testing.T) {
 		}
 
 		runF(t, clusters, f)
+
+		// for the next test, this needs to be unpinned.
+		err = clusters[0].Unpin(ci)
+		if err != nil {
+			t.Fatal(err)
+		}
+		pinDelay()
+	})
+
+	t.Run("shard", func(t *testing.T) {
+		params := api.DefaultAddParams()
+		params.ShardSize = 1024 * 300 // 300kB
+		params.Shard = true
+		params.Name = "testsharding"
+		// replication factor is -1, which doesn't make much sense
+		// but allows checking things more easily.
+		mfr, closer := sth.GetTreeMultiReader(t)
+		defer closer.Close()
+		r := multipart.NewReader(mfr, mfr.Boundary())
+		ci, err := clusters[0].AddFile(r, params)
+		if err != nil {
+			t.Fatal(err)
+		}
+		if ci.String() != test.ShardingDirBalancedRootCID {
+			t.Fatal("unexpected root CID for sharding add")
+		}
+
+		pinDelay()
+
+		// shardBlocks ignored, as tested in /sharding
+		_, err = sharding.VerifyShards(t, ci, clusters[0], clusters[0].ipfs, 14)
+		if err != nil {
+			t.Fatal(err)
+		}
+		err = clusters[0].Unpin(ci)
+		pinDelay()
+		f := func(t *testing.T, c *Cluster) {
+			pins := c.Pins()
+			if len(pins) > 0 {
+				t.Error("should have removed all pins from the state")
+			}
+		}
+		runF(t, clusters, f)
 	})
 }
 
+func TestShardingRandFile(t *testing.T) {
+	clusters, mock := createClusters(t)
+	defer shutdownClusters(t, clusters, mock)
+	sth := test.NewShardingTestHelper()
+	defer sth.Clean(t)
+
+	params := api.DefaultAddParams()
+	params.ShardSize = 1024 * 1024 * 5 // 5 MB
+	params.Name = "testingFile"
+	params.Shard = true
+
+	// Add a random 50MB file. Note size in kbs below.
+	mr, closer := sth.GetRandFileMultiReader(t, 1024*50) // 50 MB
+	defer closer.Close()
+	r := multipart.NewReader(mr, mr.Boundary())
+
+	ci, err := clusters[0].AddFile(r, params)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	pinDelay()
+
+	// shardBlocks ignored, as tested in /sharding
+	// 11 shards because, due to chunking and extra nodes,
+	// things don't fit perfectly in 10*5MB shards.
+	_, err = sharding.VerifyShards(t, ci, clusters[0], clusters[0].ipfs, 11)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = clusters[0].Unpin(ci)
+	pinDelay()
+	f := func(t *testing.T, c *Cluster) {
+		pins := c.Pins()
+		if len(pins) > 0 {
+			t.Error("should have removed all pins from the state")
+		}
+	}
+	runF(t, clusters, f)
+}
+
+func TestShardingUnpin(t *testing.T) {
+	countShards := func(pins []api.Pin) int {
+		n := 0
+		for _, p := range pins {
+			if p.Type == api.ShardType {
+				n++
+			}
+		}
+		return n
+	}
+
+	clusters, mock := createClusters(t)
+	defer shutdownClusters(t, clusters, mock)
+	sth := test.NewShardingTestHelper()
+	defer sth.Clean(t)
+
+	params := api.DefaultAddParams()
+	params.ShardSize = 1024 * 500 // 500 KB
+	params.Name = "testingFile"
+	params.Shard = true
+
+	// Add a random file
+	mr, closer := sth.GetRandFileMultiReader(t, 1024*50) // 50 MB
+	defer closer.Close()
+	r := multipart.NewReader(mr, mr.Boundary())
+
+	ci, err := clusters[0].AddFile(r, params)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Log("file1: ", ci)
+	shards1 := countShards(clusters[0].Pins())
+
+	pinDelay()
+
+	// Add the same file again, but only the first half
+	params.Name = "testingFile2"
+	mr2, closer2 := sth.GetRandFileMultiReader(t, 1024*25) // half the size
+	defer closer2.Close()
+	r2 := multipart.NewReader(mr2, mr2.Boundary())
+	ci2, err := clusters[0].AddFile(r2, params)
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Log("file2: ", ci2)
+	pinDelay()
+
+	shards2 := countShards(clusters[0].Pins())
+	if shards2 != shards1+1 {
+		// The last shard is different because the file
+		// was cut sooner in it.
+		t.Error("There should be only one new shard")
+	}
+
+	// Unpin the first file:
+	// The shards from the second should stay pinned.
+	err = clusters[0].Unpin(ci)
+	if err != nil {
+		t.Fatal(err)
+	}
+	pinDelay()
+
+	shards3 := countShards(clusters[0].Pins())
+
+	t.Logf("shards1: %d. 2: %d. 3: %d", shards1, shards2, shards3)
+	if shards3 != shards1/2 {
+		t.Error("half of the shards should still be pinned")
+	}
+}
+
 func TestAddPeerDown(t *testing.T) {
 	clusters, mock := createClusters(t)
 	defer shutdownClusters(t, clusters, mock)
diff --git a/adder/sharding/dag_service.go b/adder/sharding/dag_service.go
index d2c3d5ea7..96a54f0f0 100644
--- a/adder/sharding/dag_service.go
+++ b/adder/sharding/dag_service.go
@@ -7,7 +7,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"time"
 
 	"github.com/ipfs/ipfs-cluster/adder"
@@ -92,9 +91,13 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
 		return lastCid, err
 	}
 
-	if !lastCid.Equals(dataRoot) {
-		logger.Warningf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot)
-	}
+	// The IPFS Adder leaks the CID wrap. It's added to the DAGService,
+	// but the first child is returned when not wrapping.
+	// Thus, we comment out this warning, but leave it as an explanation of why
+	// lastCid and dataRoot differ, even when adding a directory.
+	// if !lastCid.Equals(dataRoot) {
+	// 	logger.Warningf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot)
+	// }
 
 	clusterDAGNodes, err := makeDAG(ctx, dgs.shards)
 	if err != nil {
@@ -191,7 +194,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n *api.NodeWithMeta) err
 		return adder.PutBlock(ctx, dgs.rpcClient, n, shard.Allocations())
 	}
 
limit: %d", + logger.Infof("shard #%d full: block: %d. shard: %d. limit: %d", len(dgs.shards), n.Size(), shard.Size(), diff --git a/api/types.go b/api/types.go index 23ce15937..1fa3c1c2c 100644 --- a/api/types.go +++ b/api/types.go @@ -598,7 +598,8 @@ const ( // which has been sharded. // They carry a Reference to the previous shard. // ShardTypes are pinned with MaxDepth=1 (root and - // direct children only). + // direct children only), or 2, if there were too many children + // for the shard and it's multileveled. ShardType ) diff --git a/test/sharding.go b/test/sharding.go index b6de9af1b..761e8aa79 100644 --- a/test/sharding.go +++ b/test/sharding.go @@ -22,8 +22,8 @@ var ( ShardingDirTrickleRootCID = "QmYMbx56GFNBDAaAMchtjmWjDTdqNKCSGuFxtRosiPgJL6" // These hashes should match all the blocks produced when adding - // the files resulting from GetShardingDir* - // They have been obtained by adding the "shardTesting" folder + // the files resulting from shardingTestDir/shadringTestTree folder. + // They have been obtained by adding the that folder // to go-ipfs (with wrap=true and default parameters). Then doing // `refs -r` on the result. It contains the wrapping folder hash. ShardingDirCids = [29]string{ @@ -112,12 +112,33 @@ func (sth *ShardingTestHelper) GetTreeSerialFile(t *testing.T) files.File { return sf } +type limReaderCloser struct { + io.Reader + io.Closer +} + // GetRandFileMultiReader creates and returns a MultiFileReader for // a testing random file of the given size (in kbs). The random -// file is different every time. +// file is created once. Subsequent calls will read the same test file +// up to the given kbs. func (sth *ShardingTestHelper) GetRandFileMultiReader(t *testing.T, kbs int) (*files.MultiFileReader, io.Closer) { - st := sth.makeRandFile(t, kbs) - sf, err := files.NewSerialFile("randomfile", sth.path(shardingTestFile), false, st) + var sf files.File + var err error + path := sth.path(shardingTestFile) + + if !fileExists(t, path) { + st := sth.makeRandFile(t, kbs) + sf, err = files.NewSerialFile("randomfile", path, false, st) + } else { + f, err2 := os.Open(path) + if err2 != nil { + t.Fatal(err) + } + + limReader := io.LimitReader(f, int64(kbs*1024)) + sf, err = files.NewReaderPathFile("randomfile", path, limReaderCloser{limReader, f}, nil) + } + if err != nil { t.Fatal(err) } @@ -144,6 +165,15 @@ func folderExists(t *testing.T, path string) bool { return true } +func fileExists(t *testing.T, path string) bool { + if _, err := os.Stat(path); os.IsNotExist(err) { + return false + } else if err != nil { + t.Fatal(err) + } + return true +} + func makeDir(t *testing.T, path string) { if !folderExists(t, path) { err := os.MkdirAll(path, os.ModePerm)