forked from etcd-io/etcd
-
Notifications
You must be signed in to change notification settings - Fork 0
/
batch_tx.go
135 lines (118 loc) · 3.04 KB
/
batch_tx.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package backend
import (
"bytes"
"log"
"sync"
"github.com/coreos/etcd/Godeps/_workspace/src/github.com/boltdb/bolt"
)
// BatchTx is the set of operations offered by a batched backend
// transaction. In the batchTx implementation in this file the Unsafe*
// methods take no lock themselves, so callers bracket them with Lock and
// Unlock.
type BatchTx interface {
	// Lock and Unlock guard access to the underlying transaction.
	Lock()
	Unlock()

	// UnsafeCreateBucket creates the bucket called name.
	UnsafeCreateBucket(name []byte)
	// UnsafePut stores value under key in the bucket called bucketName.
	UnsafePut(bucketName, key, value []byte)
	// UnsafeRange returns the keys and values found in the bucket called
	// bucketName for key, or for the range starting at key and ending
	// before endKey when endKey is non-empty. limit bounds the number of
	// pairs returned for a range.
	UnsafeRange(bucketName, key, endKey []byte, limit int64) (keys, vals [][]byte)
	// UnsafeDelete removes key from the bucket called bucketName.
	UnsafeDelete(bucketName, key []byte)

	// Commit commits the previous tx and begins a new writable one.
	Commit()
	// CommitAndStop commits the previous tx and does not create a new one.
	CommitAndStop()
}
// batchTx implements BatchTx on top of a bolt transaction that is
// committed and reopened as operations accumulate.
type batchTx struct {
	// The embedded mutex supplies the Lock and Unlock methods.
	sync.Mutex
	// tx is the current bolt transaction. It is nil until the first
	// commit call begins one.
	tx *bolt.Tx
	// backend provides the bolt db used to begin transactions and the
	// batchLimit threshold checked by UnsafePut and UnsafeDelete.
	backend *backend
	// pending counts the UnsafePut and UnsafeDelete calls made since those
	// methods last triggered a commit by reaching batchLimit.
	pending int
}
// newBatchTx returns a batchTx bound to backend. The initial Commit call
// finds no existing transaction to commit and simply begins the first
// writable one.
func newBatchTx(backend *backend) *batchTx {
	bt := new(batchTx)
	bt.backend = backend
	bt.Commit()
	return bt
}
// UnsafeCreateBucket creates the bucket called name in the current tx. A
// bucket that already exists is tolerated; any other error is fatal.
// NOTE(review): like the other Unsafe* methods this uses t.tx without
// locking, so presumably the caller must hold the lock on tx — confirm.
func (t *batchTx) UnsafeCreateBucket(name []byte) {
	switch _, err := t.tx.CreateBucket(name); err {
	case nil, bolt.ErrBucketExists:
		// Freshly created or already present: nothing more to do.
	default:
		log.Fatalf("storage: cannot create bucket %s (%v)", string(name), err)
	}
}
// UnsafePut writes value under key in the bucket called bucketName. A
// missing bucket or a failed put is fatal. Each call counts toward the
// backend's batchLimit; reaching it commits the tx and starts a new one.
// Before calling UnsafePut, the caller MUST hold the lock on tx.
func (t *batchTx) UnsafePut(bucketName []byte, key []byte, value []byte) {
	b := t.tx.Bucket(bucketName)
	if b == nil {
		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
	}

	err := b.Put(key, value)
	if err != nil {
		log.Fatalf("storage: cannot put key into bucket (%v)", err)
	}

	t.pending++
	if t.pending < t.backend.batchLimit {
		return
	}
	// Batch is full: flush it and start counting again.
	t.commit(false)
	t.pending = 0
}
// UnsafeRange reads from the bucket called bucketName; a missing bucket is
// fatal. With an empty endKey it looks up key alone and returns that single
// pair, or nothing if the lookup yields nil. Otherwise it walks the bucket's
// cursor from wherever Seek(key) lands, collecting pairs while the cursor
// key compares below endKey. A positive limit stops the walk once that many
// pairs have been collected; a limit of zero or less imposes no cap.
// Before calling UnsafeRange, the caller MUST hold the lock on tx.
// NOTE(review): the returned slices come straight from bolt — confirm how
// long callers may keep them once the tx is committed.
func (t *batchTx) UnsafeRange(bucketName []byte, key, endKey []byte, limit int64) (keys [][]byte, vs [][]byte) {
	b := t.tx.Bucket(bucketName)
	if b == nil {
		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
	}

	// Single-key lookup.
	if len(endKey) == 0 {
		v := b.Get(key)
		if v == nil {
			return keys, vs
		}
		return append(keys, key), append(vs, v)
	}

	// Range scan over [Seek(key), endKey).
	cur := b.Cursor()
	k, v := cur.Seek(key)
	for k != nil && bytes.Compare(k, endKey) < 0 {
		vs = append(vs, v)
		keys = append(keys, k)
		if limit > 0 && limit == int64(len(keys)) {
			break
		}
		k, v = cur.Next()
	}
	return keys, vs
}
// UnsafeDelete removes key from the bucket called bucketName. A missing
// bucket or a failed delete is fatal. Each call counts toward the backend's
// batchLimit; reaching it commits the tx and starts a new one.
// Before calling UnsafeDelete, the caller MUST hold the lock on tx.
func (t *batchTx) UnsafeDelete(bucketName []byte, key []byte) {
	b := t.tx.Bucket(bucketName)
	if b == nil {
		log.Fatalf("storage: bucket %s does not exist", string(bucketName))
	}

	if err := b.Delete(key); err != nil {
		log.Fatalf("storage: cannot delete key from bucket (%v)", err)
	}

	t.pending++
	if t.pending < t.backend.batchLimit {
		return
	}
	// Batch is full: flush it and start counting again.
	t.commit(false)
	t.pending = 0
}
// Commit commits the previous tx and begins a new writable one, holding
// the lock on tx for the duration.
func (t *batchTx) Commit() {
	t.Lock()
	defer t.Unlock()

	const stop = false // keep going: a fresh tx is opened after the commit
	t.commit(stop)
}
// CommitAndStop commits the previous tx and does not create a new one. The
// lock on tx is held for the duration.
func (t *batchTx) CommitAndStop() {
	t.Lock()
	defer t.Unlock()

	const stop = true // final commit: no replacement tx is begun
	t.commit(stop)
}
// commit commits the current bolt tx, if there is one, and resets the
// pending-operation counter. Unless stop is true it then begins a new
// writable tx and stores it in t.tx. Any bolt error is fatal.
//
// The counter is reset here so that every commit path — the batch-limit
// commits in UnsafePut/UnsafeDelete as well as Commit and CommitAndStop —
// starts the next batch from zero. Otherwise a commit through Commit would
// leave a stale count and the next batch would be flushed early.
//
// The caller must hold the lock on t.
func (t *batchTx) commit(stop bool) {
	var err error
	// commit the last tx; t.tx is still nil when newBatchTx makes the
	// first call.
	if t.tx != nil {
		err = t.tx.Commit()
		if err != nil {
			log.Fatalf("storage: cannot commit tx (%s)", err)
		}
		// Everything counted so far has just been committed.
		t.pending = 0
	}
	if stop {
		return
	}
	// begin a new tx
	t.tx, err = t.backend.db.Begin(true)
	if err != nil {
		log.Fatalf("storage: cannot begin tx (%s)", err)
	}
}