/
tx_batch.go
115 lines (97 loc) · 1.92 KB
/
tx_batch.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package boom
import (
"sync"
"go.mercari.io/datastore"
)
// TransactionBatch queues Get, Put and Delete operations for a transaction
// and hands them to an underlying datastore.TransactionBatch, which runs
// them when Exec is called.
type TransactionBatch struct {
// m guards earlyErrors.
m sync.Mutex
// bm is used to extract datastore keys from the entities passed to
// Get, Put and Delete.
bm *Boom
// tx is the transaction this batch belongs to; Put records pending keys
// for incomplete keys on it.
tx *Transaction
// b is the underlying batch that queued operations are delegated to.
b *datastore.TransactionBatch
// earlyErrors collects errors raised before an operation could be queued
// (for example a failed key extraction).
earlyErrors []error
}
// Get queues a fetch of dst on the underlying batch, using the key that
// extractKeys derives from dst.
//
// If the key cannot be extracted nothing is queued. The extraction error is
// first offered to h (when h is non-nil); whatever error remains afterwards
// is recorded in earlyErrors. A nil result from h discards the error.
func (b *TransactionBatch) Get(dst interface{}, h datastore.BatchErrHandler) {
	extracted, extractErr := b.bm.extractKeys([]interface{}{dst})
	if extractErr == nil {
		// Happy path: delegate to the underlying batch.
		b.b.Get(extracted[0], dst, h)
		return
	}

	// Give the caller's handler the chance to translate or swallow the error.
	remaining := extractErr
	if h != nil {
		remaining = h(extractErr)
	}
	if remaining == nil {
		return
	}

	b.m.Lock()
	defer b.m.Unlock()
	b.earlyErrors = append(b.earlyErrors, remaining)
}
// Put queues a write of src on the underlying batch, using the key that
// extractKeys derives from src.
//
// If the key cannot be extracted nothing is queued. The extraction error is
// first offered to h (called with a nil pending key, when h is non-nil);
// whatever error remains afterwards is recorded in earlyErrors.
//
// Otherwise the operation is queued with a callback that:
//   - on failure, passes the error through h (if any) and returns the result
//     to the underlying batch;
//   - on success, remembers the pending key on the transaction when the
//     extracted key is incomplete, then returns h(pKey, nil), or nil when h
//     is nil.
func (b *TransactionBatch) Put(src interface{}, h datastore.TxBatchPutHandler) {
	keys, err := b.bm.extractKeys([]interface{}{src})
	if err != nil {
		if h != nil {
			err = h(nil, err)
		}
		if err != nil {
			b.m.Lock()
			b.earlyErrors = append(b.earlyErrors, err)
			b.m.Unlock()
		}
		return
	}

	b.b.Put(keys[0], src, func(pKey datastore.PendingKey, err error) error {
		if err != nil {
			if h != nil {
				err = h(pKey, err)
			}
			// The error travels back to the underlying batch through the
			// return value. Recording it in earlyErrors as well would
			// report it twice and would require b.m, which Exec also takes
			// while this callback may be running.
			return err
		}

		if keys[0].Incomplete() {
			// Hold the transaction mutex only for the slice update, never
			// across the user-supplied handler below.
			b.tx.m.Lock()
			b.tx.pendingKeysLater = append(b.tx.pendingKeysLater, &setKeyLater{
				pendingKey: pKey,
				src:        src,
			})
			b.tx.m.Unlock()
		}

		if h != nil {
			return h(pKey, nil)
		}
		return nil
	})
}
// Delete queues a removal on the underlying batch for the key that
// extractKeys derives from dst.
//
// If the key cannot be extracted nothing is queued. The extraction error is
// first offered to h (when h is non-nil); a nil result from h discards it,
// any other result (or the original error when h is nil) is recorded in
// earlyErrors.
func (b *TransactionBatch) Delete(dst interface{}, h datastore.BatchErrHandler) {
	keyList, keyErr := b.bm.extractKeys([]interface{}{dst})
	if keyErr != nil {
		if h != nil {
			if keyErr = h(keyErr); keyErr == nil {
				// The handler chose to swallow the error.
				return
			}
		}

		b.m.Lock()
		b.earlyErrors = append(b.earlyErrors, keyErr)
		b.m.Unlock()
		return
	}

	b.b.Delete(keyList[0], h)
}
// Exec runs the operations queued on the underlying batch and merges their
// outcome with the errors recorded in earlyErrors.
//
// Return value:
//   - nil when the underlying batch succeeded and no early errors exist;
//   - a datastore.MultiError holding the underlying batch's errors (if it
//     returned a MultiError) followed by the early errors;
//   - the underlying error unchanged when it is not a MultiError. In that
//     case earlyErrors is left untouched so the caller still sees the
//     original error value.
//
// Early errors are cleared once they have been returned, so a later Exec
// does not report them again.
func (b *TransactionBatch) Exec() error {
	// b.m guards only earlyErrors, so it is taken after the underlying batch
	// has run. Holding it across b.b.Exec() would block any handler that
	// needs b.m itself, e.g. one that queues a further operation whose key
	// cannot be extracted.
	err := b.b.Exec()

	b.m.Lock()
	defer b.m.Unlock()

	merr, isMulti := err.(datastore.MultiError)
	if err != nil && !isMulti {
		return err
	}

	// merr is nil here when the underlying batch succeeded; appending to it
	// then yields a MultiError made only of the early errors, which
	// previously were dropped silently.
	merr = append(merr, b.earlyErrors...)
	b.earlyErrors = nil
	if len(merr) == 0 {
		// Return a literal nil so callers never see a non-nil error
		// interface wrapping an empty MultiError.
		return nil
	}
	return merr
}