-
Notifications
You must be signed in to change notification settings - Fork 179
/
core.go
406 lines (360 loc) · 15.8 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
package matching
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"time"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/flow/filter"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/mempool"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/module/trace"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
"github.com/onflow/flow-go/utils/logging"
)
// Config is a structure of values that configure the behavior of the matching engine.
type Config struct {
	// SealingThreshold is the minimum number of unsealed finalized blocks
	// (finalized height minus sealed height) at which missing execution
	// receipts start being requested.
	SealingThreshold uint

	// MaxResultsToRequest caps the number of blocks for which execution
	// receipts are requested in a single pass.
	MaxResultsToRequest uint
}
// DefaultConfig returns the default matching configuration: a SealingThreshold
// of 10 unsealed finalized blocks and at most 20 blocks whose receipts are
// requested per pass (MaxResultsToRequest).
func DefaultConfig() Config {
	return Config{
		SealingThreshold:    10,
		MaxResultsToRequest: 20,
	}
}
// Core holds the business logic of the matching engine. It processes execution
// receipts received from the p2p network: receipts whose previous result is not
// yet known are buffered as pending, validated receipts are stored, and missing
// execution receipts for unsealed finalized blocks are re-requested.
type Core struct {
	// log records relevant actions with context.
	log zerolog.Logger

	// tracer creates spans to trace receipt processing.
	tracer module.Tracer

	// metrics tracks consensus metrics.
	metrics module.ConsensusMetrics

	// mempool reports mempool sizes.
	mempool module.MempoolMetrics

	// state provides access to the protocol state.
	state protocol.State

	// headersDB is used to look up block headers (e.g. to check sealed headers).
	headersDB storage.Headers

	// receiptsDB persists received execution receipts.
	receiptsDB storage.ExecutionReceipts

	// receipts holds execution receipts, indexes them by height, and can search
	// all receipts derived from a given parent result.
	receipts mempool.ExecutionTree

	// pendingReceipts buffers receipts whose ancestor result is missing, so they
	// cannot yet be connected to the sealed results.
	pendingReceipts mempool.PendingReceipts

	// seals holds candidate seals for incorporated results that have acquired
	// sufficient approvals. Candidate seals are constructed without considering
	// the sealability of parent results.
	seals mempool.IncorporatedResultSeals

	// receiptValidator validates execution receipts.
	receiptValidator module.ReceiptValidator

	// receiptRequester requests missing execution receipts by block ID.
	receiptRequester module.Requester

	// config configures the matching core.
	config Config
}
// NewCore creates a matching Core wired to the given dependencies. The supplied
// logger is extended with the field engine=matching.Core; every other argument
// is stored unchanged.
func NewCore(
	log zerolog.Logger,
	tracer module.Tracer,
	metrics module.ConsensusMetrics,
	mempool module.MempoolMetrics,
	state protocol.State,
	headersDB storage.Headers,
	receiptsDB storage.ExecutionReceipts,
	receipts mempool.ExecutionTree,
	pendingReceipts mempool.PendingReceipts,
	seals mempool.IncorporatedResultSeals,
	receiptValidator module.ReceiptValidator,
	receiptRequester module.Requester,
	config Config,
) *Core {
	return &Core{
		log:              log.With().Str("engine", "matching.Core").Logger(),
		tracer:           tracer,
		metrics:          metrics,
		mempool:          mempool,
		state:            state,
		headersDB:        headersDB,
		receiptsDB:       receiptsDB,
		receipts:         receipts,
		pendingReceipts:  pendingReceipts,
		seals:            seals,
		receiptValidator: receiptValidator,
		receiptRequester: receiptRequester,
		config:           config,
	}
}
// ProcessReceipt processes a new execution receipt. If the receipt is accepted,
// it then recursively processes every pending receipt whose previous result is
// this receipt's result.
// Any error indicates an unexpected problem in the protocol logic. The node's
// internal state might be corrupted. Hence, returned errors should be treated as fatal.
func (c *Core) ProcessReceipt(receipt *flow.ExecutionReceipt) error {
	// When receiving a receipt, we might not be able to verify it if its previous result
	// is unknown. In this case, instead of dropping it, we store it in the pending receipts
	// mempool, and process it later when its parent result has been received and processed.
	// Therefore, if a receipt is processed, we will check if it is the previous results of
	// some pending receipts and process them one after another.

	// Both IDs are hashes; compute each once and reuse it below.
	receiptID := receipt.ID()
	resultID := receipt.ExecutionResult.ID()

	processed, err := c.processReceipt(receipt)
	if err != nil {
		marshalled, encErr := json.Marshal(receipt)
		if encErr != nil {
			marshalled = []byte("json_marshalling_failed")
		}
		c.log.Error().Err(err).
			Hex("origin", logging.ID(receipt.ExecutorID)).
			Hex("receipt_id", receiptID[:]).
			Hex("result_id", resultID[:]).
			Str("receipt", string(marshalled)).
			Msg("internal error processing execution receipt")
		return fmt.Errorf("internal error processing execution receipt %x: %w", receiptID, err)
	}
	if !processed {
		return nil
	}

	// This receipt's result is now known, so receipts that were buffered while
	// waiting for it as their previous result can be retried.
	childReceipts := c.pendingReceipts.ByPreviousResultID(resultID)
	c.pendingReceipts.Remove(receiptID)
	for _, childReceipt := range childReceipts {
		// recursively processing the child receipts
		err := c.ProcessReceipt(childReceipt)
		if err != nil {
			// we don't want to wrap the error with any info from its parent receipt,
			// because the error has nothing to do with its parent receipt.
			return err
		}
	}
	return nil
}
// processReceipt checks validity of the given receipt and adds it to the node's validated information.
// Returns:
//   - bool: true iff receipt is new (previously unknown), and its validity can be confirmed
//   - error: any error indicates an unexpected problem in the protocol logic. The node's
//     internal state might be corrupted. Hence, returned errors should be treated as fatal.
//
// The following outcomes are benign and yield (false, nil):
//   - the receipt is already known to the execution tree mempool
//   - the receipt's result has no chunks (malformed receipt)
//   - the executed block is not found in the headers storage
//   - the executed block is at or below the latest sealed height
//   - the receipt is rejected by the validator as invalid input
//   - the receipt cannot be verified yet (e.g. its previous result is missing); it is
//     then buffered in the pending-receipts mempool
func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {
	// The result ID is needed both for the logger and for the tracing span;
	// compute the hash once.
	resultID := receipt.ExecutionResult.ID()

	// setup logger to capture basic information about the receipt
	log := c.log.With().
		Hex("receipt_id", logging.Entity(receipt)).
		Hex("result_id", resultID[:]).
		Hex("execution_data_id", receipt.ExecutionResult.ExecutionDataID[:]).
		Hex("previous_result", receipt.ExecutionResult.PreviousResultID[:]).
		Hex("block_id", receipt.ExecutionResult.BlockID[:]).
		Hex("executor_id", receipt.ExecutorID[:]).
		Logger()

	if c.receipts.HasReceipt(receipt) {
		log.Debug().Msg("skipping processing of already known receipt")
		return false, nil
	}

	startTime := time.Now()
	defer func() {
		c.metrics.OnReceiptProcessingDuration(time.Since(startTime))
	}()

	receiptSpan, _ := c.tracer.StartBlockSpan(context.Background(), receipt.ExecutionResult.BlockID, trace.CONMatchProcessReceipt)
	receiptSpan.SetAttributes(
		attribute.String("result_id", resultID.String()),
		attribute.String("executor", receipt.ExecutorID.String()),
	)
	defer receiptSpan.End()

	initialState, finalState, err := getStartAndEndStates(receipt)
	if err != nil {
		if errors.Is(err, flow.ErrNoChunks) {
			log.Error().Err(err).Msg("discarding malformed receipt")
			return false, nil
		}
		return false, fmt.Errorf("internal problem retrieving start- and end-state commitment from receipt: %w", err)
	}
	log = log.With().
		Hex("initial_state", initialState[:]).
		Hex("final_state", finalState[:]).Logger()

	// if the receipt is for an unknown block, skip it. It will be re-requested
	// later by `requestPending` function. Only a not-found result means "unknown
	// block"; any other storage failure is unexpected and must not be swallowed.
	executedBlock, err := c.headersDB.ByBlockID(receipt.ExecutionResult.BlockID)
	if err != nil {
		if errors.Is(err, storage.ErrNotFound) {
			log.Debug().Msg("discarding receipt for unknown block")
			return false, nil
		}
		return false, fmt.Errorf("could not retrieve header of executed block %v: %w", receipt.ExecutionResult.BlockID, err)
	}

	log = log.With().
		Uint64("block_view", executedBlock.View).
		Uint64("block_height", executedBlock.Height).
		Logger()
	log.Info().Msg("execution receipt received")

	// if Execution Receipt is for block whose height is lower or equal to already sealed height
	// => drop Receipt
	sealed, err := c.state.Sealed().Head()
	if err != nil {
		return false, fmt.Errorf("could not find sealed block: %w", err)
	}
	if executedBlock.Height <= sealed.Height {
		log.Debug().Msg("discarding receipt for already sealed and finalized block height")
		return false, nil
	}

	childSpan := c.tracer.StartSpanFromParent(receiptSpan, trace.CONMatchProcessReceiptVal)
	err = c.receiptValidator.Validate(receipt)
	childSpan.End()

	if engine.IsUnverifiableInputError(err) {
		// If previous result is missing, we can't validate this receipt.
		// Although we will request its previous receipt(s),
		// we don't want to drop it now, because when the missing previous arrive
		// in a wrong order, they will still be dropped, and causing the catch up
		// to be inefficient.
		// Instead, we cache the receipt in case it arrives earlier than its
		// previous receipt.
		// For instance, given blocks A <- B <- C <- D <- E, if we receive their receipts
		// in the order of [E,C,D,B,A], then:
		// if we drop the missing previous receipts, then only A will be processed;
		// if we cache the missing previous receipts, then all of them will be processed, because
		// once A is processed, we will check if there is a child receipt pending,
		// if yes, then process it.
		c.pendingReceipts.Add(receipt)
		log.Info().Msg("receipt is cached because its previous result is missing")
		return false, nil
	}

	if err != nil {
		if engine.IsInvalidInputError(err) {
			log.Err(err).Msg("invalid execution receipt")
			return false, nil
		}
		return false, fmt.Errorf("failed to validate execution receipt: %w", err)
	}

	added, err := c.storeReceipt(receipt, executedBlock)
	if err != nil {
		return false, fmt.Errorf("failed to store receipt: %w", err)
	}
	if added {
		log.Info().Msg("execution result processed and stored")
	}

	return added, nil
}
// storeReceipt inserts the receipt into the execution-tree mempool and, when it
// was newly added there, also writes it to the persistent receipts storage.
// Return values:
//   - bool: true iff the receipt was newly added to the mempool and is present in
//     persistent storage afterwards
//   - error: exception in case something (unexpected) went wrong
func (c *Core) storeReceipt(receipt *flow.ExecutionReceipt, head *flow.Header) (bool, error) {
	added, err := c.receipts.AddReceipt(receipt, head)
	switch {
	case err != nil:
		return false, fmt.Errorf("adding receipt (%x) to mempool failed: %w", receipt.ID(), err)
	case !added:
		// the mempool already holds this receipt; nothing else to do
		return false, nil
	}

	// TODO: we'd better wrap the `receipts` with the metrics method to avoid the metrics
	// getting out of sync
	c.mempool.MempoolEntries(metrics.ResourceReceipt, c.receipts.Size())

	// The receipt can already be in the database although it was missing from the
	// mempool (e.g. the mempool was wiped during a node crash). It must still be
	// processed, so a duplicate insert is tolerated rather than treated as failure.
	storeErr := c.receiptsDB.Store(receipt) // internally de-duplicates
	if storeErr == nil || errors.Is(storeErr, storage.ErrAlreadyExists) {
		return true, nil
	}
	return false, fmt.Errorf("could not persist receipt: %w", storeErr)
}
// requestPendingReceipts requests the execution receipts of unsealed finalized
// blocks for which no result has yet been committed to by at least two distinct
// executors.
//
// Requests are only issued once the number of unsealed finalized blocks
// (finalized height minus sealed height) reaches Config.SealingThreshold, and at
// most Config.MaxResultsToRequest blocks are requested per call, lowest height first.
//
// Returns:
//   - int: the number of blocks for which receipts were requested
//   - uint64: the lowest finalized height whose block lacks sufficient receipts;
//     0 if the sealing threshold was not reached, math.MaxUint64 if no scanned
//     block is missing receipts
//   - error: any error is unexpected (exception)
func (c *Core) requestPendingReceipts() (int, uint64, error) {
	finalSnapshot := c.state.Final()
	final, err := finalSnapshot.Head() // last finalized block
	if err != nil {
		return 0, 0, fmt.Errorf("could not get finalized height: %w", err)
	}
	_, seal, err := finalSnapshot.SealedResult() // last finalized seal
	if err != nil {
		return 0, 0, fmt.Errorf("could not retrieve latest finalized seal: %w", err)
	}
	sealed, err := c.headersDB.ByBlockID(seal.BlockID) // last sealed block
	if err != nil {
		return 0, 0, fmt.Errorf("could not get sealed height: %w", err)
	}

	// only request once the number of unsealed finalized blocks reaches the threshold
	if uint(final.Height-sealed.Height) < c.config.SealingThreshold {
		return 0, 0, nil
	}

	// order the missing blocks by height from low to high such that when
	// passing them to the missing block requester, they can be requested in the
	// right order. The right order gives the priority to the execution result
	// of lower height blocks to be requested first, since a gap in the sealing
	// heights would stop the sealing.
	missingBlocksOrderedByHeight := make([]flow.Identifier, 0, c.config.MaxResultsToRequest)
	var firstMissingHeight uint64 = math.MaxUint64

	// traverse each unsealed and finalized block with height from low to high,
	// if the result is missing, then add the blockID to a missing block list in
	// order to request them.
heightLoop:
	for height := sealed.Height + 1; height <= final.Height; height++ {
		// request receipts for at most Config.MaxResultsToRequest blocks
		if len(missingBlocksOrderedByHeight) >= int(c.config.MaxResultsToRequest) {
			break
		}

		// get the block header at this height (should not error as heights are finalized)
		header, err := c.headersDB.ByHeight(height)
		if err != nil {
			return 0, 0, fmt.Errorf("could not get header (height=%d): %w", height, err)
		}
		blockID := header.ID()

		receipts, err := c.receiptsDB.ByBlockID(blockID)
		if err != nil && !errors.Is(err, storage.ErrNotFound) {
			return 0, 0, fmt.Errorf("could not get receipts by block ID: %v, %w", blockID, err)
		}

		// We require at least 2 consistent receipts from different ENs to seal a block.
		// If we already have them for some result, we don't need to fetch more receipts.
		// CAUTION: This is a temporary shortcut incompatible with the mature BFT protocol!
		// There might be multiple consistent receipts that commit to a wrong result. To guarantee
		// sealing liveness, we need to fetch receipts from those ENs, whose receipts we don't have yet.
		// TODO: update for full BFT
		for _, receiptsForResult := range receipts.GroupByResultID() {
			if receiptsForResult.GroupByExecutorID().NumberGroups() >= 2 {
				continue heightLoop
			}
		}

		// Heights are visited in ascending order, so the first missing block
		// found is also the lowest one.
		if len(missingBlocksOrderedByHeight) == 0 {
			firstMissingHeight = height
		}
		missingBlocksOrderedByHeight = append(missingBlocksOrderedByHeight, blockID)
	}

	// request missing execution results, if sealed height is low enough
	for _, blockID := range missingBlocksOrderedByHeight {
		c.receiptRequester.Query(blockID, filter.Any)
	}

	return len(missingBlocksOrderedByHeight), firstMissingHeight, nil
}
// OnBlockFinalization runs the matching core's housekeeping; judging by its name
// it is meant to be invoked when a block is finalized (caller not visible here).
// It
//  1. requests execution receipts still missing for unsealed finalized blocks,
//  2. prunes the execution-tree mempool and the pending-receipts mempool up to
//     the latest sealed height, and
//  3. logs a summary including mempool sizes and the duration of the call.
//
// Errors from any of these steps are returned wrapped with context; none of
// them is treated as benign here.
func (c *Core) OnBlockFinalization() error {
	startTime := time.Now()

	// request execution receipts for unsealed finalized blocks
	pendingReceiptRequests, firstMissingHeight, err := c.requestPendingReceipts()
	if err != nil {
		return fmt.Errorf("could not request pending block results: %w", err)
	}

	// Prune Execution Tree
	lastSealed, err := c.state.Sealed().Head()
	if err != nil {
		return fmt.Errorf("could not retrieve last sealed block : %w", err)
	}
	err = c.receipts.PruneUpToHeight(lastSealed.Height)
	if err != nil {
		return fmt.Errorf("failed to prune execution tree up to latest sealed and finalized block %v, height: %v: %w",
			lastSealed.ID(), lastSealed.Height, err)
	}

	// Prune pending receipts
	err = c.pendingReceipts.PruneUpToHeight(lastSealed.Height)
	if err != nil {
		return fmt.Errorf("failed to prune pending receipts mempool up to latest sealed and finalized block %v, height: %v: %w",
			lastSealed.ID(), lastSealed.Height, err)
	}

	// Note: requestPendingReceipts reports firstMissingHeight as 0 when the
	// sealing threshold was not reached and as math.MaxUint64 when no scanned
	// block is missing receipts.
	c.log.Info().
		Uint64("first_height_missing_result", firstMissingHeight).
		Uint("seals_size", c.seals.Size()).
		Uint("receipts_size", c.receipts.Size()).
		Int("pending_receipt_requests", pendingReceiptRequests).
		Int64("duration_ms", time.Since(startTime).Milliseconds()).
		Msg("finalized block processed successfully")

	return nil
}
// getStartAndEndStates extracts the pair (initial state commitment, final state
// commitment) from the receipt's execution result.
// Error returns:
//   - ErrNoChunks (wrapped): if there are no chunks, i.e. the ExecutionResult is malformed
//   - all other errors are unexpected and symptoms of node-internal problems
func getStartAndEndStates(receipt *flow.ExecutionReceipt) (initialState flow.StateCommitment, finalState flow.StateCommitment, err error) {
	result := &receipt.ExecutionResult

	if initialState, err = result.InitialStateCommit(); err != nil {
		return initialState, finalState, fmt.Errorf("could not get commitment for initial state from receipt: %w", err)
	}
	if finalState, err = result.FinalStateCommitment(); err != nil {
		return initialState, finalState, fmt.Errorf("could not get commitment for final state from receipt: %w", err)
	}

	return initialState, finalState, nil
}