-
Notifications
You must be signed in to change notification settings - Fork 178
/
cluster.go
225 lines (185 loc) · 6.92 KB
/
cluster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package procedure
import (
"fmt"
"github.com/dgraph-io/badger/v2"
"github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/storage/badger/operation"
)
// This file implements storage functions for blocks in cluster consensus.
// InsertClusterBlock returns a transaction function that persists a cluster
// consensus block. In order, it:
//  1. verifies that the payload hash recorded in the header equals the hash
//     computed from the attached payload (nothing is written on mismatch),
//  2. stores the header under the block ID,
//  3. stores the payload via InsertClusterPayload,
//  4. registers the block under its parent via IndexNewBlock.
//
// Any failing step aborts the sequence and its error is returned wrapped.
func InsertClusterBlock(block *cluster.Block) func(*badger.Txn) error {
	return func(tx *badger.Txn) error {
		header, payload := block.Header, block.Payload

		// the header must commit to exactly the payload we are about to store
		if header.PayloadHash != payload.Hash() {
			return fmt.Errorf("computed payload hash does not match header")
		}

		// all subsequent writes are keyed by the block ID
		id := block.ID()

		if err := operation.InsertHeader(id, header)(tx); err != nil {
			return fmt.Errorf("could not insert header: %w", err)
		}

		if err := InsertClusterPayload(id, payload)(tx); err != nil {
			return fmt.Errorf("could not insert payload: %w", err)
		}

		// record the block against its parent ID
		if err := IndexNewBlock(id, header.ParentID)(tx); err != nil {
			return fmt.Errorf("could not index new block: %w", err)
		}

		return nil
	}
}
// RetrieveClusterBlock returns a transaction function that loads the cluster
// consensus block with the given ID into block. The header and the payload are
// fetched separately; block is overwritten only after both lookups succeed, so
// it is left untouched when an error is returned.
func RetrieveClusterBlock(blockID flow.Identifier, block *cluster.Block) func(*badger.Txn) error {
	return func(tx *badger.Txn) error {
		// load the header stored under the block ID
		storedHeader := new(flow.Header)
		if err := operation.RetrieveHeader(blockID, storedHeader)(tx); err != nil {
			return fmt.Errorf("could not retrieve header: %w", err)
		}

		// load the payload stored under the same block ID
		storedPayload := new(cluster.Payload)
		if err := RetrieveClusterPayload(blockID, storedPayload)(tx); err != nil {
			return fmt.Errorf("could not retrieve payload: %w", err)
		}

		// replace the caller's block wholesale with the assembled result
		*block = cluster.Block{
			Header:  storedHeader,
			Payload: storedPayload,
		}
		return nil
	}
}
// RetrieveLatestFinalizedClusterHeader returns a transaction function that
// loads the latest finalized header of the cluster chain identified by chainID
// into final. It reads the chain's finalized height, resolves the block ID
// indexed at that height, and then retrieves the header for that ID.
func RetrieveLatestFinalizedClusterHeader(chainID flow.ChainID, final *flow.Header) func(tx *badger.Txn) error {
	return func(tx *badger.Txn) error {
		// height of the most recently finalized block on this cluster chain
		var finalizedHeight uint64
		if err := operation.RetrieveClusterFinalizedHeight(chainID, &finalizedHeight)(tx); err != nil {
			return fmt.Errorf("could not retrieve boundary: %w", err)
		}

		// ID of the block indexed at that height
		var finalizedID flow.Identifier
		if err := operation.LookupClusterBlockHeight(chainID, finalizedHeight, &finalizedID)(tx); err != nil {
			return fmt.Errorf("could not retrieve final ID: %w", err)
		}

		// the header itself, written directly into the caller's destination
		if err := operation.RetrieveHeader(finalizedID, final)(tx); err != nil {
			return fmt.Errorf("could not retrieve finalized header: %w", err)
		}

		return nil
	}
}
// FinalizeClusterBlock returns a transaction function that finalizes the
// cluster block with the given ID. The block must be a direct child of the
// currently finalized head of its chain; otherwise an error is returned and
// nothing is written. On success the block is indexed by its height and the
// chain's finalized height is advanced to the block's height.
func FinalizeClusterBlock(blockID flow.Identifier) func(*badger.Txn) error {
	return func(tx *badger.Txn) error {
		// load the header of the block being finalized; it supplies the chain
		// ID, the parent ID and the height used below
		var header flow.Header
		if err := operation.RetrieveHeader(blockID, &header)(tx); err != nil {
			return fmt.Errorf("could not retrieve header: %w", err)
		}

		// the chain ID selects which cluster's finalized state is consulted
		chain := header.ChainID

		// current finalized height of that chain
		var finalizedHeight uint64
		if err := operation.RetrieveClusterFinalizedHeight(chain, &finalizedHeight)(tx); err != nil {
			return fmt.Errorf("could not retrieve boundary: %w", err)
		}

		// ID of the block currently indexed at the finalized height
		var finalizedID flow.Identifier
		if err := operation.LookupClusterBlockHeight(chain, finalizedHeight, &finalizedID)(tx); err != nil {
			return fmt.Errorf("could not retrieve head: %w", err)
		}

		// only a direct child of the finalized head may be finalized
		if header.ParentID != finalizedID {
			return fmt.Errorf("can't finalize non-child of chain head")
		}

		// insert the height -> ID mapping for the newly finalized block
		// (the error text below says "view", but the index is keyed by height)
		if err := operation.IndexClusterBlockHeight(chain, header.Height, header.ID())(tx); err != nil {
			return fmt.Errorf("could not insert view->ID mapping: %w", err)
		}

		// advance the finalized boundary to this block's height
		if err := operation.UpdateClusterFinalizedHeight(chain, header.Height)(tx); err != nil {
			return fmt.Errorf("could not update finalized boundary: %w", err)
		}

		// NOTE: forks that have become invalid are deliberately not pruned
		// here, so that entities can still be validated and slashing
		// challenges generated for some time; pruning should happen elsewhere
		// after a certain delay of blocks.
		return nil
	}
}
// InsertClusterPayload returns a transaction function that stores the payload
// of the cluster block identified by blockID. It writes, in order:
//  1. the light form of the payload's collection,
//  2. every transaction contained in the collection,
//  3. the index from blockID to the collection's transaction IDs,
//  4. the index from blockID to the payload's reference block ID.
//
// Steps 1-3 are wrapped in operation.SkipDuplicates, because it is valid for
// two competing forks to carry the same payload. Step 4 is not wrapped, so any
// error from indexing the reference block is returned as-is (wrapped).
func InsertClusterPayload(blockID flow.Identifier, payload *cluster.Payload) func(*badger.Txn) error {
	return func(tx *badger.Txn) error {
		// cluster payloads only contain a single collection; compute its light
		// form once and reuse it for both the collection insert and the
		// transaction-ID index below
		light := payload.Collection.Light()

		err := operation.SkipDuplicates(operation.InsertCollection(&light))(tx)
		if err != nil {
			return fmt.Errorf("could not insert payload collection: %w", err)
		}

		// insert constituent transactions
		for _, colTx := range payload.Collection.Transactions {
			err = operation.SkipDuplicates(operation.InsertTransaction(colTx.ID(), colTx))(tx)
			if err != nil {
				return fmt.Errorf("could not insert payload transaction: %w", err)
			}
		}

		// index the transaction IDs within the collection, reusing the light
		// collection computed above instead of rebuilding it
		err = operation.SkipDuplicates(operation.IndexCollectionPayload(blockID, light.Transactions))(tx)
		if err != nil {
			return fmt.Errorf("could not index collection: %w", err)
		}

		// insert the reference block ID
		err = operation.IndexReferenceBlockByClusterBlock(blockID, payload.ReferenceBlockID)(tx)
		if err != nil {
			return fmt.Errorf("could not insert reference block ID: %w", err)
		}

		return nil
	}
}
// RetrieveClusterPayload returns a transaction function that rebuilds the
// payload of the cluster block identified by blockID. It looks up the
// reference block ID and the list of transaction IDs indexed under blockID,
// retrieves each transaction body in list order, and assembles the result with
// cluster.PayloadFromTransactions. payload is overwritten only on success.
func RetrieveClusterPayload(blockID flow.Identifier, payload *cluster.Payload) func(*badger.Txn) error {
	return func(tx *badger.Txn) error {
		// reference block ID indexed under this cluster block
		var referenceID flow.Identifier
		if err := operation.LookupReferenceBlockByClusterBlock(blockID, &referenceID)(tx); err != nil {
			return fmt.Errorf("could not retrieve reference block ID: %w", err)
		}

		// IDs of the transactions that make up the block's collection
		var ids []flow.Identifier
		if err := operation.LookupCollectionPayload(blockID, &ids)(tx); err != nil {
			return fmt.Errorf("could not look up collection payload: %w", err)
		}

		// fetch every transaction body, keeping the order of the ID list
		bodies := make([]*flow.TransactionBody, len(ids))
		for i, id := range ids {
			body := new(flow.TransactionBody)
			if err := operation.RetrieveTransaction(id, body)(tx); err != nil {
				return fmt.Errorf("could not retrieve transaction: %w", err)
			}
			bodies[i] = body
		}

		*payload = cluster.PayloadFromTransactions(referenceID, bodies...)
		return nil
	}
}