Sharding: Add some tests for sharding codepaths #615

Open
wants to merge 1 commit into base: master
155 changes: 155 additions & 0 deletions add_test.go
@@ -6,6 +6,7 @@ import (
"mime/multipart"
"testing"

"github.com/ipfs/ipfs-cluster/adder/sharding"
"github.com/ipfs/ipfs-cluster/api"
"github.com/ipfs/ipfs-cluster/test"
)
@@ -44,9 +45,163 @@ func TestAdd(t *testing.T) {
}

runF(t, clusters, f)

// For the next test, this needs to be unpinned.
err = clusters[0].Unpin(ci)
if err != nil {
t.Fatal(err)
}
pinDelay()
})

t.Run("shard", func(t *testing.T) {
Contributor comment:
It would be great if each of these tests could have a bit more explanation as to what they are testing as the testcase is hard to grok from the code.

params := api.DefaultAddParams()
params.ShardSize = 1024 * 300 // 300kB
params.Shard = true
params.Name = "testsharding"
// replication factor is -1, which doesn't make sense
// but allows checking things more easily.
mfr, closer := sth.GetTreeMultiReader(t)
defer closer.Close()
r := multipart.NewReader(mfr, mfr.Boundary())
ci, err := clusters[0].AddFile(r, params)
if err != nil {
t.Fatal(err)
}
if ci.String() != test.ShardingDirBalancedRootCID {
t.Fatal("unexpected root CID for sharding add")
}

pinDelay()

// shardBlocks ignored, as tested in /sharding
_, err = sharding.VerifyShards(t, ci, clusters[0], clusters[0].ipfs, 14)
if err != nil {
t.Fatal(err)
}
err = clusters[0].Unpin(ci)
pinDelay()
f := func(t *testing.T, c *Cluster) {
pins := c.Pins()
if len(pins) > 0 {
t.Error("should have removed all pins from the state")
}
}
runF(t, clusters, f)
})
}

func TestShardingRandFile(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

params := api.DefaultAddParams()
params.ShardSize = 1024 * 1024 * 5 // 5 MB
params.Name = "testingFile"
params.Shard = true

// Add a random 50MB file. Note size in kbs below.
mr, closer := sth.GetRandFileMultiReader(t, 1024*50) // 50 MB
defer closer.Close()
r := multipart.NewReader(mr, mr.Boundary())

ci, err := clusters[0].AddFile(r, params)
if err != nil {
t.Fatal(err)
}

pinDelay()

// shardBlocks ignored, as tested in /sharding
// 11 shards: due to chunking and extra nodes, things
// don't fit perfectly into 10 shards of 5MB each.
_, err = sharding.VerifyShards(t, ci, clusters[0], clusters[0].ipfs, 11)
if err != nil {
t.Fatal(err)
}
err = clusters[0].Unpin(ci)
pinDelay()
f := func(t *testing.T, c *Cluster) {
pins := c.Pins()
if len(pins) > 0 {
t.Error("should have removed all pins from the state")
}
}
runF(t, clusters, f)
}
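
Aside: a minimal, standalone sketch of the arithmetic behind the 11-shard expectation in TestShardingRandFile above. The ~1% chunking/DAG overhead figure is an assumption chosen for illustration, not a number taken from ipfs-cluster; the point is only that any overhead on a 50MB file pushes the total past ten 5MB shards.

package main

import (
	"fmt"
	"math"
)

func main() {
	const (
		fileSize  = 50 * 1024 * 1024 // the 50 MB random test file
		shardSize = 5 * 1024 * 1024  // params.ShardSize in the test
	)
	// Chunking splits the file into blocks and adds intermediate DAG
	// nodes, so the serialized data packed into shards is slightly
	// larger than the raw file. Assume ~1% overhead for illustration.
	const overhead = 0.01
	total := float64(fileSize) * (1 + overhead)
	shards := int(math.Ceil(total / shardSize))
	fmt.Println(shards) // 11: the overhead spills into an extra shard
}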

func TestShardingUnpin(t *testing.T) {
countShards := func(pins []api.Pin) int {
n := 0
for _, p := range pins {
if p.Type == api.ShardType {
n++
}
}
return n
}

clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
sth := test.NewShardingTestHelper()
defer sth.Clean(t)

params := api.DefaultAddParams()
params.ShardSize = 1024 * 500 // 500 KB
params.Name = "testingFile"
params.Shard = true

// Add a random file
mr, closer := sth.GetRandFileMultiReader(t, 1024*50) // 50 MB
defer closer.Close()
r := multipart.NewReader(mr, mr.Boundary())

ci, err := clusters[0].AddFile(r, params)
if err != nil {
t.Fatal(err)
}
t.Log("file1: ", ci)
shards1 := countShards(clusters[0].Pins())

pinDelay()

// Add the same file again, except only the first half
params.Name = "testingFile2"
mr2, closer2 := sth.GetRandFileMultiReader(t, 1024*25) // half the size
defer closer2.Close()
r2 := multipart.NewReader(mr2, mr2.Boundary())
ci2, err := clusters[0].AddFile(r2, params)
if err != nil {
t.Fatal(err)
}
t.Log("file2: ", ci2)
pinDelay()

shards2 := countShards(clusters[0].Pins())
if shards2 != shards1+1 {
// The last shard is different because the file
// is cut short in it.
t.Error("There should be only one new shard")
}

// Unpin the first file:
// The shards from the second should stay pinned.
err = clusters[0].Unpin(ci)
if err != nil {
t.Fatal(err)
}
pinDelay()

shards3 := countShards(clusters[0].Pins())

t.Logf("shards1: %d. 2: %d. 3: %d", shards1, shards2, shards3)
if shards3 != shards1/2 {
t.Error("half of the shards should still be pinned")
}
}
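
Aside: a self-contained sketch of the dedup reasoning behind TestShardingUnpin above. Re-adding only the first half of the same random file reproduces every shard except the last one, which is cut short, so exactly one new shard appears; after unpinning the first file, the shards covering the first half remain pinned through the second file. The shard size and data below are arbitrary and do not use the real sharding code.

package main

import (
	"bytes"
	"fmt"
)

// split cuts data into consecutive shards of at most size bytes.
func split(data []byte, size int) [][]byte {
	var shards [][]byte
	for len(data) > 0 {
		n := size
		if len(data) < n {
			n = len(data)
		}
		shards = append(shards, data[:n])
		data = data[n:]
	}
	return shards
}

func main() {
	data := bytes.Repeat([]byte("x"), 1000) // stand-in for the random test file
	full := split(data, 64)                 // "file1": the whole file
	half := split(data[:500], 64)           // "file2": the first half only

	newShards := 0
	for i, s := range half {
		if i < len(full) && bytes.Equal(s, full[i]) {
			continue // byte-identical shard: deduplicated
		}
		newShards++
	}
	// Only the truncated last shard of the half-file is new, which is
	// why the test expects shards2 == shards1+1.
	fmt.Println(newShards) // 1
}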

func TestAddPeerDown(t *testing.T) {
clusters, mock := createClusters(t)
defer shutdownClusters(t, clusters, mock)
13 changes: 8 additions & 5 deletions adder/sharding/dag_service.go
@@ -7,7 +7,6 @@ import (
"context"
"errors"
"fmt"

"time"

"github.com/ipfs/ipfs-cluster/adder"
@@ -92,9 +91,13 @@ func (dgs *DAGService) Finalize(ctx context.Context, dataRoot cid.Cid) (cid.Cid,
return lastCid, err
}

if !lastCid.Equals(dataRoot) {
logger.Warningf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot)
}
// The IPFS Adder leaks the CID wrap. It's added to the DAGService,
// but the first child is returned when not wrapping.
// Thus, we comment out this warning, but leave it as an explanation of
// why lastCid and dataRoot are different, also when adding a directory.
// if !lastCid.Equals(dataRoot) {
// logger.Warningf("the last added CID (%s) is not the IPFS data root (%s). This is only normal when adding a single file without wrapping in directory.", lastCid, dataRoot)
// }

clusterDAGNodes, err := makeDAG(ctx, dgs.shards)
if err != nil {
@@ -191,7 +194,7 @@ func (dgs *DAGService) ingestBlock(ctx context.Context, n *api.NodeWithMeta) err
return adder.PutBlock(ctx, dgs.rpcClient, n, shard.Allocations())
}

logger.Debugf("shard %d full: block: %d. shard: %d. limit: %d",
logger.Infof("shard #%d full: block: %d. shard: %d. limit: %d",
len(dgs.shards),
n.Size(),
shard.Size(),
3 changes: 2 additions & 1 deletion api/types.go
@@ -598,7 +598,8 @@ const (
// which has been sharded.
// They carry a Reference to the previous shard.
// ShardTypes are pinned with MaxDepth=1 (root and
// direct children only).
// direct children only), or 2 if there were too many children
// for the shard and it is multi-level.
ShardType
)
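
Aside: a toy model of the shard-pin chain described in the comment above, where each shard pin carries a Reference to the previous shard and is pinned with a MaxDepth of 1 (or 2 when multi-level). The struct and field names below are illustrative assumptions and do not reproduce the actual api.Pin type.

package main

import "fmt"

// shardPin is a minimal stand-in for a ShardType pin as described above.
type shardPin struct {
	CID       string
	MaxDepth  int    // 1 (root and direct children) or 2 for multi-level shards
	Reference string // CID of the previous shard; empty for the first shard
}

func main() {
	pins := map[string]shardPin{
		"shard0": {CID: "shard0", MaxDepth: 1},
		"shard1": {CID: "shard1", MaxDepth: 1, Reference: "shard0"},
		"shard2": {CID: "shard2", MaxDepth: 2, Reference: "shard1"}, // too many children: multi-level
	}

	// Walk the chain backwards from the last shard via the Reference links.
	for cur := "shard2"; cur != ""; cur = pins[cur].Reference {
		fmt.Println(cur, "maxdepth:", pins[cur].MaxDepth)
	}
}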

40 changes: 35 additions & 5 deletions test/sharding.go
@@ -22,8 +22,8 @@ var (
ShardingDirTrickleRootCID = "QmYMbx56GFNBDAaAMchtjmWjDTdqNKCSGuFxtRosiPgJL6"

// These hashes should match all the blocks produced when adding
// the files resulting from GetShardingDir*
// They have been obtained by adding the "shardTesting" folder
// the files resulting from the shardingTestDir/shardingTestTree folder.
// They have been obtained by adding that folder
Contributor comment, quoting the original "the that folder": ???
// to go-ipfs (with wrap=true and default parameters). Then doing
// `refs -r` on the result. It contains the wrapping folder hash.
ShardingDirCids = [29]string{
@@ -112,12 +112,33 @@ func (sth *ShardingTestHelper) GetTreeSerialFile(t *testing.T) files.File {
return sf
}

type limReaderCloser struct {
io.Reader
io.Closer
}
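
Aside: limReaderCloser above uses Go struct embedding to combine an io.LimitReader with the underlying file's Close method into a single io.ReadCloser. A minimal, standalone sketch of the same pattern, independent of the test helper:

package main

import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"strings"
)

// limReadCloser mirrors the pattern: the embedded Reader supplies Read,
// the embedded Closer supplies Close, so the struct satisfies
// io.ReadCloser without defining any methods of its own.
type limReadCloser struct {
	io.Reader
	io.Closer
}

func main() {
	f, err := ioutil.TempFile("", "lim")
	if err != nil {
		panic(err)
	}
	defer os.Remove(f.Name())
	io.Copy(f, strings.NewReader("0123456789"))
	f.Seek(0, io.SeekStart)

	// Read at most 4 bytes while keeping the ability to close the file.
	var rc io.ReadCloser = limReadCloser{io.LimitReader(f, 4), f}
	defer rc.Close()

	b, _ := ioutil.ReadAll(rc)
	fmt.Printf("%s\n", b) // "0123"
}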

// GetRandFileMultiReader creates and returns a MultiFileReader for
// a testing random file of the given size (in kbs). The random
// file is created only once; subsequent calls read the same test file
// up to the given number of kbs.
func (sth *ShardingTestHelper) GetRandFileMultiReader(t *testing.T, kbs int) (*files.MultiFileReader, io.Closer) {
st := sth.makeRandFile(t, kbs)
sf, err := files.NewSerialFile("randomfile", sth.path(shardingTestFile), false, st)
var sf files.File
var err error
path := sth.path(shardingTestFile)

if !fileExists(t, path) {
st := sth.makeRandFile(t, kbs)
sf, err = files.NewSerialFile("randomfile", path, false, st)
} else {
f, err2 := os.Open(path)
if err2 != nil {
t.Fatal(err2)
}

limReader := io.LimitReader(f, int64(kbs*1024))
sf, err = files.NewReaderPathFile("randomfile", path, limReaderCloser{limReader, f}, nil)
}

if err != nil {
t.Fatal(err)
}
@@ -144,6 +165,15 @@ func folderExists(t *testing.T, path string) bool {
return true
}

func fileExists(t *testing.T, path string) bool {
if _, err := os.Stat(path); os.IsNotExist(err) {
return false
} else if err != nil {
t.Fatal(err)
}
return true
}

func makeDir(t *testing.T, path string) {
if !folderExists(t, path) {
err := os.MkdirAll(path, os.ModePerm)