-
Notifications
You must be signed in to change notification settings - Fork 0
/
db.go
472 lines (415 loc) · 18.2 KB
/
db.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package privacyenabledstate
import (
"encoding/base64"
"strings"
"github.com/hyperledger/fabric-lib-go/healthz"
"github.com/osdi23p228/fabric/common/flogging"
"github.com/osdi23p228/fabric/common/metrics"
"github.com/osdi23p228/fabric/core/common/ccprovider"
"github.com/osdi23p228/fabric/core/ledger"
"github.com/osdi23p228/fabric/core/ledger/cceventmgmt"
"github.com/osdi23p228/fabric/core/ledger/internal/version"
"github.com/osdi23p228/fabric/core/ledger/kvledger/bookkeeping"
"github.com/osdi23p228/fabric/core/ledger/kvledger/txmgmt/statedb"
"github.com/osdi23p228/fabric/core/ledger/kvledger/txmgmt/statedb/statecouchdb"
"github.com/osdi23p228/fabric/core/ledger/kvledger/txmgmt/statedb/stateleveldb"
"github.com/osdi23p228/fabric/core/ledger/util"
"github.com/pkg/errors"
)
// logger is the package-level logger, obtained from flogging under the name
// "privacyenabledstate".
var logger = flogging.MustGetLogger("privacyenabledstate")
// Building blocks for the derived namespaces under which private and hashed
// data are stored in the wrapped state database, plus the name of the CouchDB
// state database type.
const (
// nsJoiner separates the chaincode namespace from the prefixed collection name.
nsJoiner = "$$"
// pvtDataPrefix precedes the collection name in a private data namespace.
pvtDataPrefix = "p"
// hashDataPrefix precedes the collection name in a hashed data namespace.
hashDataPrefix = "h"
// couchDB is the StateDatabase value that selects the CouchDB-backed provider.
couchDB = "CouchDB"
)
// StateDBConfig encapsulates the configuration for stateDB on the ledger.
// It combines the shared ledger.StateDBConfig with the LevelDB path that is
// only known inside the ledger component.
type StateDBConfig struct {
// ledger.StateDBConfig is used to configure the stateDB for the ledger.
// It is embedded, so its fields (for example StateDatabase and CouchDB,
// both read by NewDBProvider) are accessible directly on this struct.
*ledger.StateDBConfig
// LevelDBPath is the filesystem path when statedb type is "goleveldb".
// It is internally computed by the ledger component,
// so it is not in ledger.StateDBConfig and not exposed to other components.
LevelDBPath string
}
// DBProvider encapsulates other providers such as VersionedDBProvider and
// the bookkeeping provider, which are required to create a DB for a channel.
//
// NOTE: NewDBProvider builds this struct with an unkeyed composite literal, so
// the order of the fields below must not be changed without updating it.
type DBProvider struct {
// VersionedDBProvider creates and closes the underlying versioned state databases.
VersionedDBProvider statedb.VersionedDBProvider
// HealthCheckRegistry is where RegisterHealthChecker registers the state database.
HealthCheckRegistry ledger.HealthCheckRegistry
// bookkeepingProvider supplies the per-channel bookkeeping handle used to
// build the metadata hint in GetDBHandle.
bookkeepingProvider bookkeeping.Provider
}
// NewDBProvider constructs a DBProvider backed by the state database selected
// in stateDBConf and registers it with the health check registry.
//
// When stateDBConf.StateDatabase equals "CouchDB", a CouchDB-backed provider is
// created from stateDBConf.CouchDB, metricsProvider and sysNamespaces. In every
// other case (including a missing embedded ledger.StateDBConfig) a LevelDB-backed
// provider is opened at stateDBConf.LevelDBPath.
//
// It returns an error if stateDBConf is nil, if the underlying provider cannot
// be created, or if the health checker registration fails. In the last case the
// freshly created provider is closed before returning so that the resources it
// holds are not leaked.
func NewDBProvider(
	bookkeeperProvider bookkeeping.Provider,
	metricsProvider metrics.Provider,
	healthCheckRegistry ledger.HealthCheckRegistry,
	stateDBConf *StateDBConfig,
	sysNamespaces []string,
) (*DBProvider, error) {
	// Every path below dereferences stateDBConf, so reject nil up front instead
	// of panicking while reading LevelDBPath.
	if stateDBConf == nil {
		return nil, errors.New("state database configuration must not be nil")
	}

	// StateDatabase is promoted from the embedded pointer; guard against the
	// pointer being nil and fall back to LevelDB in that case.
	useCouchDB := stateDBConf.StateDBConfig != nil && stateDBConf.StateDatabase == couchDB

	var vdbProvider statedb.VersionedDBProvider
	var err error
	if useCouchDB {
		vdbProvider, err = statecouchdb.NewVersionedDBProvider(stateDBConf.CouchDB, metricsProvider, sysNamespaces)
	} else {
		vdbProvider, err = stateleveldb.NewVersionedDBProvider(stateDBConf.LevelDBPath)
	}
	if err != nil {
		return nil, err
	}

	dbProvider := &DBProvider{vdbProvider, healthCheckRegistry, bookkeeperProvider}
	if err = dbProvider.RegisterHealthChecker(); err != nil {
		// The provider is not handed to the caller on failure, so release it here.
		dbProvider.Close()
		return nil, err
	}
	return dbProvider, nil
}
// RegisterHealthChecker registers the underlying state database provider with
// the health check registry under the name "couchdb".
//
// Registration only happens when the provider implements healthz.HealthChecker;
// otherwise this is a no-op that returns nil. The original intent is that only
// CouchDB is registered, since it runs as a separate process, whereas GoLevelDB
// is an embedded database.
func (p *DBProvider) RegisterHealthChecker() error {
	checker, isChecker := p.VersionedDBProvider.(healthz.HealthChecker)
	if !isChecker {
		return nil
	}
	return p.HealthCheckRegistry.RegisterChecker("couchdb", checker)
}
// GetDBHandle returns the privacy-aware DB for the given id (i.e., a channel).
//
// It obtains the versioned DB from the wrapped provider, passing a
// namespaceProvider built around chInfoProvider, then creates the metadata hint
// from the bookkeeping handle for the same id. Any error from either step is
// returned unchanged.
func (p *DBProvider) GetDBHandle(id string, chInfoProvider channelInfoProvider) (*DB, error) {
	nsProvider := &namespaceProvider{chInfoProvider}
	versionedDB, err := p.VersionedDBProvider.GetDBHandle(id, nsProvider)
	if err != nil {
		return nil, err
	}

	hint, err := newMetadataHint(
		p.bookkeepingProvider.GetDBHandle(id, bookkeeping.MetadataPresenceIndicator),
	)
	if err != nil {
		return nil, err
	}
	return NewDB(versionedDB, id, hint)
}
// Close delegates to the wrapped VersionedDBProvider, which closes its
// VersionedDB instances and releases the resources it holds.
func (p *DBProvider) Close() {
	p.VersionedDBProvider.Close()
}
// DB uses a single database to maintain both the public and private data.
// The private and hashed data live in namespaces derived from the chaincode
// namespace and the collection name.
type DB struct {
// VersionedDB is the wrapped state database; its methods are promoted to DB.
statedb.VersionedDB
// metadataHint tracks which namespaces have ever stored metadata so that
// metadata lookups can be skipped for the others.
metadataHint *metadataHint
}
// NewDB wraps a VersionedDB instance together with its metadata hint. The public
// data is managed directly by the wrapped versionedDB, while hashed data and
// private data are kept in separate namespaces inside that same db.
//
// The ledgerid argument is currently unused, and the returned error is always nil.
func NewDB(vdb statedb.VersionedDB, ledgerid string, metadataHint *metadataHint) (*DB, error) {
	db := &DB{
		VersionedDB:  vdb,
		metadataHint: metadataHint,
	}
	return db, nil
}
// IsBulkOptimizable reports whether the wrapped statedb implements
// statedb.BulkOptimizable.
func (s *DB) IsBulkOptimizable() bool {
	_, supported := s.VersionedDB.(statedb.BulkOptimizable)
	return supported
}
// LoadCommittedVersionsOfPubAndHashedKeys asks the wrapped statedb to load the
// committed versions of the given public and hashed keys in one combined call.
//
// If the wrapped statedb does not implement statedb.BulkOptimizable this is a
// no-op that returns nil. Otherwise each hashed key is translated into a
// composite key in the hashed-data namespace (base64-encoding the key hash when
// the db does not support byte keys) and appended to pubKeys before loading.
func (s *DB) LoadCommittedVersionsOfPubAndHashedKeys(pubKeys []*statedb.CompositeKey,
	hashedKeys []*HashedCompositeKey) error {
	loader, supported := s.VersionedDB.(statedb.BulkOptimizable)
	if !supported {
		return nil
	}

	// Hashed keys live in their own namespace, so they cannot collide with the
	// public keys and no duplicate check is needed while merging.
	for _, hashedKey := range hashedKeys {
		encodedKeyHash := hashedKey.KeyHash
		if !s.BytesKeySupported() {
			encodedKeyHash = base64.StdEncoding.EncodeToString([]byte(hashedKey.KeyHash))
		}
		pubKeys = append(pubKeys, &statedb.CompositeKey{
			Namespace: deriveHashedDataNs(hashedKey.Namespace, hashedKey.CollectionName),
			Key:       encodedKeyHash,
		})
	}
	return loader.LoadCommittedVersions(pubKeys)
}
// ClearCachedVersions clears the version cache of the wrapped statedb. It does
// nothing when the statedb does not implement statedb.BulkOptimizable.
func (s *DB) ClearCachedVersions() {
	cache, supported := s.VersionedDB.(statedb.BulkOptimizable)
	if !supported {
		return
	}
	cache.ClearCachedVersions()
}
// GetChaincodeEventListener returns this DB as a
// cceventmgmt.ChaincodeLifecycleEventListener when the wrapped statedb
// implements statedb.IndexCapable, and nil otherwise.
func (s *DB) GetChaincodeEventListener() cceventmgmt.ChaincodeLifecycleEventListener {
	if _, indexable := s.VersionedDB.(statedb.IndexCapable); !indexable {
		return nil
	}
	return s
}
// GetPrivateData returns the value of the private data item identified by the
// tuple <namespace, collection, key>, read from the derived private data namespace.
func (s *DB) GetPrivateData(namespace, collection, key string) (*statedb.VersionedValue, error) {
	pvtNs := derivePvtDataNs(namespace, collection)
	return s.GetState(pvtNs, key)
}
// GetPrivateDataHash returns the hash of the value of the private data item
// identified by the tuple <namespace, collection, key>. The key is hashed with
// util.ComputeStringHash and the lookup is delegated to GetValueHash.
func (s *DB) GetPrivateDataHash(namespace, collection, key string) (*statedb.VersionedValue, error) {
	keyHash := util.ComputeStringHash(key)
	return s.GetValueHash(namespace, collection, keyHash)
}
// GetValueHash returns the value hash of the private data item identified by
// the tuple <namespace, collection, keyHash>, read from the derived hashed-data
// namespace. The key hash is base64-encoded when the wrapped db does not
// support byte keys.
func (s *DB) GetValueHash(namespace, collection string, keyHash []byte) (*statedb.VersionedValue, error) {
	var lookupKey string
	if s.BytesKeySupported() {
		lookupKey = string(keyHash)
	} else {
		lookupKey = base64.StdEncoding.EncodeToString(keyHash)
	}
	return s.GetState(deriveHashedDataNs(namespace, collection), lookupKey)
}
// GetKeyHashVersion returns the version of the private data item identified by
// the tuple <namespace, collection, keyHash>, read from the derived hashed-data
// namespace. The key hash is base64-encoded when the wrapped db does not
// support byte keys.
func (s *DB) GetKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, error) {
	var lookupKey string
	if s.BytesKeySupported() {
		lookupKey = string(keyHash)
	} else {
		lookupKey = base64.StdEncoding.EncodeToString(keyHash)
	}
	return s.GetVersion(deriveHashedDataNs(namespace, collection), lookupKey)
}
// GetCachedKeyHashVersion looks up the version of a key hash in the version
// cache of the wrapped statedb. It returns (nil, false) when the statedb does
// not implement statedb.BulkOptimizable; otherwise it returns whatever the
// cache reports for the key in the derived hashed-data namespace.
func (s *DB) GetCachedKeyHashVersion(namespace, collection string, keyHash []byte) (*version.Height, bool) {
	cache, supported := s.VersionedDB.(statedb.BulkOptimizable)
	if !supported {
		return nil, false
	}

	var lookupKey string
	if s.BytesKeySupported() {
		lookupKey = string(keyHash)
	} else {
		lookupKey = base64.StdEncoding.EncodeToString(keyHash)
	}
	return cache.GetCachedVersion(deriveHashedDataNs(namespace, collection), lookupKey)
}
// GetPrivateDataMultipleKeys returns the values of several private data items
// of one collection in a single call against the derived private data namespace.
func (s *DB) GetPrivateDataMultipleKeys(namespace, collection string, keys []string) ([]*statedb.VersionedValue, error) {
	pvtNs := derivePvtDataNs(namespace, collection)
	return s.GetStateMultipleKeys(pvtNs, keys)
}
// GetPrivateDataRangeScanIterator returns an iterator over the key-values of a
// collection's private data that fall between the given keys.
// startKey is included in the results and endKey is excluded.
func (s *DB) GetPrivateDataRangeScanIterator(namespace, collection, startKey, endKey string) (statedb.ResultsIterator, error) {
	pvtNs := derivePvtDataNs(namespace, collection)
	return s.GetStateRangeScanIterator(pvtNs, startKey, endKey)
}
// ExecuteQueryOnPrivateData executes the given query against the private data
// namespace of a collection and returns an iterator whose results are of a type
// specific to the underlying data store.
//
// NOTE(review): this is the only method of DB declared on a value receiver;
// every other method uses *DB. The receiver is left as is because changing it
// would alter the method set of DB - confirm before unifying.
func (s DB) ExecuteQueryOnPrivateData(namespace, collection, query string) (statedb.ResultsIterator, error) {
	pvtNs := derivePvtDataNs(namespace, collection)
	return s.ExecuteQuery(pvtNs, query)
}
// ApplyUpdates shadows the method promoted from the embedded statedb.VersionedDB
// and always returns an error. Without this override, an accidental call would
// update only the public data; callers must use ApplyPrivacyAwareUpdates instead.
func (s *DB) ApplyUpdates(batch *statedb.UpdateBatch, height *version.Height) error {
	err := errors.New("this function should not be invoked on this type. Please invoke function ApplyPrivacyAwareUpdates")
	return err
}
// ApplyPrivacyAwareUpdates applies the public, private and hashed updates of the
// batch to the underlying db at the given height.
//
// The private and hashed updates are folded into updates.PubUpdates (under
// their derived namespaces), so the batch passed in is modified. The metadata
// hint is updated from the batch before the combined updates are committed.
func (s *DB) ApplyPrivacyAwareUpdates(updates *UpdateBatch, height *version.Height) error {
	// merged aliases the public batch; private and hashed entries are added to
	// it, partitioned from public data by their separate namespaces.
	merged := updates.PubUpdates
	addPvtUpdates(merged, updates.PvtUpdates)

	encodeKeyHashes := !s.BytesKeySupported()
	addHashedUpdates(merged, updates.HashUpdates, encodeKeyHashes)

	s.metadataHint.setMetadataUsedFlag(updates)
	return s.VersionedDB.ApplyUpdates(merged.UpdateBatch, height)
}
// GetStateMetadata implements the corresponding function in interface DB. As an
// optimization, it consults the metadata hint first: if the namespace has never
// stored metadata for any of its items, nil is returned without going to the
// db. This is intended for the validation and commit path, and it saves
// chaincodes from paying an unnecessary performance penalty when they do not
// use features that rely on metadata (such as key-level endorsement).
//
// A key that does not exist yields (nil, nil).
func (s *DB) GetStateMetadata(namespace, key string) ([]byte, error) {
	if !s.metadataHint.metadataEverUsedFor(namespace) {
		return nil, nil
	}

	versionedValue, err := s.GetState(namespace, key)
	switch {
	case err != nil:
		return nil, err
	case versionedValue == nil:
		return nil, nil
	}
	return versionedValue.Metadata, nil
}
// GetPrivateDataMetadataByHash implements the corresponding function in
// interface DB. Like GetStateMetadata, it returns nil without going to the db
// when the metadata hint says the (chaincode) namespace has never stored
// metadata; otherwise it returns the metadata stored with the value hash of the
// given key hash. A key hash that does not exist yields (nil, nil).
func (s *DB) GetPrivateDataMetadataByHash(namespace, collection string, keyHash []byte) ([]byte, error) {
	if !s.metadataHint.metadataEverUsedFor(namespace) {
		return nil, nil
	}

	versionedValue, err := s.GetValueHash(namespace, collection, keyHash)
	switch {
	case err != nil:
		return nil, err
	case versionedValue == nil:
		return nil, nil
	}
	return versionedValue.Metadata, nil
}
// HandleChaincodeDeploy initializes database artifacts (indexes) for the
// database associated with the chaincode namespace and its collections.
//
// It is a no-op when the wrapped statedb does not implement
// statedb.IndexCapable, and it returns an error only when chaincodeDefinition
// is nil. All other failures are logged and suppressed on purpose: the present
// code cannot tell errors caused by the couchdb interaction apart from errors
// caused by bad index files, and the latter cannot be fixed by the admin. The
// suppression is acceptable because the peer can keep committing without the
// indexes; chaincode queries may be affected until a new chaincode with fixed
// indexes is installed and instantiated.
func (s *DB) HandleChaincodeDeploy(chaincodeDefinition *cceventmgmt.ChaincodeDefinition, dbArtifactsTar []byte) error {
	// Only index-capable state databases have anything to do here.
	indexer, supported := s.VersionedDB.(statedb.IndexCapable)
	if !supported {
		return nil
	}
	if chaincodeDefinition == nil {
		return errors.New("chaincode definition not found while creating couchdb index")
	}
	ccName := chaincodeDefinition.Name

	artifactsByDir, err := ccprovider.ExtractFileEntries(dbArtifactsTar, indexer.GetDBType())
	if err != nil {
		logger.Errorf("Index creation: error extracting db artifacts from tar for chaincode [%s]: %s", ccName, err)
		return nil
	}

	definedCollections, err := extractCollectionNames(chaincodeDefinition)
	if err != nil {
		logger.Errorf("Error while retrieving collection config for chaincode=[%s]: %s",
			ccName, err)
		return nil
	}

	for dir, files := range artifactsByDir {
		contentsByName := make(map[string][]byte)
		for _, file := range files {
			contentsByName[file.FileHeader.Name] = file.FileContent
		}

		info := getIndexInfo(dir)
		if info.hasIndexForChaincode {
			if err := indexer.ProcessIndexesForChaincodeDeploy(ccName, contentsByName); err != nil {
				logger.Errorf("Error processing index for chaincode [%s]: %s", ccName, err)
			}
			continue
		}
		if !info.hasIndexForCollection {
			// Neither a chaincode-level nor a collection-level index directory.
			continue
		}

		if _, defined := definedCollections[info.collectionName]; !defined {
			logger.Errorf("Error processing index for chaincode [%s]: cannot create an index for an undefined collection=[%s]",
				ccName, info.collectionName)
			continue
		}
		collectionNs := derivePvtDataNs(ccName, info.collectionName)
		if err := indexer.ProcessIndexesForChaincodeDeploy(collectionNs, contentsByName); err != nil {
			logger.Errorf("Error processing collection index for chaincode [%s]: %s", ccName, err)
		}
	}
	return nil
}
// ChaincodeDeployDone intentionally does nothing: the couchdb state
// implementation has no follow-up work once a chaincode deploy finishes,
// whether it succeeded or not.
func (s *DB) ChaincodeDeployDone(succeeded bool) {
	// no-op
}
// derivePvtDataNs builds the namespace under which the private data of the
// given collection is stored: namespace, the joiner, the private data prefix
// and the collection name, concatenated in that order.
func derivePvtDataNs(namespace, collection string) string {
	const pvtNsInfix = nsJoiner + pvtDataPrefix
	return namespace + pvtNsInfix + collection
}
// deriveHashedDataNs builds the namespace under which the hashed data of the
// given collection is stored: namespace, the joiner, the hashed data prefix and
// the collection name, concatenated in that order.
func deriveHashedDataNs(namespace, collection string) string {
	const hashedNsInfix = nsJoiner + hashDataPrefix
	return namespace + hashedNsInfix + collection
}
// isPvtdataNs reports whether namespace contains the private data marker, i.e.
// the joiner immediately followed by the private data prefix.
func isPvtdataNs(namespace string) bool {
	const pvtNsMarker = nsJoiner + pvtDataPrefix
	return strings.Contains(namespace, pvtNsMarker)
}
// isHashedDataNs reports whether namespace contains the hashed data marker,
// i.e. the joiner immediately followed by the hashed data prefix.
func isHashedDataNs(namespace string) bool {
	const hashedNsMarker = nsJoiner + hashDataPrefix
	return strings.Contains(namespace, hashedNsMarker)
}
// addPvtUpdates copies every private data update into pubUpdateBatch, filing
// each one under the private data namespace derived from its chaincode
// namespace and collection.
func addPvtUpdates(pubUpdateBatch *PubUpdateBatch, pvtUpdateBatch *PvtUpdateBatch) {
	for ns, collBatches := range pvtUpdateBatch.UpdateMap {
		for _, coll := range collBatches.GetCollectionNames() {
			pvtNs := derivePvtDataNs(ns, coll)
			for key, versionedValue := range collBatches.GetUpdates(coll) {
				pubUpdateBatch.Update(pvtNs, key, versionedValue)
			}
		}
	}
}
// addHashedUpdates copies every hashed data update into pubUpdateBatch, filing
// each one under the hashed data namespace derived from its chaincode namespace
// and collection. When base64Key is true, each key is base64-encoded first.
func addHashedUpdates(pubUpdateBatch *PubUpdateBatch, hashedUpdateBatch *HashedUpdateBatch, base64Key bool) {
	for ns, collBatches := range hashedUpdateBatch.UpdateMap {
		for _, coll := range collBatches.GetCollectionNames() {
			hashedNs := deriveHashedDataNs(ns, coll)
			for rawKey, versionedValue := range collBatches.GetUpdates(coll) {
				storedKey := rawKey
				if base64Key {
					storedKey = base64.StdEncoding.EncodeToString([]byte(rawKey))
				}
				pubUpdateBatch.Update(hashedNs, storedKey, versionedValue)
			}
		}
	}
}
// extractCollectionNames returns the set of static collection names declared in
// the chaincode definition, as a map from name to true. Configs without a
// static collection config are skipped, and a definition without collection
// configs yields an empty map. The returned error is always nil.
func extractCollectionNames(chaincodeDefinition *cceventmgmt.ChaincodeDefinition) (map[string]bool, error) {
	configPkg := chaincodeDefinition.CollectionConfigs
	names := make(map[string]bool)
	if configPkg == nil {
		return names, nil
	}
	for _, cfg := range configPkg.Config {
		if static := cfg.GetStaticCollectionConfig(); static != nil {
			names[static.Name] = true
		}
	}
	return names, nil
}
// indexInfo describes what kind of index definitions a directory inside the
// chaincode package holds, as determined by getIndexInfo.
type indexInfo struct {
	// hasIndexForChaincode is true for a chaincode-level index directory.
	hasIndexForChaincode bool
	// hasIndexForCollection is true for a collection-scoped index directory.
	hasIndexForCollection bool
	// collectionName is set only when hasIndexForCollection is true.
	collectionName string
}

// Zero-based positions of the path segments (split on "/") inspected by
// getIndexInfo.
const (
	// Example for chaincode indexes:
	// "META-INF/statedb/couchdb/indexes/indexColorSortName.json"
	chaincodeIndexDirDepth = 3
	// Example for collection scoped indexes:
	// "META-INF/statedb/couchdb/collections/collectionMarbles/indexes/indexCollMarbles.json"
	collectionDirDepth      = 3
	collectionNameDepth     = 4
	collectionIndexDirDepth = 5
)

// getIndexInfo classifies indexPath as a chaincode-level index directory, a
// collection-scoped index directory, or neither.
//
// The path is split on "/" and matched positionally against the layouts shown
// next to the depth constants. For a collection-scoped path the collection name
// is taken from the segment following "collections". Paths that match neither
// layout, including paths that are too short, yield a zero-valued indexInfo.
func getIndexInfo(indexPath string) *indexInfo {
	info := &indexInfo{}
	segments := strings.Split(indexPath, "/")
	switch {
	case len(segments) > chaincodeIndexDirDepth &&
		segments[chaincodeIndexDirDepth] == "indexes":
		info.hasIndexForChaincode = true
	// The length must cover the deepest segment read below; checking only
	// against collectionDirDepth would panic on a path that stops at
	// ".../collections" or ".../collections/<name>".
	case len(segments) > collectionIndexDirDepth &&
		segments[collectionDirDepth] == "collections" &&
		segments[collectionIndexDirDepth] == "indexes":
		info.hasIndexForCollection = true
		info.collectionName = segments[collectionNameDepth]
	}
	return info
}
// channelInfoProvider interface enables the privacyenabledstate package to
// retrieve the namespaces and collections of a channel.
type channelInfoProvider interface {
// NamespacesAndCollections returns namespaces and collections for the channel.
// PossibleNamespaces treats the result as a map from namespace to the names
// of the collections defined under it.
NamespacesAndCollections(vdb statedb.VersionedDB) (map[string][]string, error)
}
// namespaceProvider implements statedb.NamespaceProvider interface by
// wrapping a channelInfoProvider, whose NamespacesAndCollections method is
// promoted through embedding.
type namespaceProvider struct {
channelInfoProvider
}
// PossibleNamespaces returns all possible namespaces for a channel: every
// chaincode namespace plus, for each of its collections, the derived hashed
// data namespace and private data namespace.
//
// In the ledger, a private data namespace is created only if the peer is a
// member of the collection or owns the implicit collection. This function
// deliberately skips that check and always adds the private data namespace, so
// the result is a superset of the namespaces that may actually exist. That does
// not cause inconsistency, because the caller in statecouchdb matches the
// namespaces against existing databases and drops the ones without a match.
// Checking peer membership would require retrieving all collection configs from
// the collection config store; since this function only exists to retroactively
// build namespaces when upgrading v2.0/2.1 peers to a newer v2.x version, the
// simple implementation is preferred.
func (p *namespaceProvider) PossibleNamespaces(vdb statedb.VersionedDB) ([]string, error) {
	collectionsByNs, err := p.NamespacesAndCollections(vdb)
	if err != nil {
		return nil, err
	}

	// Start from an empty, non-nil slice so callers always get a non-nil result.
	namespaces := []string{}
	for ns, collections := range collectionsByNs {
		namespaces = append(namespaces, ns)
		for _, coll := range collections {
			namespaces = append(namespaces,
				deriveHashedDataNs(ns, coll),
				derivePvtDataNs(ns, coll),
			)
		}
	}
	return namespaces, nil
}