forked from tw-bc-group/fabric
-
Notifications
You must be signed in to change notification settings - Fork 0
/
metadata_retrieval.go
109 lines (96 loc) · 3.31 KB
/
metadata_retrieval.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package statecouchdb
import (
"fmt"
"github.com/hyperledger/fabric/core/ledger/util/couchdb"
)
// nsMetadataRetriever implements `batch` interface and wraps the function `retrieveNsMetadata`
// for allowing parallel execution of this function for different namespaces.
// `retrieveMetadata` creates one instance per namespace; the `execute` method stores the
// retrieved metadata in `executionResult`.
type nsMetadataRetriever struct {
// ns is the namespace name; `retrieveMetadata` uses it as the key of its result map
ns string
// db is the database handle that `retrieveMetadata` obtains for the namespace
db *couchdb.CouchDatabase
// keys are the keys whose metadata is to be retrieved
keys []string
// executionResult holds the metadata assigned by `execute`
executionResult []*couchdb.DocMetadata
}
// subNsMetadataRetriever implements `batch` interface and wraps the
// `BatchRetrieveDocumentMetadata` method of `couchdb.CouchDatabase` for allowing
// parallel execution of this call for different sets of keys within a namespace.
// Different sets of keys are expected to be created based on the batch update
// size configured for the database.
//
// It has the same fields as `nsMetadataRetriever` but, being a distinct defined
// type, it declares its own `execute` and `String` methods.
type subNsMetadataRetriever nsMetadataRetriever
// retrieveMetadata retrieves the metadata for a collection of
// `namespace-keys` combinations.
//
// nsKeysMap maps a namespace to the keys to look up in that namespace. One
// nsMetadataRetriever is built per namespace and all of them are handed to
// executeBatches in a single call (executeBatches is defined outside this
// block; the comments on the retriever types indicate it is meant to run the
// batches in parallel).
//
// The returned map has one entry per namespace in nsKeysMap, holding the
// metadata collected for it. If a namespace database handle cannot be obtained
// or executeBatches reports an error, a nil map and that error are returned.
func (vdb *VersionedDB) retrieveMetadata(nsKeysMap map[string][]string) (map[string][]*couchdb.DocMetadata, error) {
	// one job per namespace
	jobs := make([]batch, 0, len(nsKeysMap))
	for namespace, nsKeys := range nsKeysMap {
		handle, err := vdb.getNamespaceDBHandle(namespace)
		if err != nil {
			return nil, err
		}
		jobs = append(jobs, &nsMetadataRetriever{ns: namespace, db: handle, keys: nsKeys})
	}

	if err := executeBatches(jobs); err != nil {
		return nil, err
	}

	// gather the per-namespace results, keyed by namespace
	results := make(map[string][]*couchdb.DocMetadata, len(jobs))
	for _, job := range jobs {
		retriever := job.(*nsMetadataRetriever)
		results[retriever.ns] = retriever.executionResult
	}
	return results, nil
}
// retrieveNsMetadata retrieves metadata for the given keys from the database
// of a single namespace.
//
// The keys are split into consecutive chunks of at most
// db.CouchInstance.MaxBatchUpdateSize() keys. Each chunk is wrapped in a
// subNsMetadataRetriever and all chunks are passed to executeBatches in one
// call. The per-chunk results are then concatenated in chunk order.
//
// An error is returned when the configured batch size is not positive while
// there are keys to retrieve, or when executeBatches reports an error. With no
// keys, executeBatches is still invoked with an empty batch list and a nil
// result is returned.
func retrieveNsMetadata(db *couchdb.CouchDatabase, keys []string) ([]*couchdb.DocMetadata, error) {
	maxBatchSize := db.CouchInstance.MaxBatchUpdateSize()
	if maxBatchSize <= 0 && len(keys) > 0 {
		// A size of zero would skip every key and silently report "no
		// metadata"; a negative size would panic when slicing the keys.
		return nil, fmt.Errorf("invalid max batch update size %d: must be greater than zero", maxBatchSize)
	}

	// construct one batch per group of keys based on maxBatchSize
	batches := []batch{}
	for start := 0; start < len(keys); {
		end := len(keys)
		if end-start > maxBatchSize {
			end = start + maxBatchSize
		}
		// Cap the chunk at its own length so that an append by the callee can
		// never write into the keys that belong to the next chunk.
		chunk := &subNsMetadataRetriever{db: db, keys: keys[start:end:end]}
		batches = append(batches, chunk)
		start = end
	}

	if err := executeBatches(batches); err != nil {
		return nil, err
	}

	// accumulate results from each batch
	var executionResults []*couchdb.DocMetadata
	for _, b := range batches {
		executionResults = append(executionResults, b.(*subNsMetadataRetriever).executionResult...)
	}
	return executionResults, nil
}
// execute calls retrieveNsMetadata with this retriever's database handle and
// keys, and stores whatever it returns in r.executionResult (the assignment
// happens whether or not an error occurred). The error from
// retrieveNsMetadata, if any, is returned unchanged.
func (r *nsMetadataRetriever) execute() error {
	metadata, err := retrieveNsMetadata(r.db, r.keys)
	r.executionResult = metadata
	return err
}
// String returns a short description of the retriever made up of its
// namespace and the number of keys it is responsible for.
func (r *nsMetadataRetriever) String() string {
	numKeys := len(r.keys)
	return fmt.Sprintf("nsMetadataRetriever:ns=%s, num keys=%d", r.ns, numKeys)
}
// execute calls BatchRetrieveDocumentMetadata on the database handle for this
// sub-batch's keys and stores whatever it returns in b.executionResult (the
// assignment happens whether or not an error occurred). The error from the
// database call, if any, is returned unchanged.
func (b *subNsMetadataRetriever) execute() error {
	metadata, err := b.db.BatchRetrieveDocumentMetadata(b.keys)
	b.executionResult = metadata
	return err
}
// String returns a short description of the sub-batch made up of its
// namespace and the number of keys it is responsible for.
//
// NOTE(review): retrieveNsMetadata, as visible in this file, builds sub-batches
// without setting ns, so the namespace part is empty for those instances.
func (b *subNsMetadataRetriever) String() string {
	numKeys := len(b.keys)
	return fmt.Sprintf("subNsMetadataRetriever:ns=%s, num keys=%d", b.ns, numKeys)
}
// minimum returns the smaller of the two integers a and b.
func minimum(a, b int) int {
	if b <= a {
		return b
	}
	return a
}