/
cyclic_batch.go
78 lines (69 loc) · 1.87 KB
/
cyclic_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package pstoreds
import (
"context"
"errors"
"fmt"
ds "github.com/ipfs/go-datastore"
)
// how many operations are queued in a cyclic batch before we flush it.
var defaultOpsPerCyclicBatch = 20
// cyclicBatch buffers ds write operations and automatically flushes them after defaultOpsPerCyclicBatch (20) have been
// queued. An explicit `Commit()` closes this cyclic batch, erroring all further operations.
//
// It is similar to go-ds autobatch, but it's driven by an actual Batch facility offered by the
// ds.
type cyclicBatch struct {
threshold int
ds.Batch
ds ds.Batching
pending int
}
func newCyclicBatch(ds ds.Batching, threshold int) (ds.Batch, error) {
batch, err := ds.Batch(context.TODO())
if err != nil {
return nil, err
}
return &cyclicBatch{Batch: batch, ds: ds}, nil
}
func (cb *cyclicBatch) cycle() (err error) {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if cb.pending < cb.threshold {
// we haven't reached the threshold yet.
return nil
}
// commit and renew the batch.
if err = cb.Batch.Commit(context.TODO()); err != nil {
return fmt.Errorf("failed while committing cyclic batch: %w", err)
}
if cb.Batch, err = cb.ds.Batch(context.TODO()); err != nil {
return fmt.Errorf("failed while renewing cyclic batch: %w", err)
}
return nil
}
func (cb *cyclicBatch) Put(ctx context.Context, key ds.Key, val []byte) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Put(ctx, key, val)
}
func (cb *cyclicBatch) Delete(ctx context.Context, key ds.Key) error {
if err := cb.cycle(); err != nil {
return err
}
cb.pending++
return cb.Batch.Delete(ctx, key)
}
func (cb *cyclicBatch) Commit(ctx context.Context) error {
if cb.Batch == nil {
return errors.New("cyclic batch is closed")
}
if err := cb.Batch.Commit(ctx); err != nil {
return err
}
cb.pending = 0
cb.Batch = nil
return nil
}