forked from mercari/datastore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
transaction.go
191 lines (162 loc) · 5.25 KB
/
transaction.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
package boom
import (
"reflect"
"sync"
"go.mercari.io/datastore"
)
// Transaction represents a set of datastore operations to be committed atomically.
//
// Operations are enqueued by calling the Put and Delete methods on Transaction
// (or their Multi-equivalents). These operations are only committed when the
// Commit method is invoked. To ensure consistency, reads must be performed by
// using Transaction's Get method or by using the Transaction method when
// building a query.
//
// A Transaction must be committed or rolled back exactly once.
type Transaction struct {
m sync.Mutex
bm *Boom
tx datastore.Transaction
pendingKeysLater []*setKeyLater
}
// DatastoreTransaction returns datastore.Transaction that contains in boom's Transaction.
// This function should not be used unless you have a special reason.
func DatastoreTransaction(tx *Transaction) datastore.Transaction {
return tx.tx
}
type setKeyLater struct {
pendingKey datastore.PendingKey
src interface{}
}
// Boom object that is the source of the Batch object is returned.
func (tx *Transaction) Boom() *Boom {
return tx.bm
}
// Kind retrieves kind name from struct.
func (tx *Transaction) Kind(src interface{}) string {
return tx.bm.Kind(src)
}
// Key retrieves datastore key from struct without error occurred.
func (tx *Transaction) Key(src interface{}) datastore.Key {
return tx.bm.Key(src)
}
// KeyError retrieves datastore key from struct with error occurred.
func (tx *Transaction) KeyError(src interface{}) (datastore.Key, error) {
return tx.bm.KeyError(src)
}
// Get loads the entity stored for key into dst, which must be a struct pointer or implement PropertyLoadSaver.
// key will be extracted from dst.
//
// If there is no such entity for the key, Get returns ErrNoSuchEntity.
// The values of dst's unmatched struct fields are not modified, and matching slice-typed fields are not reset before appending to them.
// In particular, it is recommended to pass a pointer to a zero valued struct on each Get call.
func (tx *Transaction) Get(dst interface{}) error {
dsts := []interface{}{dst}
err := tx.GetMulti(dsts)
if merr, ok := err.(datastore.MultiError); ok {
return merr[0]
} else if err != nil {
return err
}
return nil
}
// GetMulti is a batch version of Get.
// key will be extracted from each struct of dst.
//
// dst must be a []S, []*S, []I or []P, for some struct type S, some interface type I, or some non-interface non-pointer type P such that P or *P implements PropertyLoadSaver.
// If an []I, each element must be a valid dst for Get: it must be a struct pointer or implement PropertyLoadSaver.
func (tx *Transaction) GetMulti(dst interface{}) error {
keys, err := tx.bm.extractKeys(dst)
if err != nil {
return err
}
return tx.tx.GetMulti(keys, dst)
}
// Put saves the entity src into the datastore.
// key will be extract from src struct.
// src must be a struct pointer or implement PropertyLoadSaver; if a struct pointer then any unexported fields of that struct will be skipped.
// If k is an incomplete key, the returned key will be a unique key generated by the datastore,
// and inject key to src struct.
func (tx *Transaction) Put(src interface{}) (datastore.PendingKey, error) {
srcs := []interface{}{src}
keys, err := tx.PutMulti(srcs)
if merr, ok := err.(datastore.MultiError); ok {
return nil, merr[0]
} else if err != nil {
return nil, err
}
return keys[0], nil
}
// PutMulti is a batch version of Put.
//
// src must satisfy the same conditions as the dst argument to GetMulti.
func (tx *Transaction) PutMulti(src interface{}) ([]datastore.PendingKey, error) {
keys, err := tx.bm.extractKeys(src)
if err != nil {
return nil, err
}
pKeys, err := tx.tx.PutMulti(keys, src)
if err != nil {
return nil, err
}
v := reflect.Indirect(reflect.ValueOf(src))
tx.m.Lock()
defer tx.m.Unlock()
for idx, pKey := range pKeys {
if !keys[idx].Incomplete() {
continue
}
tx.pendingKeysLater = append(tx.pendingKeysLater, &setKeyLater{
pendingKey: pKey,
src: v.Index(idx).Interface(),
})
}
return pKeys, nil
}
// Delete deletes the entity.
// key will be extract from src struct.
func (tx *Transaction) Delete(src interface{}) error {
srcs := []interface{}{src}
err := tx.DeleteMulti(srcs)
if merr, ok := err.(datastore.MultiError); ok {
return merr[0]
} else if err != nil {
return err
}
return nil
}
// DeleteMulti is a batch version of Delete.
func (tx *Transaction) DeleteMulti(src interface{}) error {
keys, err := tx.bm.extractKeys(src)
if err != nil {
return err
}
return tx.tx.DeleteMulti(keys)
}
// Commit applies the enqueued operations atomically.
func (tx *Transaction) Commit() (datastore.Commit, error) {
commit, err := tx.tx.Commit()
if err != nil {
return nil, err
}
tx.m.Lock()
defer tx.m.Unlock()
for _, s := range tx.pendingKeysLater {
key := commit.Key(s.pendingKey)
err = tx.bm.setStructKey(s.src, key)
if err != nil {
return nil, err
}
}
tx.pendingKeysLater = nil
return commit, nil
}
// Rollback abandons a pending transaction.
func (tx *Transaction) Rollback() error {
return tx.tx.Rollback()
}
// Batch creates batch mode objects.
func (tx *Transaction) Batch() *TransactionBatch {
b := tx.tx.Batch()
return &TransactionBatch{bm: tx.bm, tx: tx, b: b}
}