-
Notifications
You must be signed in to change notification settings - Fork 1.6k
/
head_broadcaster.go
157 lines (134 loc) · 3.75 KB
/
head_broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package services
import (
"context"
"crypto/rand"
"fmt"
"reflect"
"sync"
"time"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/store"
"github.com/smartcontractkit/chainlink/core/store/models"
"github.com/smartcontractkit/chainlink/core/utils"
)
// callbackTimeout is the timeout placed on the context handed to each
// subscriber callback in executeCallbacks.
const callbackTimeout = 2 * time.Second
// callbackID is the key under which a subscribed callback is stored in a
// callbackSet; values are produced by newID from random bytes.
type callbackID [256]byte
// HeadBroadcastable defines the interface for listeners that can be registered
// with HeadBroadcaster.Subscribe.
type HeadBroadcastable interface {
// OnNewLongestChain is invoked by the broadcaster for each head it relays.
// Each listener is called in its own goroutine and ctx carries a timeout
// of callbackTimeout.
OnNewLongestChain(ctx context.Context, head models.Head)
}
// callbackSet maps subscription IDs to their registered listeners.
type callbackSet map[callbackID]HeadBroadcastable

// clone returns a new, non-nil callbackSet holding the same ID/listener pairs
// as set. The copy is shallow: the listeners themselves are shared, but adding
// to or deleting from the copy does not affect set.
func (set callbackSet) clone() callbackSet {
	dup := make(callbackSet, len(set))
	for key, listener := range set {
		dup[key] = listener
	}
	return dup
}
// NewHeadBroadcaster creates a new HeadBroadcaster with an empty callback set,
// a mailbox created with utils.NewMailbox(1), and an open close channel. The
// returned broadcaster does nothing until Start is called.
func NewHeadBroadcaster() *HeadBroadcaster {
	// wgDone and the embedded StartStopOnce are left at their zero values.
	hb := new(HeadBroadcaster)
	hb.callbacks = make(callbackSet)
	hb.mailbox = utils.NewMailbox(1)
	hb.mutex = new(sync.RWMutex)
	hb.chClose = make(chan struct{})
	return hb
}
// HeadBroadcaster relays heads from the head tracker to subscribed jobs, it is less robust against
// congestion than the head tracker, and missed heads should be expected by consuming jobs
type HeadBroadcaster struct {
// callbacks holds the subscribed listeners, keyed by the ID generated in Subscribe.
callbacks callbackSet
// mailbox receives heads from OnNewLongestChain; the run loop is woken
// through its Notify channel and reads the head back with Retrieve.
mailbox *utils.Mailbox
// mutex guards callbacks.
mutex *sync.RWMutex
// chClose is closed by Close to tell the run loop to exit.
chClose chan struct{}
// wgDone tracks the run goroutine so Close can wait for it to finish.
wgDone sync.WaitGroup
utils.StartStopOnce
}
// Compile-time assertion that *HeadBroadcaster implements store.HeadTrackable.
var _ store.HeadTrackable = (*HeadBroadcaster)(nil)
// Start launches the broadcaster's run loop in a background goroutine. The
// launch is handed to StartOnce under the name "HeadBroadcaster", and Start
// returns whatever StartOnce returns.
func (hr *HeadBroadcaster) Start() error {
	launch := func() error {
		// Register the goroutine before spawning it so Close can wait on it.
		hr.wgDone.Add(1)
		go hr.run()
		return nil
	}
	return hr.StartOnce("HeadBroadcaster", launch)
}
// Close signals the run loop to exit and blocks until it has returned.
// It returns an error, without touching the close channel, when OkayToStop
// reports that the broadcaster may not be stopped.
func (hr *HeadBroadcaster) Close() error {
	if stoppable := hr.OkayToStop(); !stoppable {
		return errors.New("HeadBroadcaster is already stopped")
	}

	// Closing chClose wakes run; wgDone is released when run returns.
	close(hr.chClose)
	hr.wgDone.Wait()

	return nil
}
// Connect is a no-op: head is ignored and nil is always returned.
// NOTE(review): presumably present only to satisfy store.HeadTrackable — confirm.
func (hr *HeadBroadcaster) Connect(head *models.Head) error {
return nil
}
// Disconnect is a no-op.
// NOTE(review): presumably present only to satisfy store.HeadTrackable — confirm.
func (hr *HeadBroadcaster) Disconnect() {}
// OnNewLongestChain hands head to the mailbox and returns; ctx is not used.
// Subscribers are not called here: the run loop picks the head up from the
// mailbox and dispatches it via executeCallbacks.
func (hr *HeadBroadcaster) OnNewLongestChain(ctx context.Context, head models.Head) {
hr.mailbox.Deliver(head)
}
// Subscribe registers callback to receive heads relayed by the broadcaster and
// returns a function that removes the registration again.
//
// The returned function is never nil. If a subscription ID cannot be
// generated, the failure is logged, callback is NOT registered, and a no-op
// function is returned so that callers can invoke or defer unsubscribe
// unconditionally without risking a nil-function panic.
func (hr *HeadBroadcaster) Subscribe(callback HeadBroadcastable) (unsubscribe func()) {
	// Generate the ID before taking the lock; it does not touch shared state.
	id, err := newID()
	if err != nil {
		logger.Errorf("Unable to create ID for head relayble callback: %v", err)
		return func() {}
	}

	hr.mutex.Lock()
	defer hr.mutex.Unlock()
	hr.callbacks[id] = callback

	return func() {
		hr.mutex.Lock()
		defer hr.mutex.Unlock()
		// delete on a missing key is a no-op, so calling this twice is harmless.
		delete(hr.callbacks, id)
	}
}
// run is the broadcaster's event loop. It dispatches to executeCallbacks each
// time the mailbox signals, and returns (releasing wgDone) once chClose has
// been closed.
func (hr *HeadBroadcaster) run() {
	defer hr.wgDone.Done()

	for {
		select {
		case <-hr.mailbox.Notify():
			// A head is waiting in the mailbox; fan it out to subscribers.
			hr.executeCallbacks()
		case <-hr.chClose:
			// Close was called.
			return
		}
	}
}
// executeCallbacks retrieves the head currently held in the mailbox and passes
// it to every subscribed callback, each in its own goroutine, then blocks until
// all of them have returned. Each callback receives a context that times out
// after callbackTimeout; a callback that ignores its context still delays the
// return of this method.
//
// DEV: the head broadcaster makes no promises about head delivery! Subscribing
// jobs should expect the broadcaster to skip heads if there is a large number of
// listeners and all callbacks cannot be completed in the allotted time.
func (hr *HeadBroadcaster) executeCallbacks() {
	// Snapshot the subscribers under the read lock so the callbacks run
	// without holding the mutex.
	hr.mutex.RLock()
	callbacks := hr.callbacks.clone()
	hr.mutex.RUnlock()

	// Keep the raw item so a failed assertion reports the type that was
	// actually retrieved, not the zero-value models.Head.
	item := hr.mailbox.Retrieve()
	head, ok := item.(models.Head)
	if !ok {
		logger.Errorf("expected `models.Head`, got %T", item)
		return
	}

	// The counter must be sized from the snapshot, not from hr.callbacks: the
	// live map can change once the lock is released, which would either leave
	// Wait blocked forever or drive the counter negative and panic.
	var wg sync.WaitGroup
	wg.Add(len(callbacks))
	for _, callback := range callbacks {
		go func(cb HeadBroadcastable) {
			defer wg.Done()
			start := time.Now()
			ctx, cancel := context.WithTimeout(context.Background(), callbackTimeout)
			defer cancel()
			cb.OnNewLongestChain(ctx, head)
			elapsed := time.Since(start)
			logger.Debugw(fmt.Sprintf("HeadBroadcaster: finished callback in %s", elapsed), "callbackType", reflect.TypeOf(cb), "blockNumber", head.Number, "time", elapsed, "id", "head_relayer")
		}(callback)
	}
	wg.Wait()
}
// newID returns a callbackID filled with bytes from crypto/rand.
// If the random source fails, the zero callbackID is returned together with
// the error.
func newID() (id callbackID, _ error) {
	// Read straight into the array; no intermediate slice or copy is needed.
	if _, err := rand.Read(id[:]); err != nil {
		return callbackID{}, err
	}
	return id, nil
}