/
commit_handling.go
312 lines (272 loc) · 8.58 KB
/
commit_handling.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package statecouchdb
import (
"fmt"
"math"
"sync"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
"github.com/pkg/errors"
)
// committer holds one batch of document updates for a single namespace
// together with the cache entries that correspond to that batch.
type committer struct {
// db is the CouchDB database handle the batch is written to.
db *couchdb.CouchDatabase
// batchUpdateMap holds the documents of this batch, keyed by the state key
// (which is also used to look up a document by the ID in a bulk response).
batchUpdateMap map[string]*batchableDocument
// namespace is the namespace this committer was built for.
namespace string
// cacheKVs collects the cache updates for the keys in this batch;
// a nil entry denotes a delete.
cacheKVs statedb.CacheKVs
// cacheEnabled controls whether cache updates are recorded at all.
cacheEnabled bool
}
// addToCacheUpdate records the cache entry for the given key-value pair.
// It is a no-op when the cache is disabled for this committer. A key whose
// value is nil (a delete) is recorded with a nil cache entry.
func (c *committer) addToCacheUpdate(kv *keyValue) {
	if !c.cacheEnabled {
		return
	}

	// a nil entry denotes a delete operation
	var entry *statedb.CacheValue
	if kv.Value != nil {
		entry = &statedb.CacheValue{
			VersionBytes:   kv.Version.ToBytes(),
			Value:          kv.Value,
			Metadata:       kv.Metadata,
			AdditionalInfo: []byte(kv.revision),
		}
	}
	c.cacheKVs[kv.key] = entry
}
// updateRevisionInCacheUpdate stores rev as the additional info of the pending
// cache entry for key. Nothing happens when the cache is disabled or when the
// entry is nil (a nil entry denotes a delete).
func (c *committer) updateRevisionInCacheUpdate(key, rev string) {
	if !c.cacheEnabled {
		return
	}
	if entry := c.cacheKVs[key]; entry != nil {
		entry.AdditionalInfo = []byte(rev)
	}
}
// buildCommitters builds the committers for every namespace updated in the
// given batch. Each committer holds its share of the batch, transformed into
// the form of the underlying db, in memory.
//
// One goroutine is started per namespace, because building committers requires
// fetching missing revisions and those fetches should run in parallel. If any
// namespace fails, an error is returned and no committers are returned.
func (vdb *VersionedDB) buildCommitters(updates *statedb.UpdateBatch) ([]*committer, error) {
	namespaces := updates.GetUpdatedNamespaces()
	numNamespaces := len(namespaces)

	// Both channels have one slot per namespace, so a worker never blocks on send.
	resultsChan := make(chan []*committer, numNamespaces)
	defer close(resultsChan)
	failuresChan := make(chan error, numNamespaces)
	defer close(failuresChan)

	var workers sync.WaitGroup
	workers.Add(numNamespaces)
	for _, ns := range namespaces {
		// per namespace, multiple committers may be built (based on maxBatchSize)
		go func(ns string, nsUpdates map[string]*statedb.VersionedValue) {
			defer workers.Done()
			built, err := vdb.buildCommittersForNs(ns, nsUpdates)
			if err == nil {
				resultsChan <- built
				return
			}
			failuresChan <- err
		}(ns, updates.GetUpdates(ns))
	}
	workers.Wait()

	// All workers are done: a pending error means at least one namespace failed.
	select {
	case err := <-failuresChan:
		return nil, errors.WithStack(err)
	default:
	}

	// No failure, hence exactly one result per namespace is waiting in the channel.
	var allCommitters []*committer
	for range namespaces {
		allCommitters = append(allCommitters, <-resultsChan...)
	}
	return allCommitters, nil
}
// buildCommittersForNs builds the committers for a single namespace. The
// updates are spread over multiple committers so that no committer holds more
// than the configured maximum batch update size; when that size is not
// positive, a single committer receives all updates.
func (vdb *VersionedDB) buildCommittersForNs(ns string, nsUpdates map[string]*statedb.VersionedValue) ([]*committer, error) {
	db, err := vdb.getNamespaceDBHandle(ns)
	if err != nil {
		return nil, err
	}

	// build multiple committers for the namespace based on the max batch size
	batchLimit := db.CouchInstance.MaxBatchUpdateSize()
	committerCount := 1
	if batchLimit > 0 {
		committerCount = int(math.Ceil(float64(len(nsUpdates)) / float64(batchLimit)))
	}

	committers := make([]*committer, committerCount)
	useCache := vdb.cache.Enabled(ns)
	for idx := range committers {
		committers[idx] = &committer{
			db:             db,
			batchUpdateMap: make(map[string]*batchableDocument),
			namespace:      ns,
			cacheKVs:       make(statedb.CacheKVs),
			cacheEnabled:   useCache,
		}
	}

	// Every document needs its current revision before it can be written.
	revisions, err := vdb.getRevisions(ns, nsUpdates)
	if err != nil {
		return nil, err
	}

	// Create a couchDoc per key-value pair of the update batch and fill the
	// committers one after another, moving on once a committer is full.
	current := 0
	for key, vv := range nsUpdates {
		kv := &keyValue{key: key, revision: revisions[key], VersionedValue: vv}
		doc, err := keyValToCouchDoc(kv)
		if err != nil {
			return nil, err
		}

		target := committers[current]
		target.batchUpdateMap[key] = &batchableDocument{CouchDoc: *doc, Deleted: vv.Value == nil}
		target.addToCacheUpdate(kv)

		if batchLimit > 0 && len(target.batchUpdateMap) == batchLimit {
			current++
		}
	}
	return committers, nil
}
// executeCommitter runs commitUpdates on every given committer, each in its
// own goroutine, and waits for all of them to finish. If any committer failed,
// an error is returned; otherwise nil.
func (vdb *VersionedDB) executeCommitter(committers []*committer) error {
	// one slot per committer so that a failing worker never blocks on send
	failuresChan := make(chan error, len(committers))
	defer close(failuresChan)

	var workers sync.WaitGroup
	for _, c := range committers {
		workers.Add(1)
		go func(worker *committer) {
			defer workers.Done()
			err := worker.commitUpdates()
			if err == nil {
				return
			}
			failuresChan <- err
		}(c)
	}
	workers.Wait()

	select {
	case err := <-failuresChan:
		return errors.WithStack(err)
	default:
	}
	return nil
}
// commitUpdates commits the documents held in batchUpdateMap to couchdb.
//
// All documents are first sent as a single bulk update. For every document the
// bulk update reports as successful, the new revision is recorded in the
// pending cache update. Every document that did not succeed is retried
// individually, as a delete or as a save depending on how it was added to the
// batch. The first individual retry that fails aborts the commit and its error
// is returned.
func (c *committer) commitUpdates() error {
	docs := make([]*couchdb.CouchDoc, 0, len(c.batchUpdateMap))
	for _, update := range c.batchUpdateMap {
		docs = append(docs, &update.CouchDoc)
	}

	// Do the bulk update into couchdb. Note that this will do retries if the entire bulk update fails or times out
	responses, err := c.db.BatchUpdateDocuments(docs)
	if err != nil {
		return err
	}

	// IF INDIVIDUAL DOCUMENTS IN THE BULK UPDATE DID NOT SUCCEED, TRY THEM INDIVIDUALLY
	// iterate through the response from CouchDB by document
	for _, resp := range responses {
		if resp.Ok {
			c.updateRevisionInCacheUpdate(resp.ID, resp.Rev)
			continue
		}

		// The document returned an error, retry the individual document.
		// Guard against a response for an ID that is not part of this batch
		// instead of dereferencing a nil document.
		doc := c.batchUpdateMap[resp.ID]
		if doc == nil {
			return errors.Errorf("bulk update response refers to document ID: %v which is not part of the batch. Error: %s, Reason: %s",
				resp.ID, resp.Error, resp.Reason)
		}

		var err error
		// Remove the "_rev" from the JSON before saving
		// this will allow the CouchDB retry logic to retry revisions without encountering
		// a mismatch between the "If-Match" and the "_rev" tag in the JSON
		if doc.CouchDoc.JSONValue != nil {
			err = removeJSONRevision(&doc.CouchDoc.JSONValue)
			if err != nil {
				return err
			}
		}

		// Check to see if the document was added to the batch as a delete type document
		if doc.Deleted {
			logger.Warningf("CouchDB batch document delete encountered an problem. Retrying delete for document ID:%s", resp.ID)
			// If this is a deleted document, then retry the delete
			// If the delete fails due to a document not being found (404 error),
			// the document has already been deleted and the DeleteDoc will not return an error
			err = c.db.DeleteDoc(resp.ID, "")
		} else {
			logger.Warningf("CouchDB batch document update encountered an problem. Reason:%s, Retrying update for document ID:%s", resp.Reason, resp.ID)
			// Save the individual document to couchdb
			// Note that this will do retries as needed
			var revision string
			revision, err = c.db.SaveDoc(resp.ID, "", &doc.CouchDoc)
			if err == nil {
				// the returned revision is only meaningful when the save succeeded
				c.updateRevisionInCacheUpdate(resp.ID, revision)
			}
		}

		// If the single document update or delete returns an error, then throw the error
		if err != nil {
			errorString := fmt.Sprintf("error saving document ID: %v. Error: %s, Reason: %s",
				resp.ID, resp.Error, resp.Reason)
			// errorString carries document IDs and server text, so it must be
			// passed as an argument and never used as the format string itself
			logger.Errorf("%s", errorString)
			return errors.WithMessage(err, errorString)
		}
	}
	return nil
}
// getRevisions returns the revision for every key in nsUpdates. Revisions are
// looked up in the committed data cache first, then in the state cache, and
// finally, for whatever is still missing, in the database. A key for which no
// revision is found anywhere is mapped to the empty string.
func (vdb *VersionedDB) getRevisions(ns string, nsUpdates map[string]*statedb.VersionedValue) (map[string]string, error) {
	revisions := make(map[string]string)
	committedRevs := vdb.committedDataCache.revs[ns]

	var notCommitted []string
	for key := range nsUpdates {
		rev, found := committedRevs[key]
		revisions[key] = rev
		if !found {
			notCommitted = append(notCommitted, key)
		}
	}
	if len(notCommitted) == 0 {
		// all revisions were present in the committedDataCache
		return revisions, nil
	}

	stillMissing, err := vdb.addMissingRevisionsFromCache(ns, notCommitted, revisions)
	if err != nil {
		return nil, err
	}

	if len(stillMissing) > 0 {
		// don't update the cache for missing entries as
		// revisions are going to get changed after the commit
		if err := vdb.addMissingRevisionsFromDB(ns, stillMissing, revisions); err != nil {
			return nil, err
		}
	}
	// either the remaining revisions were present in the state cache,
	// or they have just been pulled from the db
	return revisions, nil
}
// addMissingRevisionsFromCache fills revs with the revisions found in the
// state cache for the given keys and returns the keys that have no cache
// entry. When the cache is not enabled for the namespace, all keys are
// returned unchanged.
func (vdb *VersionedDB) addMissingRevisionsFromCache(ns string, keys []string, revs map[string]string) ([]string, error) {
	if !vdb.cache.Enabled(ns) {
		return keys, nil
	}

	var notCached []string
	for _, key := range keys {
		cached, err := vdb.cache.GetState(vdb.chainName, ns, key)
		switch {
		case err != nil:
			return nil, err
		case cached == nil:
			notCached = append(notCached, key)
		default:
			// the revision is kept in the additional info of the cache entry
			revs[key] = string(cached.AdditionalInfo)
		}
	}
	return notCached, nil
}
// addMissingRevisionsFromDB retrieves the metadata of the given keys from the
// database of the namespace and stores the revision of every retrieved
// document in revisions, keyed by document ID.
func (vdb *VersionedDB) addMissingRevisionsFromDB(ns string, missingKeys []string, revisions map[string]string) error {
	nsDB, err := vdb.getNamespaceDBHandle(ns)
	if err != nil {
		return err
	}

	logger.Debugf("Pulling revisions for the [%d] keys for namsespace [%s] that were not part of the readset", len(missingKeys), nsDB.DBName)
	docsMetadata, err := retrieveNsMetadata(nsDB, missingKeys)
	if err != nil {
		return err
	}

	for i := range docsMetadata {
		md := docsMetadata[i]
		revisions[md.ID] = md.Rev
	}
	return nil
}
// batchableDocument defines a document for a batch: the couch document to be
// written plus a marker telling whether it was added to the batch as a delete.
type batchableDocument struct {
// CouchDoc is the document sent to CouchDB.
CouchDoc couchdb.CouchDoc
// Deleted is true when the update carries a nil value, i.e. the key is being deleted.
Deleted bool
}