Skip to content

Commit

Permalink
add global config switch for sharding
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
  • Loading branch information
whyrusleeping committed Mar 23, 2017
1 parent e876434 commit 774715f
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 16 deletions.
4 changes: 4 additions & 0 deletions core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
cfg "github.com/ipfs/go-ipfs/repo/config"
uio "github.com/ipfs/go-ipfs/unixfs/io"

ci "gx/ipfs/QmPGxZ1DP2w45WcogpW1h43BvseXbfke9N91qotpoQcUeS/go-libp2p-crypto"
ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
Expand Down Expand Up @@ -175,6 +176,9 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
return err
}

// TEMP: setting global sharding switch here
uio.UseHAMTSharding = conf.Experimental.ShardingEnabled

opts.HasBloomFilterSize = conf.Datastore.BloomFilterSize
if !cfg.Permament {
opts.HasBloomFilterSize = 0
Expand Down
17 changes: 15 additions & 2 deletions core/commands/ls.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,24 @@ The JSON output contains type information.

output := make([]LsObject, len(req.Arguments()))
for i, dagnode := range dagnodes {
dir, err := uio.NewDirectoryFromNode(nd.DAG, dagnode)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

links, err := dir.Links()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

output[i] = LsObject{
Hash: paths[i],
Links: make([]LsLink, len(dagnode.Links())),
Links: make([]LsLink, len(links)),
}
for j, link := range dagnode.Links() {

for j, link := range links {
t := unixfspb.Data_DataType(-1)

linkNode, err := link.GetNode(req.Context(), dserv)
Expand Down
9 changes: 6 additions & 3 deletions core/coreunix/add.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,15 +190,18 @@ func (adder *Adder) PinRoot() error {
func (adder *Adder) Finalize() (node.Node, error) {
root := adder.mr.GetValue()

// cant just call adder.RootNode() here as we need the name for printing
rootNode, err := root.GetNode()
err := root.Flush()
if err != nil {
return nil, err
}

var name string
if !adder.Wrap {
name = rootNode.Links()[0].Name
children, err := root.(*mfs.Directory).ListNames()
if err != nil {
return nil, err
}
name = children[0]

dir, ok := adder.mr.GetValue().(*mfs.Directory)
if !ok {
Expand Down
9 changes: 3 additions & 6 deletions mfs/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewDirectory(ctx context.Context, name string, node node.Node, parent child

// closeChild updates the child by the given name to the dag node 'nd'
// and changes its own dag node
func (d *Directory) closeChild(name string, nd *dag.ProtoNode, sync bool) error {
func (d *Directory) closeChild(name string, nd node.Node, sync bool) error {
mynd, err := d.closeChildUpdate(name, nd, sync)
if err != nil {
return err
Expand All @@ -72,7 +72,7 @@ func (d *Directory) closeChild(name string, nd *dag.ProtoNode, sync bool) error
}

// closeChildUpdate is the portion of closeChild that needs to be locked around
func (d *Directory) closeChildUpdate(name string, nd *dag.ProtoNode, sync bool) (*dag.ProtoNode, error) {
func (d *Directory) closeChildUpdate(name string, nd node.Node, sync bool) (*dag.ProtoNode, error) {
d.lock.Lock()
defer d.lock.Unlock()

Expand Down Expand Up @@ -329,13 +329,10 @@ func (d *Directory) Unlink(name string) error {
}

func (d *Directory) Flush() error {
d.lock.Lock()
nd, err := d.flushCurrentNode()
nd, err := d.GetNode()
if err != nil {
d.lock.Unlock()
return err
}
d.lock.Unlock()

return d.parent.closeChild(d.name, nd, true)
}
Expand Down
2 changes: 1 addition & 1 deletion mfs/mfs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,7 +752,7 @@ func TestMfsHugeDir(t *testing.T) {
defer cancel()
_, rt := setupRoot(ctx, t)

for i := 0; i < 100000; i++ {
for i := 0; i < 10000; i++ {
err := Mkdir(rt, fmt.Sprintf("/dir%d", i), false, false)
if err != nil {
t.Fatal(err)
Expand Down
4 changes: 2 additions & 2 deletions mfs/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ var log = logging.Logger("mfs")
var ErrIsDirectory = errors.New("error: is a directory")

type childCloser interface {
closeChild(string, *dag.ProtoNode, bool) error
closeChild(string, node.Node, bool) error
}

type NodeType int
Expand Down Expand Up @@ -124,7 +124,7 @@ func (kr *Root) Flush() error {

// closeChild implements the childCloser interface, and signals to the publisher that
// there are changes ready to be published
func (kr *Root) closeChild(name string, nd *dag.ProtoNode, sync bool) error {
func (kr *Root) closeChild(name string, nd node.Node, sync bool) error {
c, err := kr.dserv.Add(nd)
if err != nil {
return err
Expand Down
1 change: 1 addition & 0 deletions repo/config/experiments.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ package config

type Experiments struct {
FilestoreEnabled bool
ShardingEnabled bool
}
58 changes: 58 additions & 0 deletions test/sharness/t0260-sharding-flag.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
#!/bin/sh
#
# Copyright (c) 2014 Christian Couder
# MIT Licensed; see the LICENSE file in this repository.
#

test_description="Test global enable sharding flag"

. lib/test-lib.sh

test_expect_success "set up test data" '
mkdir testdata
for i in `seq 2000`
do
echo $i > testdata/file$i
done
'

test_add_large_dir() {
exphash="$1"
test_expect_success "ipfs add on very large directory succeeds" '
ipfs add -r -q testdata | tail -n1 > sharddir_out &&
echo "$exphash" > sharddir_exp &&
test_cmp sharddir_exp sharddir_out
'
}

test_init_ipfs

UNSHARDED="QmavrTrQG4VhoJmantURAYuw3bowq3E2WcvP36NRQDAC1N"
test_add_large_dir "$UNSHARDED"

test_launch_ipfs_daemon

test_add_large_dir "$UNSHARDED"

test_kill_ipfs_daemon

test_expect_success "enable sharding" '
ipfs config --json Experimental.ShardingEnabled true
'

SHARDED="QmSCJD1KYLhVVHqBK3YyXuoEqHt7vggyJhzoFYbT8v1XYL"
test_add_large_dir "$SHARDED"

test_launch_ipfs_daemon

test_add_large_dir "$SHARDED"

test_kill_ipfs_daemon

test_expect_success "sharded and unsharded output look the same" '
ipfs ls "$SHARDED" | sort > sharded_out &&
ipfs ls "$UNSHARDED" | sort > unsharded_out &&
test_cmp sharded_out unsharded_out
'

test_done
16 changes: 14 additions & 2 deletions unixfs/io/dirbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ import (
// result in the node being restructured into a sharded object.
var ShardSplitThreshold = 1000

// UseHAMTSharding is a global flag that signifies whether or not to use the
// HAMT sharding scheme for directory creation
var UseHAMTSharding = false

// DefaultShardWidth is the default value used for hamt sharding width.
var DefaultShardWidth = 256

Expand All @@ -31,7 +35,15 @@ type Directory struct {
func NewDirectory(dserv mdag.DAGService) *Directory {
db := new(Directory)
db.dserv = dserv
db.dirnode = format.EmptyDirNode()
if UseHAMTSharding {
s, err := hamt.NewHamtShard(dserv, DefaultShardWidth)
if err != nil {
panic(err) // will only panic if DefaultShardWidth is a bad value
}
db.shard = s
} else {
db.dirnode = format.EmptyDirNode()
}
return db
}

Expand Down Expand Up @@ -70,7 +82,7 @@ func NewDirectoryFromNode(dserv mdag.DAGService, nd node.Node) (*Directory, erro
// AddChild adds a (name, key)-pair to the root node.
func (d *Directory) AddChild(ctx context.Context, name string, nd node.Node) error {
if d.shard == nil {
if len(d.dirnode.Links()) < ShardSplitThreshold {
if !UseHAMTSharding {
_ = d.dirnode.RemoveNodeLink(name)
return d.dirnode.AddNodeLinkClean(name, nd)
}
Expand Down

0 comments on commit 774715f

Please sign in to comment.