-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
registry_synchronizer_core.go
127 lines (110 loc) · 3.76 KB
/
registry_synchronizer_core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package keeper
import (
"context"
"fmt"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/log"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/job"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
// Compile-time assertions that *RegistrySynchronizer implements both
// job.ServiceCtx and log.Listener. The blank identifiers mean nothing is kept
// at runtime; the build simply fails if either interface is no longer satisfied.
var (
_ job.ServiceCtx = (*RegistrySynchronizer)(nil)
_ log.Listener = (*RegistrySynchronizer)(nil)
)
// RegistrySynchronizerOptions bundles the dependencies and settings consumed
// by NewRegistrySynchronizer. Each field is copied into the matching
// unexported field of RegistrySynchronizer.
type RegistrySynchronizerOptions struct {
// Job is the job the synchronizer belongs to; its ID is used to label the
// log mailbox when it is handed to the mailbox monitor.
Job job.Job
// RegistryWrapper supplies the log listener options requested in Start.
RegistryWrapper RegistryWrapper
// ORM is stored on the synchronizer; its use is outside the code shown here.
ORM ORM
// JRM is the job ORM stored on the synchronizer; its use is outside the code
// shown here.
JRM job.ORM
// LogBroadcaster is the broadcaster the synchronizer registers itself with
// as a listener in Start.
LogBroadcaster log.Broadcaster
// MailMon is the monitor the log mailbox is handed to in Start.
MailMon *utils.MailboxMonitor
// SyncInterval is the duration the sync timer is reset to after each
// timer-triggered full sync.
SyncInterval time.Duration
// MinIncomingConfirmations is forwarded to RegistryWrapper.GetLogListenerOpts.
MinIncomingConfirmations uint32
// Logger is the parent logger; the synchronizer uses a child of it named
// "RegistrySynchronizer".
Logger logger.Logger
// SyncUpkeepQueueSize is the max number of upkeeps that can be synced in
// parallel.
SyncUpkeepQueueSize uint32
// EffectiveKeeperAddress is stored on the synchronizer.
// NOTE(review): presumably the address this node acts as keeper under — confirm.
EffectiveKeeperAddress common.Address
}
// RegistrySynchronizer runs a background loop that performs full syncs and
// processes incoming log broadcasts. It is started with Start and stopped with
// Close; build it with NewRegistrySynchronizer.
type RegistrySynchronizer struct {
// StartStopOnce provides the StartOnce/StopOnce guards used by Start and Close.
utils.StartStopOnce
// chStop is closed by Close to tell the background goroutines to exit.
chStop chan struct{}
// registryWrapper is queried in Start for the log listener options.
registryWrapper RegistryWrapper
// interval is the duration the sync timer is reset to after a timer-triggered
// full sync.
interval time.Duration
// job is the owning job; its ID labels the monitored log mailbox.
job job.Job
// jrm is the job ORM; not used by the methods visible in this part of the file.
jrm job.ORM
// logBroadcaster is where the synchronizer registers itself as a listener.
logBroadcaster log.Broadcaster
// mbLogs holds pending log broadcasts; the run loop calls processLogs when it
// notifies, and Close closes it.
mbLogs *utils.Mailbox[log.Broadcast]
// minIncomingConfirmations is passed to registryWrapper.GetLogListenerOpts.
minIncomingConfirmations uint32
// effectiveKeeperAddress is set from the options; not used by the methods
// visible in this part of the file.
effectiveKeeperAddress common.Address
// orm is set from the options; not used by the methods visible in this part
// of the file.
orm ORM
// logger is the sugared logger named "RegistrySynchronizer".
logger logger.SugaredLogger
// wgDone tracks the two goroutines launched by Start so Close can wait for them.
wgDone sync.WaitGroup
syncUpkeepQueueSize uint32 // Represents the max number of upkeeps that can be synced in parallel.
// mailMon monitors mbLogs once Start has run.
mailMon *utils.MailboxMonitor
}
// NewRegistrySynchronizer builds a RegistrySynchronizer from opts.
//
// Besides copying the supplied dependencies it creates the stop channel, the
// log mailbox and a sugared logger named "RegistrySynchronizer". The returned
// value is idle until Start is called.
func NewRegistrySynchronizer(opts RegistrySynchronizerOptions) *RegistrySynchronizer {
	// Arbitrary limit, better to have excess capacity
	const logMailboxCapacity = 5_000

	rs := new(RegistrySynchronizer)

	// Internally owned state.
	rs.chStop = make(chan struct{})
	rs.mbLogs = utils.NewMailbox[log.Broadcast](logMailboxCapacity)
	rs.logger = logger.Sugared(opts.Logger.Named("RegistrySynchronizer"))

	// Dependencies and settings taken verbatim from the options.
	rs.registryWrapper = opts.RegistryWrapper
	rs.interval = opts.SyncInterval
	rs.job = opts.Job
	rs.jrm = opts.JRM
	rs.logBroadcaster = opts.LogBroadcaster
	rs.minIncomingConfirmations = opts.MinIncomingConfirmations
	rs.orm = opts.ORM
	rs.effectiveKeeperAddress = opts.EffectiveKeeperAddress
	rs.syncUpkeepQueueSize = opts.SyncUpkeepQueueSize
	rs.mailMon = opts.MailMon

	return rs
}
// Start starts RegistrySynchronizer.
//
// Inside the StartOnce guard it:
//  1. fetches the log listener options from the registry wrapper,
//  2. launches the run loop,
//  3. registers rs with the log broadcaster and launches a goroutine that
//     unsubscribes once chStop is closed,
//  4. hands the log mailbox to the mailbox monitor.
//
// The context argument is unused. An error is returned when the listener
// options cannot be fetched; in that case no goroutine has been started.
func (rs *RegistrySynchronizer) Start(context.Context) error {
	return rs.StartOnce("RegistrySynchronizer", func() error {
		// Resolve the listener options before spawning anything. Returning on
		// this error after wgDone.Add(2) and `go rs.run()` would leave run
		// executing and one wgDone count that no goroutine ever releases, so
		// a later wgDone.Wait could never return.
		var upkeepPerformedFilter [][]log.Topic
		logListenerOpts, err := rs.registryWrapper.GetLogListenerOpts(rs.minIncomingConfirmations, upkeepPerformedFilter)
		if err != nil {
			return errors.Wrap(err, "Unable to fetch log listener opts from wrapper")
		}

		// Two goroutines follow: the run loop and the unsubscribe waiter.
		rs.wgDone.Add(2)
		go rs.run()

		lbUnsubscribe := rs.logBroadcaster.Register(rs, *logListenerOpts)
		go func() {
			defer rs.wgDone.Done()
			// Unsubscribe from the broadcaster as soon as Close signals stop.
			defer lbUnsubscribe()
			<-rs.chStop
		}()

		rs.mailMon.Monitor(rs.mbLogs, "RegistrySynchronizer", "Logs", fmt.Sprint(rs.job.ID))
		return nil
	})
}
// Close stops RegistrySynchronizer.
//
// Inside the StopOnce guard it closes chStop to signal the background
// goroutines, waits for them to finish, and then closes the log mailbox,
// returning the mailbox's Close error.
func (rs *RegistrySynchronizer) Close() error {
	shutdown := func() error {
		// Signal first, then wait: both goroutines exit on chStop.
		close(rs.chStop)
		rs.wgDone.Wait()
		return rs.mbLogs.Close()
	}
	return rs.StopOnce("RegistrySynchronizer", shutdown)
}
// run is the synchronizer's background loop. It performs one full sync up
// front and then reacts to three events until chStop is closed:
//   - chStop closed: return
//   - sync timer tick: full sync, then re-arm the timer with rs.interval
//   - log mailbox notification: process the queued logs
//
// It releases one wgDone count on exit.
func (rs *RegistrySynchronizer) run() {
	defer rs.wgDone.Done()

	resyncTimer := utils.NewResettableTimer()
	defer resyncTimer.Stop()

	// Initial sync before entering the event loop.
	rs.fullSync()

	// NOTE(review): the timer is only Reset inside the tick case below and
	// nothing visible here arms it beforehand. Confirm that a freshly created
	// ResettableTimer delivers a first tick; otherwise periodic full syncs
	// never fire after the initial one.
	for {
		select {
		case <-rs.chStop:
			return
		case <-rs.mbLogs.Notify():
			rs.processLogs()
		case <-resyncTimer.Ticks():
			rs.fullSync()
			resyncTimer.Reset(rs.interval)
		}
	}
}