-
Notifications
You must be signed in to change notification settings - Fork 178
/
execution_data_requester.go
478 lines (393 loc) · 19.2 KB
/
execution_data_requester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
package requester
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/rs/zerolog"
"github.com/sethvargo/go-retry"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/jobqueue"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/module/state_synchronization/requester/jobs"
"github.com/onflow/flow-go/module/util"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// The ExecutionDataRequester downloads ExecutionData for sealed blocks from other participants in
// the flow network. The ExecutionData for a sealed block should always be downloadable, since a
// sealed block must have been executed.
//
// Once the ExecutionData for a block is downloaded, the node becomes a seeder for other participants
// on the network using the bitswap protocol. The downloading and seeding work is handled by the
// ExecutionDataService.
//
// The ExecutionDataRequester internally uses a job queue to request and download each sealed block
// with multiple workers. It downloads ExecutionData block by block towards the latest sealed block.
// In order to ensure it does not miss any sealed block to download, it persists the last downloaded
// height, and only increments it when the next height has been downloaded. In the event of a crash
// failure, it will read the last downloaded height, and process from the next un-downloaded height.
// The requester listens to block finalization events and checks whether the sealed height has
// changed; if it has, it creates a job for each sealed height that has not been downloaded yet.
//
// The requester is made up of 3 subcomponents:
//
// * OnBlockFinalized: receives block finalized events from the finalization distributor and
// forwards them to the blockConsumer.
//
// * blockConsumer: is a jobqueue that receives block finalization events. On each event,
// it checks for the latest sealed block, then uses a pool of workers to
// download ExecutionData for each block from the network. After each
// successful download, the blockConsumer sends a notification to the
// notificationConsumer that a new ExecutionData is available.
//
// * notificationConsumer: is a jobqueue that receives ExecutionData fetched events. On each event,
// it checks if ExecutionData for the next consecutive block height is
// available, then uses a single worker to send notifications to registered
// consumers.
// The registered consumers are guaranteed to receive each sealed block,
// in consecutive height order, at least once.
//
//    +------------------+      +---------------+       +----------------------+
// -->| OnBlockFinalized |----->| blockConsumer |   +-->| notificationConsumer |
//    +------------------+      +-------+-------+   |   +-----------+----------+
//                                      |           |               |
//                               +------+------+    |        +------+------+
//                            xN | Worker Pool |----+     x1 | Worker Pool |----> Registered consumers
//                               +-------------+             +-------------+
const (
	// DefaultFetchTimeout is the default initial timeout for fetching ExecutionData from the
	// db/network. The per-attempt timeout grows exponentially on each retry, up to the configured
	// maximum (see DefaultMaxFetchTimeout).
	DefaultFetchTimeout = 10 * time.Second

	// DefaultMaxFetchTimeout is the default upper bound on the per-attempt timeout for fetching
	// ExecutionData from the db/network.
	DefaultMaxFetchTimeout = 10 * time.Minute

	// DefaultRetryDelay is the default initial delay used in the exponential backoff between
	// failed ExecutionData download retries.
	DefaultRetryDelay = 1 * time.Second

	// DefaultMaxRetryDelay is the default maximum delay used in the exponential backoff between
	// failed ExecutionData download retries.
	DefaultMaxRetryDelay = 5 * time.Minute

	// DefaultMaxSearchAhead is the default max number of unsent notifications to allow before
	// pausing new fetches.
	DefaultMaxSearchAhead = 5000

	// fetchWorkers is the number of goroutines used to download new ExecutionData from the network.
	fetchWorkers = 4
)
// ExecutionDataConfig contains configuration options for the ExecutionDataRequester.
// See the Default* constants in this package for suggested values.
type ExecutionDataConfig struct {
// The initial value to use as the last processed block height. This should be the
// first block height to sync - 1. Both the block consumer and the notification consumer use it
// as their "last processed" height when their progress storage is empty.
InitialBlockHeight uint64
// Max number of unsent notifications to allow before pausing new fetches. After exceeding this
// limit, the requester will stop processing new finalized block notifications. This prevents
// unbounded memory use by the requester if it gets stuck fetching a specific height.
MaxSearchAhead uint64
// The initial timeout for fetching ExecutionData from the db/network. The per-attempt timeout
// grows exponentially on each retry, up to MaxFetchTimeout. It is also passed to the
// ExecutionDataReader used by the notification consumer.
FetchTimeout time.Duration
// The max per-attempt timeout for fetching ExecutionData from the db/network
MaxFetchTimeout time.Duration
// Exponential backoff settings for download retries: RetryDelay is the initial delay between
// failed attempts, MaxRetryDelay caps that delay.
RetryDelay time.Duration
MaxRetryDelay time.Duration
}
// executionDataRequester implements state_synchronization.ExecutionDataRequester. Instances are
// created by New, which wires the blockConsumer and notificationConsumer job queues and runs them
// as workers of cm.
type executionDataRequester struct {
// Component is set to cm by New, so the requester exposes cm's lifecycle methods.
component.Component
cm *component.ComponentManager
// Service the ExecutionData is downloaded from; runBlockConsumer waits for it to be ready.
eds state_synchronization.ExecutionDataService
metrics module.ExecutionDataRequesterMetrics
config ExecutionDataConfig
log zerolog.Logger
// Local db objects. seals and results resolve a block's ExecutionDataID in processFetchRequest;
// all three are passed to the ExecutionDataReader.
headers storage.Headers
results storage.ExecutionResults
seals storage.Seals
// Job reader the notificationConsumer uses to read downloaded ExecutionData by height
executionDataReader *jobs.ExecutionDataReader
// Notifiers for queue consumers: finalizationNotifier is signalled by OnBlockFinalized and its
// channel drives the blockConsumer.
finalizationNotifier engine.Notifier
// Job queues
blockConsumer *jobqueue.ComponentConsumer
notificationConsumer *jobqueue.ComponentConsumer
// List of callbacks to call when ExecutionData is successfully fetched for a block.
// Guarded by consumerMu: appended under the write lock, iterated under the read lock.
consumers []state_synchronization.ExecutionDataReceivedCallback
consumerMu sync.RWMutex
}
// Compile-time check that *executionDataRequester satisfies the ExecutionDataRequester interface.
var _ state_synchronization.ExecutionDataRequester = (*executionDataRequester)(nil)
// New creates a new execution data requester component.
//
// It wires up two job queues and registers one ComponentManager worker for each:
// - blockConsumer downloads ExecutionData for sealed blocks (started by runBlockConsumer)
// - notificationConsumer delivers downloaded ExecutionData to the registered consumers in
// consecutive height order (started by runNotificationConsumer)
//
// processedHeight and processedNotifications persist the progress of the blockConsumer and the
// notificationConsumer respectively; cfg.InitialBlockHeight is used as the "last processed"
// height for both when the corresponding storage is empty. No consumer is started by New itself.
func New(
log zerolog.Logger,
edrMetrics module.ExecutionDataRequesterMetrics,
eds state_synchronization.ExecutionDataService,
processedHeight storage.ConsumerProgress,
processedNotifications storage.ConsumerProgress,
state protocol.State,
headers storage.Headers,
results storage.ExecutionResults,
seals storage.Seals,
cfg ExecutionDataConfig,
) state_synchronization.ExecutionDataRequester {
e := &executionDataRequester{
log: log.With().Str("component", "execution_data_requester").Logger(),
eds: eds,
metrics: edrMetrics,
headers: headers,
results: results,
seals: seals,
config: cfg,
finalizationNotifier: engine.NewNotifier(),
}
// Signalled by the blockConsumer's post-notifier; wakes the notificationConsumer.
executionDataNotifier := engine.NewNotifier()
// jobqueue Jobs object that tracks sealed blocks by height. This is used by the blockConsumer
// to get a sequential list of sealed blocks.
sealedBlockReader := jobqueue.NewSealedBlockHeaderReader(state, headers)
// blockConsumer ensures every sealed block's execution data is downloaded.
// It listens to block finalization events from `finalizationNotifier`, then checks if there
// are new sealed blocks with `sealedBlockReader`. If there are, it starts workers to process
// them with `processBlockJob`, which fetches execution data. At most `fetchWorkers` workers
// will be created for concurrent processing. When a sealed block's execution data has been
// downloaded, it updates and persists the highest consecutive downloaded height with
// `processedHeight`. That way, if the node crashes, it reads the `processedHeight` and resumes
// from `processedHeight + 1`. If the database is empty, `e.config.InitialBlockHeight` is used
// to init the last processed height. Once the execution data is fetched and stored, it
// notifies `executionDataNotifier`.
e.blockConsumer = jobqueue.NewComponentConsumer(
e.log.With().Str("module", "block_consumer").Logger(),
e.finalizationNotifier.Channel(), // to listen to finalization events to find newly sealed blocks
processedHeight, // read and persist the downloaded height
sealedBlockReader, // read sealed blocks by height
e.config.InitialBlockHeight, // initial "last processed" height for empty db
e.processBlockJob, // process the sealed block job to download its execution data
fetchWorkers, // the number of concurrent workers
e.config.MaxSearchAhead, // max number of unsent notifications to allow before pausing new fetches
)
// notifies notificationConsumer when new ExecutionData blobs are available
// SetPostNotifier will notify executionDataNotifier AFTER e.blockConsumer.LastProcessedIndex is updated.
// Even though it doesn't guarantee to notify for every height at least once, the notificationConsumer is
// able to guarantee to process every height at least once, because the notificationConsumer finds new jobs
// using executionDataReader, which finds new heights using e.blockConsumer.LastProcessedIndex
e.blockConsumer.SetPostNotifier(func(module.JobID) { executionDataNotifier.Notify() })
// jobqueue Jobs object tracks downloaded execution data by height. This is used by the
// notificationConsumer to get downloaded execution data from storage.
e.executionDataReader = jobs.NewExecutionDataReader(
e.eds,
e.headers,
e.results,
e.seals,
e.config.FetchTimeout,
// method to get highest consecutive height that has downloaded execution data. it is used
// here by the notification job consumer to discover new jobs.
// Note: we don't want to notify notificationConsumer for a block if it has not downloaded
// execution data yet.
e.blockConsumer.LastProcessedIndex,
)
// notificationConsumer consumes `OnExecutionDataFetched` events, and ensures its consumers
// receive this event in consecutive block height order.
// It listens to events from `executionDataNotifier`, which is delivered when
// a block's execution data is downloaded and stored, and uses `e.executionDataReader` to
// find if the next un-processed consecutive height is available.
// To know the next un-processed consecutive height, it reads the latest
// consecutive height in `processedNotifications`, which is persisted in storage to be crash-resistant.
// When a new consecutive height is available, it calls `processNotificationJob` to notify all the
// `e.consumers`.
// Note: the `e.consumers` will be guaranteed to receive at least one `OnExecutionDataFetched` event
// for each sealed block in consecutive block height order.
e.notificationConsumer = jobqueue.NewComponentConsumer(
e.log.With().Str("module", "notification_consumer").Logger(),
executionDataNotifier.Channel(), // listen for notifications from the block consumer
processedNotifications, // read and persist the notified height
e.executionDataReader, // read execution data by height
e.config.InitialBlockHeight, // initial "last processed" height for empty db
e.processNotificationJob, // process the job to send notifications for an execution data
1, // use a single worker to ensure notification is delivered in consecutive order
0, // search ahead limit controlled by worker count
)
// One worker per consumer; each worker starts its consumer and blocks until it is done.
builder := component.NewComponentManagerBuilder().
AddWorker(e.runBlockConsumer).
AddWorker(e.runNotificationConsumer)
e.cm = builder.Build()
e.Component = e.cm
return e
}
// OnBlockFinalized is the FinalizationDistributor callback for finalized blocks.
// The block argument is not inspected: the call only signals finalizationNotifier, which wakes
// the blockConsumer so it can look for newly sealed heights.
func (e *executionDataRequester) OnBlockFinalized(_ *model.Block) {
	e.finalizationNotifier.Notify()
}
// AddOnExecutionDataFetchedConsumer registers fn to be called with each ExecutionData delivered
// by the notification consumer. Callbacks are invoked in registration order while a read lock on
// the consumer list is held, so callback implementations must:
//   - be concurrency safe
//   - be non-blocking
//   - handle repetition of the same events (with some processing overhead)
//   - not call AddOnExecutionDataFetchedConsumer themselves (that would deadlock on the lock)
func (e *executionDataRequester) AddOnExecutionDataFetchedConsumer(fn state_synchronization.ExecutionDataReceivedCallback) {
	e.consumerMu.Lock()
	e.consumers = append(e.consumers, fn)
	e.consumerMu.Unlock()
}
// runBlockConsumer is the ComponentManager worker that drives the blockConsumer.
//
// The blockConsumer is started only after both the execution data service and the notification
// consumer are ready. If ctx is cancelled while waiting, the worker returns without starting it.
// ready() is signalled once the blockConsumer itself is ready, and the worker then blocks until
// the blockConsumer is done.
func (e *executionDataRequester) runBlockConsumer(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	if util.WaitClosed(ctx, e.eds.Ready()) != nil {
		return // context cancelled
	}
	if util.WaitClosed(ctx, e.notificationConsumer.Ready()) != nil {
		return // context cancelled
	}

	e.blockConsumer.Start(ctx)
	if util.WaitClosed(ctx, e.blockConsumer.Ready()) == nil {
		ready()
	}

	// wait for shutdown even if the consumer never reported ready
	<-e.blockConsumer.Done()
}
// runNotificationConsumer is the ComponentManager worker that drives the notificationConsumer.
//
// It hands the worker context to the execution data reader before starting the consumer that
// reads from it, signals ready() once the consumer is ready, and blocks until the consumer is done.
func (e *executionDataRequester) runNotificationConsumer(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
	e.executionDataReader.AddContext(ctx)
	e.notificationConsumer.Start(ctx)

	if waitErr := util.WaitClosed(ctx, e.notificationConsumer.Ready()); waitErr == nil {
		ready()
	}

	// wait for shutdown even if the consumer never reported ready
	<-e.notificationConsumer.Done()
}
// Fetch Worker Methods
// processBlockJob consumes jobs from the blockConsumer and attempts to download the ExecutionData
// for the job's sealed block.
//
// jobComplete is called only when the download succeeds. Every other outcome leaves the job
// incomplete, so its height is not recorded as processed:
//   - a job that cannot be converted to a block header is thrown on ctx as irrecoverable;
//   - unexpected fetch failures are thrown on ctx by processFetchRequest;
//   - if ctx has been cancelled (shutdown), the error is logged at debug level only;
//   - invalid blobs are logged as errors and never completed, which halts downloads once
//     MaxSearchAhead is reached.
func (e *executionDataRequester) processBlockJob(ctx irrecoverable.SignalerContext, job module.Job, jobComplete func()) {
	// convert job into a block header
	header, err := jobqueue.JobToBlockHeader(job)
	if err != nil {
		ctx.Throw(fmt.Errorf("failed to convert job to block: %w", err))
		// Throw is expected not to return; never fall through with a nil header.
		return
	}

	err = e.processSealedHeight(ctx, header.ID(), header.Height)
	if err == nil {
		jobComplete()
		return
	}

	if ctx.Err() != nil {
		// shutting down: an aborted download is expected, not an error worth alerting on
		e.log.Debug().Err(err).Str("job_id", string(job.ID())).Msg("block job aborted: context cancelled")
		return
	}

	e.log.Error().Err(err).Str("job_id", string(job.ID())).Msg("error encountered while processing block job")
}
// processSealedHeight downloads the ExecutionData for the sealed block blockID at the given height.
//
// Failed downloads are retried with no retry limit, using an exponential backoff that starts at
// RetryDelay, is capped at MaxRetryDelay and has 15% jitter. Each attempt gets its own fetch
// timeout, which also grows exponentially from FetchTimeout up to MaxFetchTimeout.
//
// It returns nil on success, an invalid-blob error as-is (these are never retried), or the error
// retry.Do reports once ctx is done.
func (e *executionDataRequester) processSealedHeight(ctx irrecoverable.SignalerContext, blockID flow.Identifier, height uint64) error {
	retryBackoff := retry.WithJitterPercent(15,
		retry.WithCappedDuration(e.config.MaxRetryDelay,
			retry.NewExponential(e.config.RetryDelay)))

	// bitswap always waits for either all data to be received or a timeout, even if it encountered
	// an error. Growing the per-attempt timeout keeps early retries fast while still leaving room
	// for large payloads or network congestion on later attempts.
	fetchTimeouts := retry.WithCappedDuration(e.config.MaxFetchTimeout,
		retry.NewExponential(e.config.FetchTimeout))

	var attempt uint64
	return retry.Do(ctx, retryBackoff, func(context.Context) error {
		if attempt != 0 {
			e.log.Debug().
				Str("block_id", blockID.String()).
				Uint64("height", height).
				Uint64("attempt", attempt).
				Msg("retrying download")
			e.metrics.FetchRetried()
		}
		attempt++

		// no retry limit is applied to the timeout sequence, so its stop flag is ignored
		fetchTimeout, _ := fetchTimeouts.Next()

		err := e.processFetchRequest(ctx, blockID, height, fetchTimeout)
		switch {
		case err == nil:
			return nil
		case isInvalidBlobError(err):
			// invalid data will not become valid by downloading it again
			return err
		default:
			return retry.RetryableError(err)
		}
	})
}
// processFetchRequest performs a single download attempt for the ExecutionData of blockID,
// bounded by fetchTimeout.
//
// The ExecutionData ID is resolved through the block's finalized seal and the sealed execution
// result; failing to look up either is thrown on ctx as irrecoverable. Download outcomes:
//   - success: returns nil
//   - invalid blob: logged and returned (processSealedHeight does not retry these)
//   - blob not found, deadline exceeded or cancellation: logged and returned so the caller can retry
//   - anything else: logged and thrown on ctx as irrecoverable
//
// Every path that calls ctx.Throw also returns explicitly, so a nil seal/result is never
// dereferenced and an unexpected error is never reported as a successful fetch.
func (e *executionDataRequester) processFetchRequest(ctx irrecoverable.SignalerContext, blockID flow.Identifier, height uint64, fetchTimeout time.Duration) error {
	logger := e.log.With().
		Str("block_id", blockID.String()).
		Uint64("height", height).
		Logger()

	logger.Debug().Msg("processing fetch request")

	seal, err := e.seals.FinalizedSealForBlock(blockID)
	if err != nil {
		err = fmt.Errorf("failed to get seal for block %s: %w", blockID, err)
		ctx.Throw(err)
		return err
	}

	result, err := e.results.ByID(seal.ResultID)
	if err != nil {
		err = fmt.Errorf("failed to lookup execution result for block %s: %w", blockID, err)
		ctx.Throw(err)
		return err
	}

	logger = logger.With().Str("execution_data_id", result.ExecutionDataID.String()).Logger()

	start := time.Now()
	e.metrics.ExecutionDataFetchStarted()

	logger.Debug().Msg("downloading execution data")

	_, err = e.fetchExecutionData(ctx, result.ExecutionDataID, fetchTimeout)

	e.metrics.ExecutionDataFetchFinished(time.Since(start), err == nil, height)

	switch {
	case err == nil:
		logger.Info().Msg("execution data fetched")
		return nil

	case isInvalidBlobError(err):
		// This means an execution result was sealed with an invalid execution data id (invalid data).
		// Eventually, verification nodes will verify that the execution data is valid, and not sign the receipt
		logger.Error().Err(err).Msg("HALTING REQUESTER: invalid execution data found")
		return err

	case isBlobNotFoundError(err), errors.Is(err, context.DeadlineExceeded), errors.Is(err, context.Canceled):
		// Some or all of the blob was missing or the attempt ran out of time: retryable
		logger.Error().Err(err).Msg("failed to get execution data for block")
		return err

	default:
		// Any other error is unexpected
		logger.Error().Err(err).Msg("unexpected error fetching execution data")
		ctx.Throw(err)
		return err
	}
}
// fetchExecutionData retrieves the ExecutionData identified by executionDataID from the
// ExecutionDataService. The lookup ends when fetchTimeout elapses or signalerCtx is done,
// whichever comes first. On error the returned ExecutionData is always nil.
func (e *executionDataRequester) fetchExecutionData(signalerCtx irrecoverable.SignalerContext, executionDataID flow.Identifier, fetchTimeout time.Duration) (*state_synchronization.ExecutionData, error) {
	fetchCtx, cancelFetch := context.WithTimeout(signalerCtx, fetchTimeout)
	defer cancelFetch()

	// Blocking network call: returns only once the data is received or an error
	// (including the timeout) occurs.
	data, err := e.eds.Get(fetchCtx, executionDataID)
	if err == nil {
		return data, nil
	}
	return nil, err
}
// Notification Worker Methods
// processNotificationJob consumes jobs from the notificationConsumer. It converts the job into a
// block entry, delivers the entry's ExecutionData to all registered consumers, and then marks the
// job complete.
//
// A job that cannot be converted is thrown on ctx as irrecoverable; the function returns right
// after, so a nil entry is never dereferenced and the job is not marked complete.
func (e *executionDataRequester) processNotificationJob(ctx irrecoverable.SignalerContext, job module.Job, jobComplete func()) {
	// convert job into a block entry
	entry, err := jobs.JobToBlockEntry(job)
	if err != nil {
		ctx.Throw(fmt.Errorf("failed to convert job to entry: %w", err))
		return
	}

	e.processNotification(ctx, entry.Height, entry.ExecutionData)
	jobComplete()
}
// processNotification delivers the ExecutionData for the given height to every registered
// consumer, then records the NotificationSent metric for that height. ctx is currently unused.
func (e *executionDataRequester) processNotification(ctx irrecoverable.SignalerContext, height uint64, executionData *state_synchronization.ExecutionData) {
	logger := e.log
	logger.Debug().Msgf("notifying for block %d", height)

	// the metric is recorded only after all consumer callbacks have returned
	e.notifyConsumers(executionData)
	e.metrics.NotificationSent(height)
}
// notifyConsumers invokes every registered callback with executionData, in registration order.
// The read lock on the consumer list is held until all callbacks return.
func (e *executionDataRequester) notifyConsumers(executionData *state_synchronization.ExecutionData) {
	e.consumerMu.RLock()
	defer e.consumerMu.RUnlock()

	for i := range e.consumers {
		e.consumers[i](executionData)
	}
}
// isInvalidBlobError reports whether err, or any error it wraps, means the downloaded data itself
// is invalid: malformed data, a blob over the size limit, or a blob tree that is too deep.
func isInvalidBlobError(err error) bool {
	var malformed *state_synchronization.MalformedDataError
	if errors.As(err, &malformed) {
		return true
	}

	var tooLarge *state_synchronization.BlobSizeLimitExceededError
	if errors.As(err, &tooLarge) {
		return true
	}

	return errors.Is(err, state_synchronization.ErrBlobTreeDepthExceeded)
}
// isBlobNotFoundError reports whether err, or any error it wraps, is a
// *state_synchronization.BlobNotFoundError.
func isBlobNotFoundError(err error) bool {
	target := new(*state_synchronization.BlobNotFoundError)
	return errors.As(err, target)
}