-
Notifications
You must be signed in to change notification settings - Fork 177
/
indexer.go
149 lines (127 loc) · 5.52 KB
/
indexer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
package indexer
import (
"errors"
"fmt"
"time"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/execution_data/cache"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/jobqueue"
"github.com/onflow/flow-go/module/state_synchronization"
"github.com/onflow/flow-go/module/state_synchronization/requester/jobs"
"github.com/onflow/flow-go/storage"
)
// Tuning parameters for the jobqueue consumer and execution data reader created in NewIndexer.
const (
	// workersCount is how many workers will concurrently process the jobs in the jobqueue.
	workersCount = 1
	// searchAhead is how many block heights ahead of the current one will be requested and
	// queued as jobs for the jobqueue.
	searchAhead = 1

	// fetchTimeout is the timeout for retrieving execution data from the datastore.
	// The execution data reader requires a timeout value, but in practice it should not be
	// hit here since the data is read from a local db.
	fetchTimeout = 30 * time.Second
)
// ErrIndexNotInitialized is returned when the indexer is not initialized.
//
// This generally indicates that the index databases are still being initialized, and trying
// again later may succeed.
//
// NOTE(review): nothing in this file returns this sentinel; presumably it is returned by
// other code in the package or by callers wrapping the Indexer. Confirm before relying on it.
var ErrIndexNotInitialized = errors.New("index not initialized")

// Compile-time assertion that *Indexer implements state_synchronization.IndexReporter.
var _ state_synchronization.IndexReporter = (*Indexer)(nil)
// Indexer handles ingestion of newly available execution data and uses the IndexerCore to
// index it.
//
// Processing is driven by a jobqueue consumer that obtains its jobs from the execution data
// reader. The reader is constructed with the `executionDataLatestHeight` function passed to
// NewIndexer (presumably so the reader does not read past the latest available height;
// confirm against jobs.ExecutionDataReader).
//
// Upstream components that download new execution data call OnExecutionData to signal that
// new data is available, which notifies the jobqueue consumer and kicks off indexing.
type Indexer struct {
	// Component is assigned the jobqueue consumer in NewIndexer. Its methods are promoted
	// to Indexer, except Start which Indexer overrides.
	component.Component

	// log is the module logger, tagged with module=execution_indexer in NewIndexer.
	log zerolog.Logger
	// exeDataReader is the job source handed to the jobqueue consumer.
	exeDataReader *jobs.ExecutionDataReader
	// exeDataNotifier is notified by OnExecutionData; its channel is handed to the jobqueue
	// consumer to signal new work.
	exeDataNotifier engine.Notifier
	// indexer performs the actual indexing of a block's execution data.
	indexer *IndexerCore
	// jobConsumer invokes processExecutionData for each job and exposes the last processed
	// index.
	jobConsumer *jobqueue.ComponentConsumer
	// registers is the register index; its FirstHeight is reported as the lowest indexed
	// height.
	registers storage.RegisterIndex
}
// NewIndexer builds an Indexer together with the execution data reader and the jobqueue
// consumer that drive it.
//
// Parameters:
//   - log: base logger; a child logger tagged with module=execution_indexer is derived from it.
//   - initHeight: handed to the jobqueue consumer as its initial index (presumably the height
//     processing starts from when no progress has been stored yet; confirm against jobqueue).
//   - registers: register index used to report the lowest indexed height.
//   - indexer: core that indexes the execution data of each block.
//   - executionCache: cache the execution data reader fetches from.
//   - executionDataLatestHeight: function handed to the execution data reader.
//   - processedHeight: consumer progress handed to the jobqueue consumer.
//
// It returns an error if the jobqueue consumer cannot be created.
func NewIndexer(
	log zerolog.Logger,
	initHeight uint64,
	registers storage.RegisterIndex,
	indexer *IndexerCore,
	executionCache *cache.ExecutionDataCache,
	executionDataLatestHeight func() (uint64, error),
	processedHeight storage.ConsumerProgress,
) (*Indexer, error) {
	idx := &Indexer{
		log:             log.With().Str("module", "execution_indexer").Logger(),
		exeDataNotifier: engine.NewNotifier(),
		exeDataReader:   jobs.NewExecutionDataReader(executionCache, fetchTimeout, executionDataLatestHeight),
		indexer:         indexer,
		registers:       registers,
	}

	// The consumer processes newly available block execution data. It is woken up through
	// the notifier's channel, which is signalled from the OnExecutionData handler.
	consumer, err := jobqueue.NewComponentConsumer(
		idx.log,
		idx.exeDataNotifier.Channel(),
		processedHeight,
		idx.exeDataReader,
		initHeight,
		idx.processExecutionData,
		workersCount,
		searchAhead,
	)
	if err != nil {
		return nil, fmt.Errorf("error creating execution data jobqueue: %w", err)
	}

	// The consumer doubles as the embedded component that provides the lifecycle methods.
	idx.jobConsumer = consumer
	idx.Component = consumer

	return idx, nil
}
// Start registers ctx with the execution data reader and then starts the embedded component
// (the jobqueue consumer assigned in NewIndexer) so it begins consuming the available data.
func (i *Indexer) Start(ctx irrecoverable.SignalerContext) {
	// The reader receives the context before the consumer is started, presumably so it is
	// already available when the first job is fetched.
	i.exeDataReader.AddContext(ctx)
	i.Component.Start(ctx)
}
// LowestIndexedHeight reports the lowest height indexed by the execution indexer.
// The returned error is currently always nil.
func (i *Indexer) LowestIndexedHeight() (uint64, error) {
	// TODO: track the lowest indexed height with a dedicated value. The register db's first
	// height is used for now because it is convenient; this must be revisited once pruning
	// support is added.
	firstHeight := i.registers.FirstHeight()
	return firstHeight, nil
}
// HighestIndexedHeight reports the highest height indexed by the execution indexer, taken
// from the jobqueue consumer's last processed index.
//
// It does not block: if the jobqueue consumer has not yet signalled readiness, an error is
// returned immediately.
func (i *Indexer) HighestIndexedHeight() (uint64, error) {
	select {
	case <-i.jobConsumer.Ready():
		// The jobqueue maintains its own highest indexed height, separate from the register
		// db. A job is only marked complete after its data has been indexed, so this value
		// should be less than or equal to the register db's latest height.
		return i.jobConsumer.LastProcessedIndex(), nil
	default:
		// LastProcessedIndex is not meaningful until the component has completed startup.
		return 0, errors.New("HighestIndexedHeight must not be called before the component is ready")
	}
}
// OnExecutionData is the callback used by the execution data requester to signal that new
// execution data has been downloaded.
//
// The entity argument is ignored; the call only triggers the notifier whose channel was
// handed to the jobqueue consumer in NewIndexer.
func (i *Indexer) OnExecutionData(_ *execution_data.BlockExecutionDataEntity) {
	i.exeDataNotifier.Notify()
}
// processExecutionData is the worker function the jobqueue invokes for each new job.
//
// It converts the job into a block entry, hands the entry's execution data to the
// IndexerCore for indexing, and calls done only after indexing succeeded. Any error is
// logged and thrown on ctx as irrecoverable; in that case done is never called, so the job
// is not marked as complete.
func (i *Indexer) processExecutionData(ctx irrecoverable.SignalerContext, job module.Job, done func()) {
	entry, err := jobs.JobToBlockEntry(job)
	if err != nil {
		i.log.Error().Err(err).Str("job_id", string(job.ID())).Msg("error converting execution data job")
		ctx.Throw(err)
		// Throw is expected to halt this goroutine, but that is not guaranteed for every
		// SignalerContext implementation. Return explicitly so a nil entry is never
		// dereferenced below.
		return
	}

	err = i.indexer.IndexBlockData(entry.ExecutionData)
	if err != nil {
		i.log.Error().Err(err).Str("job_id", string(job.ID())).Msg("error during execution data index processing job")
		ctx.Throw(err)
		// Same reasoning as above: never report a failed job as done.
		return
	}

	done()
}