-
Notifications
You must be signed in to change notification settings - Fork 0
/
helper.go
132 lines (115 loc) · 4.11 KB
/
helper.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package pvtdatastorage
import (
"math"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/protos/ledger/rwset"
)
// prepareStoreEntries converts the private data of one block into the two
// kinds of records this package persists: the data entries themselves and the
// expiry entries derived from them via btlPolicy.
// If deriving the expiry entries fails, both slices are returned as nil
// together with the error.
func prepareStoreEntries(blockNum uint64, pvtdata []*ledger.TxPvtData, btlPolicy pvtdatapolicy.BTLPolicy) ([]*dataEntry, []*expiryEntry, error) {
	entries := prepareDataEntries(blockNum, pvtdata)
	expiries, err := prepareExpiryEntries(blockNum, entries, btlPolicy)
	if err != nil {
		return nil, nil, err
	}
	return entries, expiries, nil
}
// prepareDataEntries flattens the per-transaction private write-sets into one
// dataEntry per (transaction, namespace, collection) triple. Each entry is
// keyed by blockNum, the transaction's SeqInBlock, the namespace and the
// collection name, and carries the collection write-set as its value.
// The result is nil when there is nothing to flatten.
func prepareDataEntries(blockNum uint64, pvtData []*ledger.TxPvtData) []*dataEntry {
	var entries []*dataEntry
	for _, tx := range pvtData {
		for _, nsRwset := range tx.WriteSet.NsPvtRwset {
			for _, collRwset := range nsRwset.CollectionPvtRwset {
				key := &dataKey{blockNum, tx.SeqInBlock, nsRwset.Namespace, collRwset.CollectionName}
				entries = append(entries, &dataEntry{key: key, value: collRwset})
			}
		}
	}
	return entries
}
// prepareExpiryEntries groups the given data entries by the block at which
// they expire, as reported by btlPolicy.GetExpiringBlock, and returns one
// expiryEntry per distinct expiring block. Entries for which neverExpires
// holds are left out. The first error from the policy aborts the whole call.
// The order of the returned entries follows map iteration and is therefore
// unspecified.
func prepareExpiryEntries(committingBlk uint64, dataEntries []*dataEntry, btlPolicy pvtdatapolicy.BTLPolicy) ([]*expiryEntry, error) {
	byExpiringBlk := map[uint64]*ExpiryData{}
	for _, entry := range dataEntries {
		key := entry.key
		expiringBlk, err := btlPolicy.GetExpiringBlock(key.ns, key.coll, key.blkNum)
		if err != nil {
			return nil, err
		}
		if neverExpires(expiringBlk) {
			continue
		}
		// lazily create the bucket for this expiring block
		if _, seen := byExpiringBlk[expiringBlk]; !seen {
			byExpiringBlk[expiringBlk] = newExpiryData()
		}
		byExpiringBlk[expiringBlk].add(key.ns, key.coll, key.txNum)
	}

	var result []*expiryEntry
	for blk, data := range byExpiringBlk {
		result = append(result, &expiryEntry{
			key:   &expiryKey{expiringBlk: blk, committingBlk: committingBlk},
			value: data,
		})
	}
	return result, nil
}
// deriveDataKeys rebuilds the data keys referenced by an expiry entry: one key
// for every (namespace, collection, txNum) recorded in the entry's value, each
// using the entry's committing block as its block number.
// The order of the returned keys follows map iteration and is unspecified.
func deriveDataKeys(expiryEntry *expiryEntry) []*dataKey {
	var keys []*dataKey
	entryKey := expiryEntry.key
	for namespace, collections := range expiryEntry.value.Map {
		for collection, txs := range collections.Map {
			for _, tx := range txs.List {
				keys = append(keys, &dataKey{entryKey.committingBlk, tx, namespace, collection})
			}
		}
	}
	return keys
}
// passesFilter reports whether the key's namespace/collection is accepted by
// filter. A nil filter accepts everything.
func passesFilter(dataKey *dataKey, filter ledger.PvtNsCollFilter) bool {
	if filter == nil {
		return true
	}
	return filter.Has(dataKey.ns, dataKey.coll)
}
// isExpired reports whether the data identified by dataKey has expired as of
// latestBlkNum, i.e. whether latestBlkNum has reached or passed the expiring
// block that btl.GetExpiringBlock returns for the key. An error from the
// policy is returned unchanged together with false.
func isExpired(dataKey *dataKey, btl pvtdatapolicy.BTLPolicy, latestBlkNum uint64) (bool, error) {
	expiresAt, err := btl.GetExpiringBlock(dataKey.ns, dataKey.coll, dataKey.blkNum)
	if err != nil {
		return false, err
	}
	expired := expiresAt <= latestBlkNum
	return expired, nil
}
// neverExpires reports whether expiringBlkNum is math.MaxUint64, the value
// this file treats as "no expiry".
func neverExpires(expiringBlkNum uint64) bool {
	const noExpiry uint64 = math.MaxUint64
	return expiringBlkNum == noExpiry
}
// txPvtdataAssembler incrementally builds the private write-set of a single
// transaction from collection write-sets supplied one at a time through add.
// Field order matters to any positional composite literal of this type.
type txPvtdataAssembler struct {
	// blockNum is stored at construction; the methods in this file do not read it.
	blockNum uint64
	// txNum becomes SeqInBlock of the assembled ledger.TxPvtData.
	txNum uint64
	// txWset collects the namespace write-sets that have been completed.
	txWset *rwset.TxPvtReadWriteSet
	// currentNsWSet is the namespace write-set currently being filled; nil
	// before the first add and after done.
	currentNsWSet *rwset.NsPvtReadWriteSet
	// firstCall is true until add has been invoked once.
	firstCall bool
}
// newTxPvtdataAssembler returns an assembler for the given block/transaction
// with an empty transaction write-set, no namespace in progress, and
// firstCall set.
func newTxPvtdataAssembler(blockNum, txNum uint64) *txPvtdataAssembler {
	return &txPvtdataAssembler{
		blockNum:  blockNum,
		txNum:     txNum,
		txWset:    &rwset.TxPvtReadWriteSet{},
		firstCall: true,
	}
}
// add appends collPvtWset to the namespace write-set for ns. Consecutive calls
// with the same ns accumulate into one namespace write-set; a call with a
// different ns moves the one in progress into the transaction write-set and
// starts a new one.
//
// add must not be called after done/getTxPvtdata once an entry has been added:
// currentNsWSet is nil at that point and the namespace comparison below would
// dereference it.
func (a *txPvtdataAssembler) add(ns string, collPvtWset *rwset.CollectionPvtReadWriteSet) {
	switch {
	case a.firstCall:
		// very first entry: open the initial NsWset
		a.firstCall = false
		a.currentNsWSet = &rwset.NsPvtReadWriteSet{Namespace: ns}
	case a.currentNsWSet.Namespace != ns:
		// namespace changed: flush the finished NsWset and open a new one
		a.txWset.NsPvtRwset = append(a.txWset.NsPvtRwset, a.currentNsWSet)
		a.currentNsWSet = &rwset.NsPvtReadWriteSet{Namespace: ns}
	}

	current := a.currentNsWSet
	current.CollectionPvtRwset = append(current.CollectionPvtRwset, collPvtWset)
}
// done moves the namespace write-set in progress, if any, into the transaction
// write-set and clears it. Calling done again without an intervening add is a
// no-op.
func (a *txPvtdataAssembler) done() {
	if pending := a.currentNsWSet; pending != nil {
		a.txWset.NsPvtRwset = append(a.txWset.NsPvtRwset, pending)
		a.currentNsWSet = nil
	}
}
// getTxPvtdata finalizes the assembler via done and returns the assembled
// private data: SeqInBlock is the assembler's txNum and WriteSet is the
// accumulated transaction write-set (shared with the assembler, not copied).
func (a *txPvtdataAssembler) getTxPvtdata() *ledger.TxPvtData {
	a.done()
	assembled := new(ledger.TxPvtData)
	assembled.SeqInBlock = a.txNum
	assembled.WriteSet = a.txWset
	return assembled
}