-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
event_broadcaster.go
344 lines (294 loc) · 9.91 KB
/
event_broadcaster.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
package pg
import (
"context"
"database/sql"
"net/url"
"sync"
"time"
"github.com/google/uuid"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/static"
"github.com/smartcontractkit/chainlink/v2/core/utils"
)
//go:generate mockery --quiet --name EventBroadcaster --output ./mocks/ --case=underscore
//go:generate mockery --quiet --name Subscription --output ./mocks/ --case=underscore
// EventBroadcaster opaquely manages a collection of Postgres event listeners
// and broadcasts events to subscribers (with an optional payload filter).
type EventBroadcaster interface {
services.ServiceCtx
// Subscribe registers interest in events on channel and returns a
// Subscription for receiving them. In the Postgres-backed implementation in
// this file an empty payloadFilter matches every payload; otherwise only
// events whose payload equals payloadFilter are delivered.
Subscribe(channel, payloadFilter string) (Subscription, error)
// Notify publishes payload on channel.
Notify(channel string, payload string) error
}
// eventBroadcaster is the Postgres-backed EventBroadcaster. It publishes
// through a database/sql handle and receives through a lib/pq listener.
type eventBroadcaster struct {
// uri is the connection string handed to both sql.Open and pq.NewListener.
uri string
// minReconnectInterval and maxReconnectDuration are passed through to
// pq.NewListener.
minReconnectInterval time.Duration
maxReconnectDuration time.Duration
// db is the handle Notify uses to execute pg_notify; set by Start.
db *sql.DB
// listener receives notifications from Postgres; set by Start.
listener *pq.Listener
// subscriptions maps a channel name to the set of subscriptions on it.
// Close sets it to nil. Access is guarded by subscriptionsMu.
subscriptions map[string]map[Subscription]struct{}
subscriptionsMu sync.RWMutex
// chStop is closed by Close to tell runLoop to exit.
chStop chan struct{}
// chDone is closed by runLoop when it returns.
chDone chan struct{}
lggr logger.Logger
utils.StartStopOnce
}
// Compile-time check that *eventBroadcaster satisfies EventBroadcaster.
var _ EventBroadcaster = (*eventBroadcaster)(nil)
// Event is a single notification delivered to subscribers.
type Event struct {
// Channel is the name of the channel the notification arrived on.
Channel string
// Payload is the notification's payload string; subscriptions compare it
// against their payload filter.
Payload string
}
// NewEventBroadcaster constructs a Postgres-backed event broadcaster that has
// not been started yet.
//
// A zero minReconnectInterval defaults to one second and a zero
// maxReconnectDuration defaults to one minute. The uri is passed through
// static.SetConsumerName (with the "EventBroadcaster" name and appID) before
// being rendered to the connection string the broadcaster keeps. The logger is
// given the name "EventBroadcaster".
func NewEventBroadcaster(uri url.URL, minReconnectInterval time.Duration, maxReconnectDuration time.Duration, lggr logger.Logger, appID uuid.UUID) *eventBroadcaster {
	if minReconnectInterval == 0 {
		minReconnectInterval = time.Second
	}
	if maxReconnectDuration == 0 {
		maxReconnectDuration = time.Minute
	}

	// uri is a local copy (passed by value), so tagging it does not affect the caller.
	static.SetConsumerName(&uri, "EventBroadcaster", &appID)

	b := new(eventBroadcaster)
	b.uri = uri.String()
	b.minReconnectInterval = minReconnectInterval
	b.maxReconnectDuration = maxReconnectDuration
	b.subscriptions = make(map[string]map[Subscription]struct{})
	b.chStop = make(chan struct{})
	b.chDone = make(chan struct{})
	b.lggr = lggr.Named("EventBroadcaster")
	return b
}
// Start opens the database handle used for publishing, creates the lib/pq
// listener used for receiving, and launches the runLoop goroutine that fans
// notifications out to subscribers. The work is wrapped in StartOnce. The
// context argument is not used.
//
// It returns the error from sql.Open, if any.
func (b *eventBroadcaster) Start(context.Context) error {
	// onListenerEvent is handed to the pq listener. Every event it receives is
	// connection-related and lib/pq reconnects on its own, so nothing needs to
	// be torn down here: the events are only logged for node operators.
	onListenerEvent := func(ev pq.ListenerEventType, err error) {
		// The callback can still fire after the listener has been closed;
		// stay silent once shutdown has been signalled.
		select {
		case <-b.chStop:
			return
		default:
		}

		switch ev {
		case pq.ListenerEventConnected:
			b.lggr.Debug("Postgres event broadcaster: connected")
		case pq.ListenerEventDisconnected:
			b.lggr.Warnw("Postgres event broadcaster: disconnected, trying to reconnect...", "err", err)
		case pq.ListenerEventReconnected:
			b.lggr.Debug("Postgres event broadcaster: reconnected")
		case pq.ListenerEventConnectionAttemptFailed:
			b.lggr.Warnw("Postgres event broadcaster: reconnect attempt failed, trying again...", "err", err)
		}
	}

	return b.StartOnce("Postgres event broadcaster", func() error {
		// Notifications go through lib/pq, so the "postgres" driver name is
		// used deliberately here and NOT pgx.
		conn, err := sql.Open("postgres", b.uri)
		if err != nil {
			return err
		}
		b.db = conn
		b.listener = pq.NewListener(b.uri, b.minReconnectInterval, b.maxReconnectDuration, onListenerEvent)

		go b.runLoop()
		return nil
	})
}
// Close permanently destroys the EventBroadcaster. Calling this does not clean
// up any outstanding subscriptions. Subscribers must explicitly call `.Close()`
// or they will leak goroutines.
//
// The shutdown is wrapped in StopOnce. It returns the error from closing the
// database handle and the listener, and only returns after runLoop has exited.
func (b *eventBroadcaster) Close() error {
	return b.StopOnce("Postgres event broadcaster", func() (err error) {
		// Clearing the map is a write, so it needs the write lock. The lock is
		// kept while the listener is closed so that a concurrent Subscribe
		// cannot register on the listener and then write into the nil map.
		b.subscriptionsMu.Lock()
		b.subscriptions = nil
		err = services.CloseAll(b.db, b.listener)
		b.subscriptionsMu.Unlock()

		// The lock must be released before waiting for runLoop: it may be in
		// the middle of broadcast, which needs the read lock to finish.
		close(b.chStop)
		<-b.chDone
		return err
	})
}
// Name returns the name of the broadcaster's logger.
func (b *eventBroadcaster) Name() string {
return b.lggr.Name()
}
// HealthReport returns a single-entry map keyed by the broadcaster's name whose
// value is the result of Healthy().
func (b *eventBroadcaster) HealthReport() map[string]error {
	report := make(map[string]error, 1)
	report[b.Name()] = b.Healthy()
	return report
}
// runLoop receives notifications from the listener and hands each one to
// broadcast. It exits when chStop is closed or when the listener's notification
// channel is closed, and closes chDone on the way out so Close can wait for it.
func (b *eventBroadcaster) runLoop() {
	defer close(b.chDone)

	for {
		select {
		case <-b.chStop:
			return

		case n, ok := <-b.listener.NotificationChannel():
			if !ok {
				return
			}
			if n == nil {
				// A nil notification carries nothing to broadcast; skip it.
				continue
			}

			b.lggr.Debugw("Postgres event broadcaster: received notification",
				"channel", n.Channel,
				"payload", n.Extra,
			)
			b.broadcast(n)
		}
	}
}
// Notify publishes payload on channel by executing pg_notify through the
// broadcaster's database handle. It returns nil on success, or the wrapped
// execution error otherwise.
func (b *eventBroadcaster) Notify(channel string, payload string) error {
	const query = `SELECT pg_notify($1, $2)`

	if _, err := b.db.Exec(query, channel, payload); err != nil {
		return errors.Wrap(err, "Postgres event broadcaster could not notify")
	}
	return nil
}
// Subscribe creates a subscription to channel. If this is the first
// subscription on that channel, the listener is told to listen on it first;
// a failure there is returned wrapped and no subscription is created.
//
// payloadFilter is stored on the subscription and consulted by InterestedIn.
// Each subscription gets a bounded queue of 1000 events and a sleeper task that
// drains the queue via processQueue.
func (b *eventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error) {
	b.subscriptionsMu.Lock()
	defer b.subscriptionsMu.Unlock()

	if _, listening := b.subscriptions[channel]; !listening {
		if err := b.listener.Listen(channel); err != nil {
			return nil, errors.Wrap(err, "Postgres event broadcaster could not subscribe")
		}
		b.subscriptions[channel] = make(map[Subscription]struct{})
	}

	created := &subscription{
		channel:          channel,
		payloadFilter:    payloadFilter,
		eventBroadcaster: b,
		queue:            utils.NewBoundedQueue[Event](1000),
		chEvents:         make(chan Event),
		chDone:           make(chan struct{}),
		lggr:             logger.Sugared(b.lggr),
	}
	// The worker needs the subscription's own method, so it is wired up after
	// the struct exists.
	drain := utils.SleeperFuncTask(created.processQueue, "SubscriptionQueueProcessor")
	created.processQueueWorker = utils.NewSleeperTask(drain)

	b.subscriptions[channel][created] = struct{}{}
	return created, nil
}
// removeSubscription drops sub from the set registered for its channel. When
// that leaves the channel without subscribers, the listener is told to stop
// listening on it (a failure is logged, not returned) and the channel entry is
// removed.
func (b *eventBroadcaster) removeSubscription(sub Subscription) {
	b.subscriptionsMu.Lock()
	defer b.subscriptionsMu.Unlock()

	// During shutdown the broadcaster may already have been closed before a
	// subscription's Close runs; in that case there is nothing left to remove.
	if b.subscriptions == nil {
		return
	}

	channel := sub.ChannelName()
	registered, found := b.subscriptions[channel]
	if !found || registered == nil {
		return
	}

	delete(registered, sub)
	if len(registered) > 0 {
		return
	}

	// Last subscriber on this channel is gone.
	if err := b.listener.Unlisten(channel); err != nil {
		b.lggr.Errorw("Postgres event broadcaster: failed to unsubscribe", "err", err)
	}
	delete(b.subscriptions, channel)
}
// broadcast converts notification into an Event and sends it to every
// subscription on the notification's channel that reports interest in it.
// Each Send runs in its own goroutine; broadcast waits for all of them while
// holding the read lock on the subscription map.
func (b *eventBroadcaster) broadcast(notification *pq.Notification) {
	b.subscriptionsMu.RLock()
	defer b.subscriptionsMu.RUnlock()

	ev := Event{
		Channel: notification.Channel,
		Payload: notification.Extra,
	}

	var pending sync.WaitGroup
	for candidate := range b.subscriptions[ev.Channel] {
		if !candidate.InterestedIn(ev) {
			continue
		}
		pending.Add(1)
		go func(target Subscription) {
			defer pending.Done()
			target.Send(ev)
		}(candidate)
	}
	pending.Wait()
}
// Subscription represents a subscription to a Postgres event channel
type Subscription interface {
// Events returns the channel on which the subscriber receives events.
Events() <-chan Event
// Close ends the subscription.
Close()
// ChannelName returns the name of the channel this subscription is for.
ChannelName() string
// InterestedIn reports whether event should be delivered to this
// subscription; the broadcaster calls it before Send.
InterestedIn(event Event) bool
// Send hands event to the subscription for delivery.
Send(event Event)
}
// subscription is the Subscription implementation returned by
// eventBroadcaster.Subscribe.
type subscription struct {
// channel is the channel name this subscription was created for.
channel string
// payloadFilter restricts delivery to events with an equal payload; the
// empty string matches everything (see InterestedIn).
payloadFilter string
// eventBroadcaster is the owning broadcaster, used by Close to unregister.
eventBroadcaster *eventBroadcaster
// queue buffers events between Send and processQueue.
queue *utils.BoundedQueue[Event]
// processQueueWorker runs processQueue when woken by Send.
processQueueWorker utils.SleeperTask
// chEvents is the channel exposed through Events(); closed by Close.
chEvents chan Event
// chDone is closed by Close to abort an in-progress processQueue.
chDone chan struct{}
lggr logger.SugaredLogger
}
// Compile-time check that *subscription satisfies Subscription.
var _ Subscription = (*subscription)(nil)
// InterestedIn reports whether event passes this subscription's payload
// filter: an empty filter accepts every event, otherwise the event's payload
// must equal the filter exactly.
func (sub *subscription) InterestedIn(event Event) bool {
	if sub.payloadFilter == "" {
		return true
	}
	return event.Payload == sub.payloadFilter
}
// Send enqueues event on the subscription's queue and then wakes the queue
// worker (if it has been started) so the event is delivered by processQueue
// rather than by the caller.
func (sub *subscription) Send(event Event) {
// Enqueue first so the worker sees the event when it wakes up.
sub.queue.Add(event)
sub.processQueueWorker.WakeUpIfStarted()
}
// broadcastTimeout bounds how long one processQueue run may spend delivering
// queued events to the subscriber.
const broadcastTimeout = 10 * time.Second

// processQueue takes events off the subscription's queue and sends them on
// chEvents until the queue is empty. It stops early when:
//   - broadcastTimeout has elapsed since the run started (a warning is logged;
//     the event that was being sent at that moment is not re-queued), or
//   - chDone is closed, i.e. the subscription is being closed.
func (sub *subscription) processQueue() {
	// A single timer covers the whole run. Creating one with time.After on
	// every iteration would allocate a timer per queued event, each lingering
	// until the deadline.
	timeout := time.NewTimer(broadcastTimeout)
	defer timeout.Stop()

	for !sub.queue.Empty() {
		event := sub.queue.Take()
		select {
		case sub.chEvents <- event:
		case <-timeout.C:
			sub.lggr.Warnf("Postgres event broadcaster: SLOW processQueue(), timed out after %s", broadcastTimeout)
			return
		case <-sub.chDone:
			sub.lggr.Debugw("Postgres event broadcaster: request cancelled during processQueue()")
			return
		}
	}
}
// Events returns the receive-only channel on which queued events are
// delivered. The channel is closed by Close.
func (sub *subscription) Events() <-chan Event {
return sub.chEvents
}
// ChannelName returns the channel name this subscription was created with.
func (sub *subscription) ChannelName() string {
return sub.channel
}
// Close unregisters the subscription from its broadcaster, signals any
// in-progress processQueue to stop, stops the queue worker, and finally closes
// the events channel. The order of these steps matters.
//
// Close must be called at most once: a second call would close chDone and
// chEvents again, and closing an already-closed channel panics.
func (sub *subscription) Close() {
sub.eventBroadcaster.removeSubscription(sub)
// Close chDone before stopping the SleeperTask to avoid deadlocks
close(sub.chDone)
err := sub.processQueueWorker.Stop()
if err != nil {
sub.lggr.Errorw("THIS NEVER RETURNS AN ERROR", "err", err)
}
// Closed only after the worker has been stopped, since processQueue sends
// on chEvents.
close(sub.chEvents)
}
// NullEventBroadcaster implements the null-object pattern for EventBroadcaster:
// its methods do nothing and report success.
type NullEventBroadcaster struct {
// Sub is the subscription returned by every call to Subscribe.
Sub *NullSubscription
}
// NewNullEventBroadcaster returns a NullEventBroadcaster whose Sub is a
// NullSubscription backed by a fresh unbuffered Event channel.
func NewNullEventBroadcaster() *NullEventBroadcaster {
	return &NullEventBroadcaster{
		Sub: &NullSubscription{Ch: make(chan Event)},
	}
}
// Compile-time check that *NullEventBroadcaster satisfies EventBroadcaster.
var _ EventBroadcaster = &NullEventBroadcaster{}
// Name returns the fixed string "NullEventBroadcaster".
func (*NullEventBroadcaster) Name() string { return "NullEventBroadcaster" }
// Start is a no-op and always returns nil.
func (*NullEventBroadcaster) Start(context.Context) error { return nil }
// Close is a no-op and always returns nil.
func (*NullEventBroadcaster) Close() error { return nil }
// Ready is a no-op and always returns nil.
func (*NullEventBroadcaster) Ready() error { return nil }
// HealthReport always returns an empty, non-nil map.
func (*NullEventBroadcaster) HealthReport() map[string]error { return map[string]error{} }
// Subscribe ignores channel and payloadFilter and returns the broadcaster's
// shared Sub with a nil error.
func (ne *NullEventBroadcaster) Subscribe(channel, payloadFilter string) (Subscription, error) {
return ne.Sub, nil
}
// Notify is a no-op: it ignores channel and payload and always returns nil.
func (*NullEventBroadcaster) Notify(channel string, payload string) error { return nil }
// Compile-time check that *NullSubscription satisfies Subscription.
var _ Subscription = &NullSubscription{}
// NullSubscription is a Subscription whose methods do nothing.
type NullSubscription struct {
// Ch is the channel returned by Events. No method of NullSubscription sends
// on it or closes it.
Ch chan (Event)
}
// Events returns Ch as a receive-only channel.
func (ns *NullSubscription) Events() <-chan Event { return ns.Ch }
// Close is a no-op; in particular it does not close Ch.
func (ns *NullSubscription) Close() {}
// ChannelName always returns the empty string.
func (ns *NullSubscription) ChannelName() string { return "" }
// InterestedIn always returns false, regardless of event.
func (ns *NullSubscription) InterestedIn(event Event) bool { return false }
// Send is a no-op; event is discarded.
func (ns *NullSubscription) Send(event Event) {}