-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
block_subscriber.go
292 lines (260 loc) · 8.47 KB
/
block_subscriber.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
package evm
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum/common"
ocr2keepers "github.com/smartcontractkit/ocr2keepers/pkg/v3/types"
httypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/headtracker/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/pg"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
const (
// cleanUpInterval is the tick period of the background goroutine that prunes
// old entries from the blocks map.
cleanUpInterval = 15 * time.Minute
// channelSize is the buffer size used for the head channel fed by the head
// broadcaster and for every subscriber channel.
channelSize = 100
// lookbackDepth decides the valid trigger block lookback range; it is used as
// the default blockSize, i.e. how many recent blocks are kept in the blocks map.
lookbackDepth = 1024
// blockHistorySize is the maximum number of block keys included in each
// block history sent to subscribers.
blockHistorySize = int64(256)
)
var (
// BlockSubscriberServiceName is the service name passed to StartOnce and
// StopOnce when the block subscriber is started and closed.
BlockSubscriberServiceName = "BlockSubscriber"
)
// BlockSubscriber keeps a map of recent block numbers to block hashes, updates
// it from heads delivered by the head broadcaster, and publishes a block
// history to every registered subscriber each time a new head is processed.
type BlockSubscriber struct {
// StartStopOnce supplies the StartOnce/StopOnce helpers used by Start and Close.
utils.StartStopOnce
// threadCtrl runs the background goroutines and stops them on Close.
threadCtrl utils.ThreadControl
// mu is held while reading or writing subscribers, blocks, maxSubId,
// lastClearedBlock and lastSentBlock.
mu sync.RWMutex
// hb is the head broadcaster this service subscribes to for new heads.
hb httypes.HeadBroadcaster
// lp is the log poller used to seed the blocks map during initialization.
lp logpoller.LogPoller
// headC carries heads from the head broadcaster callback to the processing goroutine.
headC chan *evmtypes.Head
// unsubscribe cancels the head broadcaster subscription; it is set in initialize.
unsubscribe func()
// subscribers maps a subscriber id to the channel its block histories are sent on.
subscribers map[int]chan ocr2keepers.BlockHistory
// blocks maps a block number to the hex string of its block hash.
blocks map[int64]string
// maxSubId is the most recently issued subscriber id.
maxSubId int
// lastClearedBlock is the upper bound of the block range already removed by cleanup.
lastClearedBlock int64
// lastSentBlock is the number of the most recently processed head.
lastSentBlock int64
// latestBlock holds the key of the most recently processed head.
latestBlock atomic.Pointer[ocr2keepers.BlockKey]
// blockHistorySize caps the number of keys in a published block history.
blockHistorySize int64
// blockSize is the number of recent blocks seeded into, and retained in, the blocks map.
blockSize int64
// finalityDepth bounds how far processHead walks a head's parent chain.
finalityDepth uint32
lggr logger.Logger
}
// Compile-time check that *BlockSubscriber satisfies ocr2keepers.BlockSubscriber.
var _ ocr2keepers.BlockSubscriber = &BlockSubscriber{}
// NewBlockSubscriber creates a BlockSubscriber wired to the given head
// broadcaster and log poller. The subscriber starts with empty subscriber and
// block maps, a buffered head channel, and the package defaults for history
// size and lookback depth. The logger is scoped with the name "BlockSubscriber".
func NewBlockSubscriber(hb httypes.HeadBroadcaster, lp logpoller.LogPoller, finalityDepth uint32, lggr logger.Logger) *BlockSubscriber {
	// latestBlock is left at its zero value: an empty atomic pointer.
	bs := new(BlockSubscriber)
	bs.threadCtrl = utils.NewThreadControl()
	bs.hb = hb
	bs.lp = lp
	bs.headC = make(chan *evmtypes.Head, channelSize)
	bs.subscribers = make(map[int]chan ocr2keepers.BlockHistory)
	bs.blocks = make(map[int64]string)
	bs.blockHistorySize = blockHistorySize
	bs.blockSize = lookbackDepth
	bs.finalityDepth = finalityDepth
	bs.lggr = lggr.Named("BlockSubscriber")
	return bs
}
// getBlockRange asks the log poller for its latest block and returns the
// block numbers of the most recent blockSize blocks ending at that block, in
// ascending order. Non-positive block numbers are left out. It returns the log
// poller's error unchanged if the latest block cannot be fetched.
func (bs *BlockSubscriber) getBlockRange(ctx context.Context) ([]uint64, error) {
	latest, err := bs.lp.LatestBlock(pg.WithParentCtx(ctx))
	if err != nil {
		return nil, err
	}
	bs.lggr.Infof("latest block from log poller is %d", latest)

	var numbers []uint64
	for n := latest - bs.blockSize + 1; n <= latest; n++ {
		if n <= 0 {
			// only positive block numbers are requested
			continue
		}
		numbers = append(numbers, uint64(n))
	}
	return numbers, nil
}
// initializeBlocks fetches the given block numbers from the log poller and
// records each returned block's hash in the blocks map. lastClearedBlock is
// set to one less than the number of the first returned block. It returns the
// log poller's error unchanged if the fetch fails.
func (bs *BlockSubscriber) initializeBlocks(ctx context.Context, blocks []uint64) error {
	fetched, err := bs.lp.GetBlocksRange(ctx, blocks)
	if err != nil {
		return err
	}

	if len(fetched) > 0 {
		// everything before the first fetched block counts as already cleared
		bs.lastClearedBlock = fetched[0].BlockNumber - 1
		bs.lggr.Infof("lastClearedBlock is %d", bs.lastClearedBlock)
	}
	for _, blk := range fetched {
		bs.blocks[blk.BlockNumber] = blk.BlockHash.Hex()
	}

	bs.lggr.Infof("initialize with %d blocks", len(fetched))
	return nil
}
// buildHistory returns the block keys for up to blockHistorySize blocks ending
// at block, in descending block-number order. Block numbers that are not
// positive are skipped silently; block numbers absent from the blocks map are
// skipped with a debug log. The blocks map is read without taking the lock, so
// the caller is expected to hold it.
func (bs *BlockSubscriber) buildHistory(block int64) ocr2keepers.BlockHistory {
	var history []ocr2keepers.BlockKey
	for offset := int64(0); offset < bs.blockHistorySize; offset++ {
		number := block - offset
		if number <= 0 {
			continue
		}
		hash, found := bs.blocks[number]
		if !found {
			bs.lggr.Debugf("block %d is missing", number)
			continue
		}
		history = append(history, ocr2keepers.BlockKey{
			Number: ocr2keepers.BlockNumber(number),
			Hash:   common.HexToHash(hash),
		})
	}
	return history
}
// cleanup removes from the blocks map every block number from
// lastClearedBlock+1 through lastSentBlock-blockSize, then records that upper
// bound as the new lastClearedBlock. It takes the write lock for the duration.
func (bs *BlockSubscriber) cleanup() {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	upTo := bs.lastSentBlock - bs.blockSize
	bs.lggr.Debugf("start clearing blocks from %d to %d", bs.lastClearedBlock+1, upTo)
	for number := bs.lastClearedBlock + 1; number <= upTo; number++ {
		delete(bs.blocks, number)
	}
	bs.lastClearedBlock = upTo
	bs.lggr.Infof("lastClearedBlock is set to %d", bs.lastClearedBlock)
}
// initialize seeds the blocks map from the log poller and then subscribes to
// the head broadcaster. Seeding is best-effort: failures are logged and
// initialization continues, so the subscription is always established and
// bs.unsubscribe is always set. It holds the write lock for the duration.
func (bs *BlockSubscriber) initialize(ctx context.Context) {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	// initialize the blocks map with the recent blockSize blocks
	blocks, err := bs.getBlockRange(ctx)
	if err != nil {
		// the format string needs a verb for err, otherwise the error is
		// rendered as a "%!(EXTRA ...)" suffix
		bs.lggr.Errorf("failed to get block range: %v", err)
	}
	err = bs.initializeBlocks(ctx, blocks)
	if err != nil {
		bs.lggr.Errorf("failed to get log poller blocks: %v", err)
	}
	_, bs.unsubscribe = bs.hb.Subscribe(&headWrapper{headC: bs.headC, lggr: bs.lggr})
}
// Start initializes the block subscriber and launches its two background
// goroutines: one that processes heads arriving on headC, and one that prunes
// the blocks map every cleanUpInterval. The work is wrapped in StartOnce. ctx
// is only used for initialization; the goroutines run on the context supplied
// by threadCtrl.
func (bs *BlockSubscriber) Start(ctx context.Context) error {
	run := func() error {
		bs.lggr.Info("block subscriber started.")
		bs.initialize(ctx)

		// drain the head channel and publish to subscribers
		bs.threadCtrl.Go(func(ctx context.Context) {
			for {
				select {
				case <-ctx.Done():
					return
				case head := <-bs.headC:
					if head == nil {
						continue
					}
					bs.processHead(head)
				}
			}
		})

		// periodically drop old blocks from the map
		bs.threadCtrl.Go(func(ctx context.Context) {
			cleanupTicker := time.NewTicker(cleanUpInterval)
			defer cleanupTicker.Stop()
			for {
				select {
				case <-ctx.Done():
					return
				case <-cleanupTicker.C:
					bs.cleanup()
				}
			}
		})

		return nil
	}
	return bs.StartOnce(BlockSubscriberServiceName, run)
}
// Close shuts the block subscriber down: it closes threadCtrl first and then
// cancels the head broadcaster subscription. The work is wrapped in StopOnce.
func (bs *BlockSubscriber) Close() error {
	shutdown := func() error {
		bs.lggr.Info("stop block subscriber")
		bs.threadCtrl.Close()
		bs.unsubscribe()
		return nil
	}
	return bs.StopOnce(BlockSubscriberServiceName, shutdown)
}
// Subscribe registers a new subscriber and returns its id together with a
// buffered channel on which block histories are delivered. Ids are assigned by
// incrementing maxSubId. The returned error is always nil.
func (bs *BlockSubscriber) Subscribe() (int, chan ocr2keepers.BlockHistory, error) {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	id := bs.maxSubId + 1
	bs.maxSubId = id

	historyC := make(chan ocr2keepers.BlockHistory, channelSize)
	bs.subscribers[id] = historyC
	bs.lggr.Infof("new subscriber %d", id)

	return id, historyC, nil
}
// Unsubscribe closes the channel of the subscriber with the given id and
// removes it from the subscriber map. It returns an error if no subscriber
// with that id is registered.
func (bs *BlockSubscriber) Unsubscribe(subId int) error {
	bs.mu.Lock()
	defer bs.mu.Unlock()

	if historyC, found := bs.subscribers[subId]; found {
		close(historyC)
		delete(bs.subscribers, subId)
		bs.lggr.Infof("subscriber %d unsubscribed", subId)
		return nil
	}
	return fmt.Errorf("subscriber %d does not exist", subId)
}
// processHead records the hashes of h and its ancestors in the blocks map,
// stores h as the latest block, and sends the block history ending at h to
// every subscriber. A subscriber whose channel is full has this history
// dropped instead of blocking the others. It holds the write lock throughout.
func (bs *BlockSubscriber) processHead(h *evmtypes.Head) {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	// head parent is a linked list with EVM finality depth
	// when re-org happens, new heads will have pointers to the new blocks
	i := int64(0)
	for cp := h; cp != nil; cp = cp.Parent {
		// we don't stop when a matching (block number/hash) entry is seen in the map because parent linked list may be
		// cut short during a re-org if head broadcaster backfill is not complete. This can cause some re-orged blocks
		// left in the map. for example, re-org happens for block 98, 99, 100. next head 101 from broadcaster has parent list
		// of 100, so block 100 and 101 are updated. when next head 102 arrives, it has full parent history of finality depth.
		// if we stop when we see a block number/hash match, we won't look back and correct block 98 and 99.
		// hence, we make a compromise here: keep walking the parent list and updating the map until more than
		// min(finality depth, blockSize) heads have been visited.
		existingHash, ok := bs.blocks[cp.Number]
		if !ok {
			bs.lggr.Debugf("filling block %d with new hash %s", cp.Number, cp.Hash.Hex())
		} else if existingHash != cp.Hash.Hex() {
			bs.lggr.Warnf("overriding block %d old hash %s with new hash %s due to re-org", cp.Number, existingHash, cp.Hash.Hex())
		}
		bs.blocks[cp.Number] = cp.Hash.Hex()
		i++
		if i > int64(bs.finalityDepth) || i > bs.blockSize {
			break
		}
	}
	bs.lggr.Debugf("blocks block %d hash is %s", h.Number, h.Hash.Hex())

	history := bs.buildHistory(h.Number)
	block := &ocr2keepers.BlockKey{
		Number: ocr2keepers.BlockNumber(h.Number),
	}
	copy(block.Hash[:], h.Hash[:])
	bs.latestBlock.Store(block)
	bs.lastSentBlock = h.Number
	// send history to all subscribers
	for _, subC := range bs.subscribers {
		// wrapped in a select to not get blocked by certain subscribers
		select {
		case subC <- history:
		default:
			bs.lggr.Warnf("subscriber channel is full, dropping block history with length %d", len(history))
		}
	}

	// log the block number itself: formatting the stored *BlockKey with %d
	// would dump the whole struct, including every byte of the hash
	bs.lggr.Debugf("published block history with length %d and latestBlock %d to %d subscriber(s)", len(history), h.Number, len(bs.subscribers))
}
// queryBlocksMap looks up block number bn in the blocks map under the read
// lock. It returns the stored hash string and whether an entry was present.
func (bs *BlockSubscriber) queryBlocksMap(bn int64) (string, bool) {
	bs.mu.RLock()
	defer bs.mu.RUnlock()

	hash, found := bs.blocks[bn]
	return hash, found
}
// headWrapper is the value registered with the head broadcaster; its
// OnNewLongestChain callback forwards each new head onto headC.
type headWrapper struct {
// headC is the channel new heads are forwarded to.
headC chan *evmtypes.Head
// lggr logs heads that are discarded because headC is full.
lggr logger.Logger
}
// OnNewLongestChain forwards a non-nil head to headC without blocking. If the
// channel is full the head is discarded and a debug message is logged. Nil
// heads are ignored.
func (w *headWrapper) OnNewLongestChain(_ context.Context, head *evmtypes.Head) {
	if head == nil {
		return
	}
	select {
	case w.headC <- head:
	default:
		w.lggr.Debugf("head channel is full, discarding head %+v", head)
	}
}