Skip to content

Commit

Permalink
fix: change the batch size to avoid buffering too much
Browse files Browse the repository at this point in the history
Otherwise, the max outstanding blocks/size depends on the number of cores.
  • Loading branch information
Stebalien committed Mar 30, 2020
1 parent 2c2e0d1 commit 80a00f4
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 10 deletions.
18 changes: 12 additions & 6 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import (
cid "github.com/ipfs/go-cid"
)

// ParallelBatchCommits is the number of batch commits that can be in-flight before blocking.
// parallelCommits is the number of batch commits that can be in-flight before blocking.
// TODO(ipfs/go-ipfs#4299): Experiment with multiple datastores, storage
// devices, and CPUs to find the right value/formula.
var ParallelBatchCommits = runtime.NumCPU() * 2
var parallelCommits = runtime.NumCPU()

// ErrNotCommited is returned when closing a batch that hasn't been successfully
// committed.
Expand All @@ -32,11 +32,15 @@ func NewBatch(ctx context.Context, na NodeAdder, opts ...BatchOption) *Batch {
for _, o := range opts {
o(&bopts)
}

// Commit numCPU batches at once, but split the maximum buffer size over all commits in flight.
bopts.maxSize /= parallelCommits
bopts.maxNodes /= parallelCommits
return &Batch{
na: na,
ctx: ctx,
cancel: cancel,
commitResults: make(chan error, ParallelBatchCommits),
commitResults: make(chan error, parallelCommits),
opts: bopts,
}
}
Expand Down Expand Up @@ -78,7 +82,7 @@ func (t *Batch) asyncCommit() {
if numBlocks == 0 {
return
}
if t.activeCommits >= ParallelBatchCommits {
if t.activeCommits >= parallelCommits {
select {
case err := <-t.commitResults:
t.activeCommits--
Expand Down Expand Up @@ -206,14 +210,16 @@ var defaultBatchOptions = batchOptions{
maxNodes: 128,
}

// MaxSizeBatchOption sets the maximum size of a Batch.
// MaxSizeBatchOption sets the maximum amount of buffered data before writing
// blocks.
func MaxSizeBatchOption(size int) BatchOption {
	// Build the setter as a named closure; it captures size and writes it
	// into whichever batchOptions value it is later applied to.
	setMaxSize := func(cfg *batchOptions) {
		cfg.maxSize = size
	}
	return setMaxSize
}

// MaxNodesBatchOption sets the maximum number of nodes in a Batch.
// MaxNodesBatchOption sets the maximum number of buffered nodes before writing
// blocks.
func MaxNodesBatchOption(num int) BatchOption {
return func(o *batchOptions) {
o.maxNodes = num
Expand Down
4 changes: 2 additions & 2 deletions batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ func TestBatchOptions(t *testing.T) {
wantMaxNodes := 500
d := newTestDag()
b := NewBatch(ctx, d, MaxSizeBatchOption(wantMaxSize), MaxNodesBatchOption(wantMaxNodes))
if b.opts.maxSize != wantMaxSize {
if b.opts.maxSize != wantMaxSize/parallelCommits {
t.Fatalf("maxSize incorrect, want: %d, got: %d", wantMaxSize, b.opts.maxSize)
}
if b.opts.maxNodes != wantMaxNodes {
if b.opts.maxNodes != wantMaxNodes/parallelCommits {
t.Fatalf("maxNodes incorrect, want: %d, got: %d", wantMaxNodes, b.opts.maxNodes)
}
}
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
module github.com/ipfs/go-ipld-format

go 1.14

require (
github.com/ipfs/go-block-format v0.0.2
github.com/ipfs/go-cid v0.0.2
github.com/libp2p/go-buffer-pool v0.0.2 // indirect
github.com/multiformats/go-multihash v0.0.1
)

go 1.13
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ github.com/ipfs/go-cid v0.0.2 h1:tuuKaZPU1M6HcejsO3AcYWW8sZ8MTvyxfc4uqB4eFE8=
github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-util v0.0.1 h1:Wz9bL2wB2YBJqggkA4dD7oSmqB4cAnpNbGrlHJulv50=
github.com/ipfs/go-ipfs-util v0.0.1/go.mod h1:spsl5z8KUnrve+73pOhSVZND1SIxPW5RyBCNzQxlJBc=
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
Expand Down

0 comments on commit 80a00f4

Please sign in to comment.