package indexer

import (
	"context"
	"fmt"
	"math/big"
	"time"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/jpillora/backoff"
	"github.com/synapsecns/sanguine/ethergo/util"
	"github.com/synapsecns/sanguine/services/scribe/backend"
	"github.com/synapsecns/sanguine/services/scribe/logger"
	scribeTypes "github.com/synapsecns/sanguine/services/scribe/types"
)

// LogFetcher pre-fetches filter logs into a channel in deterministic order.
type LogFetcher struct {
	// iterator is the chunk iterator used for the range.
	iterator util.ChunkIterator
	// startBlock is the first block of the range (retained for logging).
	startBlock *big.Int
	// endBlock is the last block of the range (retained for logging).
	endBlock *big.Int
	// fetchedLogsChan is a channel with the fetched chunks of logs.
	fetchedLogsChan chan types.Log
	// backend is the ethereum backend used to fetch logs.
	backend backend.ScribeBackend
	// indexerConfig holds the config data for the chain.
	indexerConfig *scribeTypes.IndexerConfig
	// topics is the list of topics to filter logs by.
	topics [][]common.Hash
	// bufferSize bounds fetchedLogsChan, preventing the scribe indexer from being overloaded with logs and upstream RPCs from being overloaded with requests.
	bufferSize int
}

// NewLogFetcher creates a new filtering interface for a range of blocks. If ascending is true, block heights are filtered from start->end; otherwise from end->start.
func NewLogFetcher(backend backend.ScribeBackend, startBlock, endBlock *big.Int, indexerConfig *scribeTypes.IndexerConfig, ascending bool) *LogFetcher {
	// The ChunkIterator is inclusive of both the start and end blocks, which can be confusing when setting the
	// range size in the config: a GetLogsRange of 1 would otherwise query two blocks instead of one.
	// Subtracting 1 accounts for this.
	chunkSize := int(indexerConfig.GetLogsRange) - 1

	// Use the StoreConcurrency value from the config as the buffer size for fetchedLogsChan, capped at 100.
	bufferSize := indexerConfig.StoreConcurrency
	if bufferSize > 100 {
		bufferSize = 100
	}
	if bufferSize == 0 {
		bufferSize = 3 // default buffer size
	}
	return &LogFetcher{
		iterator:        util.NewChunkIterator(startBlock, endBlock, chunkSize, ascending),
		startBlock:      startBlock,
		endBlock:        endBlock,
		fetchedLogsChan: make(chan types.Log, bufferSize),
		backend:         backend,
		indexerConfig:   indexerConfig,
		bufferSize:      bufferSize,
		topics:          indexerConfig.Topics,
	}
}
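
// Illustrative usage (a minimal sketch, not part of the original file): a caller
// might wire the fetcher up like this, assuming a hypothetical ScribeBackend
// `client`, a populated IndexerConfig `cfg`, a context `ctx`, and a `handle`
// function for processing logs:
//
//	fetcher := NewLogFetcher(client, big.NewInt(100), big.NewInt(200), &cfg, true)
//	go func() {
//		if err := fetcher.Start(ctx); err != nil {
//			// handle the error; the logs channel is only closed on clean completion
//		}
//	}()
//	for log := range *fetcher.GetFetchedLogsChan() {
//		handle(log)
//	}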

// GetChunkArr gets the appropriate number of block chunks (getLogs ranges).
func (f *LogFetcher) GetChunkArr() (chunkArr []*util.Chunk) {
	for i := uint64(0); i < f.indexerConfig.GetLogsBatchAmount; i++ {
		chunk := f.iterator.NextChunk()
		if chunk == nil {
			return chunkArr
		}
		chunkArr = append(chunkArr, chunk)

		// Stop appending chunks once the current chunk's end block comes within ConcurrencyThreshold blocks of the fetcher's end block.
		if chunk.EndBlock.Uint64() > f.endBlock.Uint64()-f.indexerConfig.ConcurrencyThreshold {
			logger.ReportScribeState(f.indexerConfig.ChainID, chunk.EndBlock.Uint64(), f.indexerConfig.Addresses, logger.ConcurrencyThresholdReached)
			return chunkArr
		}
	}
	return chunkArr
}
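
// Worked example (illustrative, with assumed config values): with GetLogsRange = 100,
// chunkSize is 99 and each inclusive chunk covers 100 blocks. With GetLogsBatchAmount = 3
// and a fetcher over blocks 0-1000, one GetChunkArr call returns the ranges [0, 99],
// [100, 199], [200, 299], and the next call resumes at block 300. If ConcurrencyThreshold
// were 850, the second chunk's end block (199) would exceed 1000 - 850 = 150, so
// GetChunkArr would return only the first two chunks.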

// Start starts the log fetching process. If the context is canceled, logs stop being filtered.
// 1. Within an infinite for loop, chunks of getLogs block ranges are constructed and used to get logs. This flow
// pauses whenever the logs channel's buffer (bufferSize) is full.
// 2. Each time logs are received, a wait group ensures there is no race condition where channels could be closed
// before a log could be saved.
// 3. When the range to get logs is exhausted (GetChunkArr returns an empty slice), the wait group ensures that all
// logs are added to the logs channel before the function returns.
// 4. Completing Start triggers the closeOnDone function, which sends a boolean on the done channel to signal that
// the fetcher has completed. The consumer of these logs then drains the logs channel to fully empty it.
// See contract.go to learn more about how the logs from this file are consumed.
func (f *LogFetcher) Start(ctx context.Context) error {
	for {
		select {
		case <-ctx.Done():
			if ctx.Err() != nil {
				return fmt.Errorf("could not finish filtering range: %w", ctx.Err())
			}
			return nil
		default:
			chunks := f.GetChunkArr()
			if len(chunks) == 0 {
				close(f.fetchedLogsChan)
				return nil
			}
			logs, err := f.FetchLogs(ctx, chunks)
			if err != nil {
				return fmt.Errorf("could not filter logs: %w", err)
			}
			select {
			case <-ctx.Done():
				return fmt.Errorf("context canceled while adding log to chan: %w", ctx.Err())
			default:
				// Insert logs into the channel; sends block while the channel's buffer is full.
				for i := range logs {
					f.fetchedLogsChan <- logs[i]
				}
			}
		}
	}
}
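
// Drain sketch (illustrative, assuming a `done` channel like the one step 4 of the
// Start doc describes; closeOnDone itself lives outside this file): once the fetcher
// signals completion, a consumer might empty any remaining buffered logs like so:
//
//	<-done
//	for {
//		select {
//		case log, ok := <-*fetcher.GetFetchedLogsChan():
//			if !ok {
//				return // channel closed and fully drained
//			}
//			handle(log)
//		default:
//			return // buffer empty
//		}
//	}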

// FetchLogs safely calls FilterLogs, backing off in the case of rate limiting and respecting context cancellation.
//
// nolint:cyclop
func (f *LogFetcher) FetchLogs(ctx context.Context, chunks []*util.Chunk) ([]types.Log, error) {
	backoffConfig := &backoff.Backoff{
		Factor: 2,
		Jitter: true,
		Min:    1 * time.Second,
		Max:    8 * time.Second,
	}
	attempt := 0
	timeout := time.Duration(0)
	for {
		select {
		case <-ctx.Done():
			logger.ReportIndexerError(ctx.Err(), *f.indexerConfig, logger.GetLogsError)
			return nil, fmt.Errorf("context was canceled before logs could be fetched")
		case <-time.After(timeout):
			attempt++
			// retryTolerance is a package-level constant bounding the number of attempts.
			if attempt > retryTolerance {
				logger.ReportIndexerError(fmt.Errorf("retry max reached"), *f.indexerConfig, logger.GetLogsError)
				return nil, fmt.Errorf("maximum number of fetch logs attempts exceeded")
			}
			logs, err := f.getAndUnpackLogs(ctx, chunks, backoffConfig)
			if err != nil {
				logger.ReportIndexerError(err, *f.indexerConfig, logger.GetLogsError)
				timeout = backoffConfig.Duration()
				continue
			}
			return logs, nil
		}
	}
}
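
// Backoff arithmetic (illustrative): with Min = 1s, Factor = 2, and Max = 8s, the
// nominal delays for successive failed attempts are roughly 1s, 2s, 4s, 8s, then 8s
// thereafter; with Jitter enabled each actual delay is randomized between Min and
// that nominal value, which spreads out retries from concurrently failing fetchers.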

// getAndUnpackLogs fetches logs for the given chunks and flattens the per-chunk results into a single slice.
func (f *LogFetcher) getAndUnpackLogs(ctx context.Context, chunks []*util.Chunk, backoffConfig *backoff.Backoff) ([]types.Log, error) {
	result, err := backend.GetLogsInRange(ctx, f.backend, f.indexerConfig.Addresses, uint64(f.indexerConfig.ChainID), chunks, f.indexerConfig.Topics)
	if err != nil {
		// Advance the backoff state; the caller applies the resulting wait on retry.
		backoffConfig.Duration()
		return nil, fmt.Errorf("could not get logs: %w", err)
	}

	var logs []types.Log
	resultIterator := result.Iterator()
	for !resultIterator.Done() {
		select {
		case <-ctx.Done():
			logger.ReportIndexerError(ctx.Err(), *f.indexerConfig, logger.GetLogsError)
			return nil, fmt.Errorf("context canceled while unpacking logs from request: %w", ctx.Err())
		default:
			_, logChunk := resultIterator.Next()
			if logChunk == nil || len(*logChunk) == 0 {
				logger.ReportIndexerError(fmt.Errorf("empty log chunk"), *f.indexerConfig, logger.EmptyGetLogsChunk)
				continue
			}
			logs = append(logs, *logChunk...)
		}
	}
	return logs, nil
}

// GetFetchedLogsChan returns the fetchedLogsChan channel as a pointer for access by the indexer and tests.
func (f *LogFetcher) GetFetchedLogsChan() *chan types.Log {
	return &f.fetchedLogsChan
}