-
Notifications
You must be signed in to change notification settings - Fork 179
/
execution_data_reader.go
92 lines (76 loc) · 2.79 KB
/
execution_data_reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package jobs
import (
"context"
"fmt"
"time"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/storage"
)
// BlockEntry represents a block that's tracked by the ExecutionDataRequester
type BlockEntry struct {
BlockID flow.Identifier
Height uint64
ExecutionData *execution_data.BlockExecutionDataEntity
}
var _ module.Jobs = (*ExecutionDataReader)(nil)
// ExecutionDataReader provides an abstraction for consumers to read blocks as job.
type ExecutionDataReader struct {
store *cache.ExecutionDataCache
fetchTimeout time.Duration
highestConsecutiveHeight func() (uint64, error)
// TODO: refactor this to accept a context in AtIndex instead of storing it on the struct.
// This requires also refactoring jobqueue.Consumer
ctx irrecoverable.SignalerContext
}
// NewExecutionDataReader creates and returns a ExecutionDataReader.
func NewExecutionDataReader(
store *cache.ExecutionDataCache,
fetchTimeout time.Duration,
highestConsecutiveHeight func() (uint64, error),
) *ExecutionDataReader {
return &ExecutionDataReader{
store: store,
fetchTimeout: fetchTimeout,
highestConsecutiveHeight: highestConsecutiveHeight,
}
}
// AddContext adds a context to the execution data reader
// TODO: this is an anti-pattern, refactor this to accept a context in AtIndex instead of storing
// it on the struct.
func (r *ExecutionDataReader) AddContext(ctx irrecoverable.SignalerContext) {
r.ctx = ctx
}
// AtIndex returns the block entry job at the given height, or storage.ErrNotFound.
// Any other error is unexpected
func (r *ExecutionDataReader) AtIndex(height uint64) (module.Job, error) {
if r.ctx == nil {
return nil, fmt.Errorf("execution data reader is not initialized")
}
// data for the requested height or a lower height, has not been downloaded yet.
highestHeight, err := r.highestConsecutiveHeight()
if err != nil {
return nil, fmt.Errorf("failed to get highest height: %w", err)
}
if height > highestHeight {
return nil, storage.ErrNotFound
}
ctx, cancel := context.WithTimeout(r.ctx, r.fetchTimeout)
defer cancel()
executionData, err := r.store.ByHeight(ctx, height)
if err != nil {
return nil, fmt.Errorf("failed to get execution data for height %d: %w", height, err)
}
return BlockEntryToJob(&BlockEntry{
BlockID: executionData.BlockID,
Height: height,
ExecutionData: executionData,
}), nil
}
// Head returns the highest consecutive block height with downloaded execution data
func (r *ExecutionDataReader) Head() (uint64, error) {
return r.highestConsecutiveHeight()
}