/
poll.go
357 lines (301 loc) · 10.1 KB
/
poll.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
package poller
import (
"context"
"fmt"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/soyart/gsl"
"go.uber.org/zap"
"github.com/soyart/superwatcher"
"github.com/soyart/superwatcher/pkg/logger/debugger"
)
// mapLogsResult represents information of fresh blocks mapped by mapLogs.
// It contains fresh data, i.e. not from tracker.
type mapLogsResult struct {
forked bool // true if the tracker block hash differs from fresh block hash
superwatcher.Block
}
type param struct {
fromBlock uint64
toBlock uint64
policy superwatcher.Policy
}
func (p *poller) Poll(
ctx context.Context,
fromBlock uint64,
toBlock uint64,
) (
*superwatcher.PollerResult,
error,
) {
p.Lock()
defer p.Unlock()
if p.tracker != nil || p.lastRecordedBlock != 0 {
// Clear all tracker's blocks before fromBlock - filterRange
until := p.lastRecordedBlock - p.filterRange
p.debugger.Debug(2, "clearing tracker", zap.Uint64("untilBlock", until))
p.tracker.clearUntil(until)
}
param := ¶m{
fromBlock: fromBlock,
toBlock: toBlock,
policy: p.policy,
}
pollResults, err := poll(ctx, param, p.addresses, p.topics, p.client, p.debugger)
if err != nil {
return nil, err
}
var blocksMissing []uint64
pollResults, blocksMissing, err = pollMissing(ctx, param, p.client, p.tracker, pollResults, p.debugger)
if err != nil {
return nil, err
}
pollResults, err = findReorg(param, blocksMissing, p.tracker, pollResults, p.debugger)
if err != nil {
return nil, err
}
result, err := processResult(param, p.tracker, pollResults, p.debugger)
if err != nil {
return result, err
}
result.FromBlock, result.ToBlock = fromBlock, toBlock
result.LastGoodBlock = superwatcher.LastGoodBlock(result)
p.lastRecordedBlock = result.LastGoodBlock
fromBlockResult, ok := pollResults[fromBlock]
if ok {
if fromBlockResult.forked && p.doReorg {
return result, errors.Wrapf(
superwatcher.ErrFromBlockReorged, "fromBlock %d was removed/reorged", fromBlock,
)
}
}
return result, nil
}
func poll(
ctx context.Context,
param *param,
addresses []common.Address,
topics [][]common.Hash,
client superwatcher.EthClient,
debugger *debugger.Debugger,
) (
map[uint64]*mapLogsResult,
error,
) {
pollResults := make(map[uint64]*mapLogsResult)
switch {
case param.policy >= superwatcher.PolicyExpensiveBlock: // Get blocks and event logs concurrently
return nil, errors.New("PolicyExpensiveBlock not implemented")
case param.policy == superwatcher.PolicyExpensive:
return pollExpensive(ctx, param.fromBlock, param.toBlock, addresses, topics, client, pollResults, debugger)
case param.policy <= superwatcher.PolicyNormal:
return pollCheap(ctx, param.fromBlock, param.toBlock, addresses, topics, client, pollResults, debugger)
}
panic(superwatcher.ErrBadPolicy.Error() + " " + param.policy.String())
}
// pollMissing polls tracker blocks that were removed/reorged and thus currently missing from the chain.
func pollMissing(
ctx context.Context,
param *param,
client superwatcher.EthClient,
tracker *blockTracker, // tracker is used as read-only in here. Don't write.
pollResults map[uint64]*mapLogsResult,
debugger *debugger.Debugger,
) (
map[uint64]*mapLogsResult,
[]uint64, // tracker's blocks that went missing (no logs)
error,
) {
if tracker == nil {
return pollResults, nil, nil
}
// Find missing blocks (blocks in tracker that are not in pollResults)
var blocksMissing []uint64
for n := param.toBlock; n >= param.fromBlock; n-- {
// Not in tracker => not missing
if _, ok := tracker.getTrackerBlock(n); !ok {
continue
}
// In tracker and in pollResults => not missing
if _, ok := pollResults[n]; ok {
continue
}
blocksMissing = append(blocksMissing, n)
}
lenBlocks := len(blocksMissing)
if lenBlocks == 0 {
debugger.Debug(3, "found no missing blocks")
return pollResults, blocksMissing, nil
}
debugger.Debug(
1, fmt.Sprintf("found %d blocks missing, getting their headers", lenBlocks),
zap.Uint64s("blocksMissing", blocksMissing),
)
headers, err := getHeadersByNumbers(ctx, client, blocksMissing)
if err != nil {
return nil, blocksMissing, errors.Wrap(superwatcher.ErrFetchError, "failed to get block headers in mapLogsNg")
}
lenHeads := len(headers)
if lenHeads != lenBlocks {
return nil, blocksMissing, errors.Wrapf(
superwatcher.ErrFetchError, "expecting %d headers, got %d", lenBlocks, lenHeads,
)
}
// Collect headers for blocksMissing
_, err = collectHeaders(pollResults, param.fromBlock, param.toBlock, headers)
if err != nil {
if errors.Is(err, errHashesDiffer) {
// deleteMapResults(pollResults, lastGood)
return pollResults, blocksMissing, err
}
return nil, nil, errors.Wrap(err, "collectHeaders error")
}
return pollResults, blocksMissing, nil
}
// findReorg compares fresh block hashes with known hashes in tracker.
// If block hashes and logs length do not match, findReorg marks the block as reorged.
func findReorg(
param *param,
blocksMissing []uint64,
tracker *blockTracker,
pollResults map[uint64]*mapLogsResult,
debugger *debugger.Debugger,
) (
map[uint64]*mapLogsResult,
error,
) {
if tracker == nil {
return pollResults, nil
}
// Detect chain reorg using tracker
for n := param.fromBlock; n <= param.toBlock; n++ {
trackerBlock, ok := tracker.getTrackerBlock(n)
if !ok {
continue
}
pollResult, ok := pollResults[n]
if !ok {
// Not in result, but in range + in tracker => kinda sus
return nil, errors.Wrapf(superwatcher.ErrProcessReorg, "pollResult missing for trackerBlock %d", n)
}
if trackerBlock.Hash == pollResult.Hash && len(trackerBlock.Logs) == len(pollResult.Logs) {
continue
}
if gsl.Contains(blocksMissing, n) {
pollResult.LogsMigrated = true
}
debugger.Debug(
1, "chain reorg detected",
zap.Uint64("blockNumber", n),
zap.String("freshHash", pollResult.String()),
zap.String("trackerHash", trackerBlock.String()),
zap.Int("freshLogs", len(pollResult.Logs)),
zap.Int("trackerLogs", len(trackerBlock.Logs)),
)
pollResult.forked = true
}
return pollResults, nil
}
// processResult collects poll results from |tracker| and |pollResults| into superwatcher.PollerResult.
// It collects the result while also removing/adding fresh blocks to tracker, as per param.Policy.
// When collecting, it copies superwatcher.Block values into PollerResult to avoid mutating tracker values.
func processResult(
param *param,
tracker *blockTracker,
pollResults map[uint64]*mapLogsResult,
debugger *debugger.Debugger,
) (
*superwatcher.PollerResult,
error,
) {
// Fills |result| and saves current data back to tracker first.
result := new(superwatcher.PollerResult)
for number := param.fromBlock; number <= param.toBlock; number++ {
// Only blocks in pollResults are worth processing.
// There are 3 reasons a block is in pollResults:
// (1) block has >=1 interesting log
// (2) block _did_ have >= logs from the last call, but was reorged and no longer has any interesting logs
// If (2), then it will removed from tracker, and will no longer appear in pollResults after this call.
pollResult, ok := pollResults[number]
if !ok {
continue
}
// Reorged blocks (the ones that were removed) will be published with data from tracker
if pollResult.forked && tracker != nil {
trackerBlock, ok := tracker.getTrackerBlock(number)
if !ok || trackerBlock == nil {
debugger.Debug(
1, "block marked as reorged but was not found (or nil) in tracker",
zap.Uint64("blockNumber", number),
zap.String("freshHash", trackerBlock.String()),
)
return nil, errors.Wrapf(
superwatcher.ErrProcessReorg, "reorgedBlock %d not found in trackfromBlocker", number,
)
}
// Logs may be moved from blockNumber, hence there's no value in map
freshHash := pollResult.Hash
// Copy to avoid mutated trackerBlock which might break poller logic.
// After the copy, result.ReorgedBlocks consumer may freely mutate their *Block.
copiedFromTracker := *trackerBlock
result.ReorgedBlocks = append(result.ReorgedBlocks, &copiedFromTracker)
// Block used to have interesting logs, but chain reorg occurred
// and its logs were moved to somewhere else, or just removed altogether.
if pollResult.LogsMigrated {
debugger.Debug(
1, "logs missing from block",
zap.Uint64("blockNumber", number),
zap.String("freshHash", freshHash.String()),
zap.String("trackerHash", trackerBlock.String()),
zap.Int("old logs", len(trackerBlock.Logs)),
)
err := handleBlocksMissingPolicy(number, tracker, trackerBlock, freshHash, param.policy)
if err != nil {
return nil, errors.Wrap(superwatcher.ErrProcessReorg, err.Error())
}
}
}
freshBlock := pollResult.Block
addTrackerBlockPolicy(tracker, &freshBlock, param.policy)
// Copy goodBlock to avoid poller users mutating goodBlock values inside of tracker.
goodBlock := freshBlock
result.GoodBlocks = append(result.GoodBlocks, &goodBlock)
}
return result, nil
}
// handleBlocksMissingPolicy handles blocks that is marked with LogsMigrated (0 logs)
func handleBlocksMissingPolicy(
number uint64,
tracker *blockTracker,
trackerBlock *superwatcher.Block,
freshHash common.Hash,
policy superwatcher.Policy,
) error {
switch {
case policy == superwatcher.PolicyFast:
// Remove from tracker if block has 0 logs, and poller will cease to
// get block header for this empty block after this call.
if err := tracker.removeBlock(number); err != nil {
return errors.Wrap(superwatcher.ErrProcessReorg, err.Error())
}
default:
// Save new empty block information back to tracker. This will make poller
// continues to get header for this block until it goes out of filter (poll) scope.
trackerBlock.Hash = freshHash
trackerBlock.Logs = nil
}
return nil
}
// addTrackerBlockPolicy adds blocks to tracker based on PollPolicy.
// PolicyCheap will not save blocks with 0 logs to tracker, so as to avoid expensive header fetching.
func addTrackerBlockPolicy(tracker *blockTracker, block *superwatcher.Block, policy superwatcher.Policy) {
if tracker == nil {
return
}
if policy == superwatcher.PolicyFast {
if len(block.Logs) == 0 {
return
}
}
tracker.addTrackerBlock(block)
}