-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
eth_subscriber.go
272 lines (232 loc) · 8.78 KB
/
eth_subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
package log
import (
"context"
"fmt"
"math/big"
"time"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/null"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
type (
	// ethSubscriber wraps the Ethereum RPC interactions needed by the log
	// broadcaster: backfilling historical logs and creating live log
	// subscriptions.
	ethSubscriber struct {
		ethClient evmclient.Client // RPC client used for head queries, log filtering and subscriptions
		config    Config           // provides BlockBackfillDepth and LogBackfillBatchSize
		logger    logger.Logger    // named "EthSubscriber" by the constructor
		chStop    utils.StopChan   // closed on shutdown; cancels contexts and aborts backfills
	}
)
// newEthSubscriber constructs an ethSubscriber over the given client and
// config. chStop is closed on shutdown and is used to cancel in-flight
// backfills and subscription attempts.
//
// Fix: the composite literal was corrupted to `ðSubscriber{` — an HTML
// entity mangling (`&eth;` rendered as `ð`) of `&ethSubscriber{` — which
// does not compile. Restored the address-of expression.
func newEthSubscriber(ethClient evmclient.Client, config Config, logger logger.Logger, chStop chan struct{}) *ethSubscriber {
	return &ethSubscriber{
		ethClient: ethClient,
		config:    config,
		logger:    logger.Named("EthSubscriber"),
		chStop:    chStop,
	}
}
// backfillLogs - fetches earlier logs either from a relatively recent block (latest minus BlockBackfillDepth) or from the given fromBlockOverride
// note that the whole operation has no timeout - it relies on BlockBackfillSkip (set outside) to optionally prevent very deep, long backfills
// Max runtime is: (10 sec + 1 min * numBlocks/batchSize) * 3 retries
//
// Returns a channel from which the backfilled logs can be read, and
// abort=true when shutdown (chStop) was signalled during the backfill.
// NOTE(review): if all retries are exhausted without success, the returned
// channel is nil — callers presumably tolerate that; confirm at call sites.
func (sub *ethSubscriber) backfillLogs(fromBlockOverride null.Int64, addresses []common.Address, topics []common.Hash) (chBackfilledLogs chan types.Log, abort bool) {
	sub.logger.Infow("backfilling logs", "from", fromBlockOverride, "addresses", addresses)
	// Nothing to backfill: return an already-closed channel so the caller's
	// receive loop terminates immediately.
	if len(addresses) == 0 {
		sub.logger.Debug("LogBroadcaster: No addresses to backfill for, returning")
		ch := make(chan types.Log)
		close(ch)
		return ch, false
	}
	// Context derived from chStop: cancelled on shutdown. Deliberately no
	// deadline here (see note above about BlockBackfillSkip).
	ctxParent, cancel := sub.chStop.NewCtx()
	defer cancel()
	var latestHeight int64 = -1
	retryCount := 0
	utils.RetryWithBackoff(ctxParent, func() (retry bool) {
		// Give up entirely after 3 retries (bounds the max runtime above).
		if retryCount > 3 {
			return false
		}
		retryCount++
		// Fetch the chain head once; the cached value is reused on retries.
		if latestHeight < 0 {
			latestBlock, err := sub.ethClient.HeadByNumber(ctxParent, nil)
			if err != nil {
				sub.logger.Warnw("LogBroadcaster: Backfill - could not fetch latest block header, will retry", "err", err)
				return true
			} else if latestBlock == nil {
				sub.logger.Warn("LogBroadcaster: Got nil block header, will retry")
				return true
			}
			latestHeight = latestBlock.Number
		}
		// Backfill from `backfillDepth` blocks ago. It's up to the subscribers to
		// filter out logs they've already dealt with.
		fromBlock := uint64(latestHeight) - sub.config.BlockBackfillDepth()
		if fromBlock > uint64(latestHeight) {
			fromBlock = 0 // Overflow protection
		}
		// An explicit override (e.g. resuming from a persisted height) wins
		// over the depth-based default.
		if fromBlockOverride.Valid {
			fromBlock = uint64(fromBlockOverride.Int64)
		}
		if fromBlock <= uint64(latestHeight) {
			sub.logger.Infow(fmt.Sprintf("LogBroadcaster: Starting backfill of logs from %v blocks...", uint64(latestHeight)-fromBlock), "fromBlock", fromBlock, "latestHeight", latestHeight)
		} else {
			sub.logger.Infow("LogBroadcaster: Backfilling will be nop because fromBlock is above latestHeight",
				"fromBlock", fromBlock, "latestHeight", latestHeight)
		}
		q := ethereum.FilterQuery{
			FromBlock: big.NewInt(int64(fromBlock)),
			Addresses: addresses,
			Topics:    [][]common.Hash{topics},
		}
		logs := make([]types.Log, 0)
		start := time.Now()
		// If we are significantly behind the latest head, there could be a very large (1000s)
		// of blocks to check for logs. We read the blocks in batches to avoid hitting the websocket
		// request data limit.
		// On matic its 5MB [https://github.com/maticnetwork/bor/blob/3de2110886522ab17e0b45f3c4a6722da72b7519/rpc/http.go#L35]
		// On ethereum its 15MB [https://github.com/ethereum/go-ethereum/blob/master/rpc/websocket.go#L40]
		batchSize := int64(sub.config.LogBackfillBatchSize())
		for from := q.FromBlock.Int64(); from <= latestHeight; from += batchSize {
			to := from + batchSize - 1
			if to > latestHeight {
				to = latestHeight
			}
			q.FromBlock = big.NewInt(from)
			q.ToBlock = big.NewInt(to)
			// Each batch gets its own one-minute deadline, independent of the
			// (deadline-free) parent context.
			ctx, cancel := context.WithTimeout(ctxParent, time.Minute)
			batchLogs, err := sub.fetchLogBatch(ctx, q, start)
			cancel()
			elapsed := time.Since(start)
			var elapsedMessage string
			if elapsed > time.Minute {
				elapsedMessage = " (backfill is taking a long time, delaying processing of newest logs - if it's an issue, consider setting the EVM.BlockBackfillSkip configuration variable to \"true\")"
			}
			if err != nil {
				if ctx.Err() != nil {
					sub.logger.Errorw("LogBroadcaster: Deadline exceeded, unable to backfill a batch of logs. Consider setting EVM.LogBackfillBatchSize to a lower value", "err", err, "elapsed", elapsed, "fromBlock", q.FromBlock.String(), "toBlock", q.ToBlock.String())
				} else {
					sub.logger.Errorw("LogBroadcaster: Unable to backfill a batch of logs after retries", "err", err, "fromBlock", q.FromBlock.String(), "toBlock", q.ToBlock.String())
				}
				// A failed batch retries the whole backfill (head height is
				// cached above, so it is not re-fetched).
				return true
			}
			sub.logger.Infow(fmt.Sprintf("LogBroadcaster: Fetched a batch of %v logs from %v to %v%s", len(batchLogs), from, to, elapsedMessage), "len", len(batchLogs), "fromBlock", from, "toBlock", to, "remaining", int64(latestHeight)-to)
			// Bail out without retrying if shutdown was requested mid-backfill.
			select {
			case <-sub.chStop:
				return false
			default:
				logs = append(logs, batchLogs...)
			}
		}
		sub.logger.Infof("LogBroadcaster: Fetched a total of %v logs for backfill", len(logs))
		// unbufferred channel, as it will be filled in the goroutine,
		// while the broadcaster's eventLoop is reading from it
		chBackfilledLogs = make(chan types.Log)
		go func() {
			// The sender closes the channel so the reader sees end-of-stream.
			defer close(chBackfilledLogs)
			for _, log := range logs {
				select {
				case chBackfilledLogs <- log:
				case <-sub.chStop:
					return
				}
			}
			sub.logger.Infof("LogBroadcaster: Finished async backfill of %v logs", len(logs))
		}()
		return false
	})
	// Report whether shutdown was the reason the retry loop ended.
	select {
	case <-sub.chStop:
		abort = true
	default:
		abort = false
	}
	return
}
// fetchLogBatch executes a single FilterLogs query against the node,
// retrying with backoff until the query succeeds or ctx is done. It returns
// the logs from the last attempt together with that attempt's error (nil on
// success). start is the beginning of the overall backfill, used only for
// the elapsed-time field in timeout logs.
func (sub *ethSubscriber) fetchLogBatch(ctx context.Context, query ethereum.FilterQuery, start time.Time) ([]types.Log, error) {
	var (
		lastErr error
		logs    []types.Log
	)
	utils.RetryWithBackoff(ctx, func() (retry bool) {
		fetched, err := sub.ethClient.FilterLogs(ctx, query)
		lastErr = err
		if err == nil {
			logs = fetched
			return false // success - stop retrying
		}
		if ctx.Err() != nil {
			// The batch deadline expired; suggest a smaller batch size.
			sub.logger.Errorw("LogBroadcaster: Inner deadline exceeded, unable to backfill a batch of logs. Consider setting EVM.LogBackfillBatchSize to a lower value", "err", err, "elapsed", time.Since(start),
				"fromBlock", query.FromBlock.String(), "toBlock", query.ToBlock.String())
		} else {
			sub.logger.Errorw("LogBroadcaster: Unable to backfill a batch of logs", "err", err,
				"fromBlock", query.FromBlock.String(), "toBlock", query.ToBlock.String())
		}
		return true // transient failure - retry with backoff
	})
	return logs, lastErr
}
// createSubscription creates a new log subscription starting at the current block. If previous logs
// are needed, they must be obtained through backfilling, as subscriptions can only be started from
// the current head.
//
// Returns abort=true when shutdown (chStop) was signalled while attempting
// to subscribe; in that case subscr may be nil.
func (sub *ethSubscriber) createSubscription(addresses []common.Address, topics []common.Hash) (subscr managedSubscription, abort bool) {
	// No addresses means nothing to watch; hand back a noop subscription so
	// the caller still receives a usable managedSubscription.
	if len(addresses) == 0 {
		return newNoopSubscription(), false
	}
	// Context derived from chStop: cancelled on shutdown.
	ctx, cancel := sub.chStop.NewCtx()
	defer cancel()
	utils.RetryWithBackoff(ctx, func() (retry bool) {
		filterQuery := ethereum.FilterQuery{
			Addresses: addresses,
			Topics:    [][]common.Hash{topics},
		}
		// Fresh channel per attempt so a failed attempt never leaves a
		// half-used channel behind.
		chRawLogs := make(chan types.Log)
		sub.logger.Debugw("Calling SubscribeFilterLogs with params", "addresses", addresses, "topics", topics)
		innerSub, err := sub.ethClient.SubscribeFilterLogs(ctx, filterQuery, chRawLogs)
		if err != nil {
			sub.logger.Errorw("Log subscriber could not create subscription to Ethereum node", "err", err)
			return true
		}
		// Assign to the named result so the successful subscription is
		// visible to the caller after the retry loop exits.
		subscr = managedSubscriptionImpl{
			subscription: innerSub,
			chRawLogs:    chRawLogs,
		}
		return false
	})
	// Report whether shutdown was the reason the retry loop ended.
	select {
	case <-sub.chStop:
		abort = true
	default:
		abort = false
	}
	return
}
// A managedSubscription acts as wrapper for the Subscription. Specifically, the
// managedSubscription closes the log channel as soon as the unsubscribe request is made
type managedSubscription interface {
	// Err returns the channel on which subscription errors are delivered.
	Err() <-chan error
	// Logs returns the channel on which raw logs are delivered.
	Logs() chan types.Log
	// Unsubscribe tears down the subscription and closes the log channel.
	Unsubscribe()
}
// managedSubscriptionImpl is the live-node implementation of
// managedSubscription, pairing an ethereum.Subscription with the channel it
// delivers logs on.
type managedSubscriptionImpl struct {
	subscription ethereum.Subscription // underlying node subscription
	chRawLogs    chan types.Log        // closed by Unsubscribe once sending has stopped
}
// Err exposes the error channel of the underlying node subscription.
func (m managedSubscriptionImpl) Err() <-chan error {
	return m.subscription.Err()
}
// Logs returns the channel on which raw logs from the node are delivered.
func (m managedSubscriptionImpl) Logs() chan types.Log {
	return m.chRawLogs
}
// Unsubscribe tears down the node subscription, waits on its error channel
// so no further sends can occur, and only then closes the log channel.
func (m managedSubscriptionImpl) Unsubscribe() {
	m.subscription.Unsubscribe()
	<-m.Err() // ensure sending has stopped before closing the chan
	close(m.chRawLogs)
}
// noopSubscription is a managedSubscription used when there are no addresses
// to subscribe to; it produces no logs and no errors.
type noopSubscription struct {
	chRawLogs chan types.Log // never written to; closed by Unsubscribe
}
// newNoopSubscription builds a subscription that never delivers any logs.
func newNoopSubscription() noopSubscription {
	return noopSubscription{chRawLogs: make(chan types.Log)}
}
// Err returns nil: a noop subscription has no error source.
func (n noopSubscription) Err() <-chan error { return nil }
// Logs returns the (never-written) log channel.
func (n noopSubscription) Logs() chan types.Log { return n.chRawLogs }
// Unsubscribe closes the log channel so readers observe end-of-stream.
func (n noopSubscription) Unsubscribe() { close(n.chRawLogs) }