/
eth_syncer.go
70 lines (60 loc) · 2.13 KB
/
eth_syncer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
package eth
import (
"context"
"time"
"github.com/ethereum/go-ethereum/core/types"
"github.com/specularL2/specular/services/sidecar/utils"
"golang.org/x/sync/errgroup"
)
// TODO: move to config
const (
EthSlotInterval = 12 * time.Second
EthEpochInterval = 6*time.Minute + 24*time.Second
)
type EthSyncer struct {
OnNewHandler
LatestHeaderBroker *utils.Broker[*types.Header]
SafeHeaderBroker *utils.Broker[*types.Header]
FinalizedHeaderBroker *utils.Broker[*types.Header]
eg errgroup.Group
}
type syncerEthClient interface {
HeaderByTag(ctx context.Context, tag BlockTag) (*types.Header, error)
}
type OnNewHandler interface {
OnLatest(ctx context.Context, header *types.Header) error
OnSafe(ctx context.Context, header *types.Header) error
OnFinalized(ctx context.Context, header *types.Header) error
}
func NewEthSyncer(handler OnNewHandler) *EthSyncer {
return &EthSyncer{
OnNewHandler: handler,
LatestHeaderBroker: utils.NewBroker[*types.Header](),
SafeHeaderBroker: utils.NewBroker[*types.Header](),
FinalizedHeaderBroker: utils.NewBroker[*types.Header](),
}
}
// Starts a subscription in a separate goroutine for each commitment level.
func (s *EthSyncer) Start(ctx context.Context, client syncerEthClient) {
s.subscribeNewHead(ctx, client, Latest, s.LatestHeaderBroker, s.OnLatest, EthSlotInterval)
s.subscribeNewHead(ctx, client, Safe, s.SafeHeaderBroker, s.OnSafe, EthEpochInterval)
s.subscribeNewHead(ctx, client, Finalized, s.FinalizedHeaderBroker, s.OnFinalized, EthEpochInterval)
}
func (s *EthSyncer) Stop(ctx context.Context) {
s.LatestHeaderBroker.Stop()
s.FinalizedHeaderBroker.Stop()
s.eg.Wait()
}
// Starts polling for new headers and publishes them to the broker.
func (s *EthSyncer) subscribeNewHead(
ctx context.Context,
client syncerEthClient,
tag BlockTag,
broker *utils.Broker[*types.Header],
fn func(context.Context, *types.Header) error,
pollInterval time.Duration,
) {
sub := SubscribeNewHeadByPolling(ctx, client, broker.PubCh, tag, pollInterval, 10*time.Second)
s.eg.Go(func() error { return broker.Start(ctx, sub) })
broker.SubscribeWithCallback(ctx, fn)
}