forked from dddengyunjie/fabric
/
historyleveldb_query_executer.go
167 lines (140 loc) · 6.86 KB
/
historyleveldb_query_executer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package historyleveldb
import (
"bytes"
commonledger "github.com/hyperledger/fabric/common/ledger"
"github.com/hyperledger/fabric/common/ledger/blkstorage"
"github.com/hyperledger/fabric/common/ledger/util"
"github.com/hyperledger/fabric/core/ledger/kvledger/history/historydb"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/ledgerconfig"
"github.com/hyperledger/fabric/protos/common"
"github.com/hyperledger/fabric/protos/ledger/queryresult"
putils "github.com/hyperledger/fabric/protos/utils"
"github.com/pkg/errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
)
// LevelHistoryDBQueryExecutor is a query executor against the LevelDB history DB
type LevelHistoryDBQueryExecutor struct {
historyDB *historyDB
blockStore blkstorage.BlockStore
}
// GetHistoryForKey implements method in interface `ledger.HistoryQueryExecutor`
func (q *LevelHistoryDBQueryExecutor) GetHistoryForKey(namespace string, key string) (commonledger.ResultsIterator, error) {
if ledgerconfig.IsHistoryDBEnabled() == false {
return nil, errors.New("history database not enabled")
}
var compositeStartKey []byte
var compositeEndKey []byte
compositeStartKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, false)
compositeEndKey = historydb.ConstructPartialCompositeHistoryKey(namespace, key, true)
// range scan to find any history records starting with namespace~key
dbItr := q.historyDB.db.GetIterator(compositeStartKey, compositeEndKey)
return newHistoryScanner(compositeStartKey, namespace, key, dbItr, q.blockStore), nil
}
//historyScanner implements ResultsIterator for iterating through history results
type historyScanner struct {
compositePartialKey []byte //compositePartialKey includes namespace~key
namespace string
key string
dbItr iterator.Iterator
blockStore blkstorage.BlockStore
}
func newHistoryScanner(compositePartialKey []byte, namespace string, key string,
dbItr iterator.Iterator, blockStore blkstorage.BlockStore) *historyScanner {
return &historyScanner{compositePartialKey, namespace, key, dbItr, blockStore}
}
func (scanner *historyScanner) Next() (commonledger.QueryResult, error) {
for {
if !scanner.dbItr.Next() {
return nil, nil
}
historyKey := scanner.dbItr.Key() // history key is in the form namespace~key~blocknum~trannum
// SplitCompositeKey(namespace~key~blocknum~trannum, namespace~key~) will return the blocknum~trannum in second position
_, blockNumTranNumBytes := historydb.SplitCompositeHistoryKey(historyKey, scanner.compositePartialKey)
// check that blockNumTranNumBytes does not contain a nil byte (FAB-11244) - except the last byte.
// if this contains a nil byte that indicate that its a different key other than the one we are
// scanning the history for. However, the last byte can be nil even for the valid key (indicating the transaction numer being zero)
// This is because, if 'blockNumTranNumBytes' really is the suffix of the desired key - only possibility of this containing a nil byte
// is the last byte when the transaction number in blockNumTranNumBytes is zero).
// On the other hand, if 'blockNumTranNumBytes' really is NOT the suffix of the desired key, then this has to be a prefix
// of some other key (other than the desired key) and in this case, there has to be at least one nil byte (other than the last byte),
// for the 'last' CompositeKeySep in the composite key
// Take an example of two keys "key" and "key\x00" in a namespace ns. The entries for these keys will be
// of type "ns-\x00-key-\x00-blkNumTranNumBytes" and ns-\x00-key-\x00-\x00-blkNumTranNumBytes respectively.
// "-" in above examples are just for readability. Further, when scanning the range
// {ns-\x00-key-\x00 - ns-\x00-key-xff} for getting the history for <ns, key>, the entry for the other key
// falls in the range and needs to be ignored
if bytes.Contains(blockNumTranNumBytes[:len(blockNumTranNumBytes)-1], historydb.CompositeKeySep) {
logger.Debugf("Some other key [%#v] found in the range while scanning history for key [%#v]. Skipping...",
historyKey, scanner.key)
continue
}
blockNum, bytesConsumed := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[0:])
tranNum, _ := util.DecodeOrderPreservingVarUint64(blockNumTranNumBytes[bytesConsumed:])
logger.Debugf("Found history record for namespace:%s key:%s at blockNumTranNum %v:%v\n",
scanner.namespace, scanner.key, blockNum, tranNum)
// Get the transaction from block storage that is associated with this history record
tranEnvelope, err := scanner.blockStore.RetrieveTxByBlockNumTranNum(blockNum, tranNum)
if err != nil {
return nil, err
}
// Get the txid, key write value, timestamp, and delete indicator associated with this transaction
queryResult, err := getKeyModificationFromTran(tranEnvelope, scanner.namespace, scanner.key)
if err != nil {
return nil, err
}
logger.Debugf("Found historic key value for namespace:%s key:%s from transaction %s\n",
scanner.namespace, scanner.key, queryResult.(*queryresult.KeyModification).TxId)
return queryResult, nil
}
}
func (scanner *historyScanner) Close() {
scanner.dbItr.Release()
}
// getTxIDandKeyWriteValueFromTran inspects a transaction for writes to a given key
func getKeyModificationFromTran(tranEnvelope *common.Envelope, namespace string, key string) (commonledger.QueryResult, error) {
logger.Debugf("Entering getKeyModificationFromTran()\n", namespace, key)
// extract action from the envelope
payload, err := putils.GetPayload(tranEnvelope)
if err != nil {
return nil, err
}
tx, err := putils.GetTransaction(payload.Data)
if err != nil {
return nil, err
}
_, respPayload, err := putils.GetPayloads(tx.Actions[0])
if err != nil {
return nil, err
}
chdr, err := putils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
if err != nil {
return nil, err
}
txID := chdr.TxId
timestamp := chdr.Timestamp
txRWSet := &rwsetutil.TxRwSet{}
// Get the Result from the Action and then Unmarshal
// it into a TxReadWriteSet using custom unmarshalling
if err = txRWSet.FromProtoBytes(respPayload.Results); err != nil {
return nil, err
}
// look for the namespace and key by looping through the transaction's ReadWriteSets
for _, nsRWSet := range txRWSet.NsRwSets {
if nsRWSet.NameSpace == namespace {
// got the correct namespace, now find the key write
for _, kvWrite := range nsRWSet.KvRwSet.Writes {
if kvWrite.Key == key {
return &queryresult.KeyModification{TxId: txID, Value: kvWrite.Value,
Timestamp: timestamp, IsDelete: kvWrite.IsDelete}, nil
}
} // end keys loop
return nil, errors.New("key not found in namespace's writeset")
} // end if
} //end namespaces loop
return nil, errors.New("namespace not found in transaction's ReadWriteSets")
}