-
Notifications
You must be signed in to change notification settings - Fork 38
/
listener.go
604 lines (497 loc) · 15.2 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
package event
import (
"context"
"errors"
"fmt"
"sync"
"github.com/nspcc-dev/neo-go/pkg/core/state"
"github.com/nspcc-dev/neo-go/pkg/neorpc/result"
"github.com/nspcc-dev/neo-go/pkg/util"
"github.com/nspcc-dev/neofs-node/pkg/morph/client"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
// Listener is an interface of smart contract notification event listener.
//
// Parsers must be set (and notary support enabled, if needed) before the
// listener is started with Listen or ListenWithError.
type Listener interface {
// Listen must start the event listener.
//
// Must listen to events with the parser installed.
Listen(context.Context)
// ListenWithError must start the event listener.
//
// Must listen to events with the parser installed.
//
// Must send an error to the channel if the subscriber channel has been
// closed or the listener could not be started.
ListenWithError(context.Context, chan<- error)
// SetNotificationParser must set the parser of a particular contract event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after the listener has been started.
SetNotificationParser(NotificationParserInfo)
// RegisterNotificationHandler must register the event handler for a
// particular notification event of a contract.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
RegisterNotificationHandler(NotificationHandlerInfo)
// EnableNotarySupport enables notary request listening. Passed hashes are
// the notary mainTX signer and the local key account. In practice, it means
// that the listener will subscribe only to notary requests that are going to
// be paid with the passed mainTX signer hash.
//
// Must not be called after Listen or ListenWithError.
EnableNotarySupport(mainSigner util.Uint160, localAcc util.Uint160, alphaKeys client.AlphabetKeys, bc BlockCounter)
// SetNotaryParser must set the parser of a particular notary request event.
//
// Parser of each event must be set once. All parsers must be set before Listen call.
//
// Must ignore nil parsers and all calls after the listener has been started.
//
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
SetNotaryParser(NotaryParserInfo)
// RegisterNotaryHandler must register the event handler for a particular
// notary request event of a contract.
//
// The specified handler must be called after each capture and parsing of the event.
//
// Must ignore nil handlers.
//
// Has no effect if EnableNotarySupport was not called before Listen or ListenWithError.
RegisterNotaryHandler(NotaryHandlerInfo)
// RegisterBlockHandler must register a chain block handler.
//
// The specified handler must be called after each capture and parsing of a new block from the chain.
//
// Must ignore nil handlers.
RegisterBlockHandler(BlockHandler)
// Stop must stop the event listener.
Stop()
}
// ListenerParams is a group of parameters
// for Listener constructor.
type ListenerParams struct {
// Logger is the logger used by the listener.
// NewListener returns an error if it is nil.
Logger *zap.Logger
// Client is the chain client the listener subscribes through and receives
// events from. NewListener returns an error if it is nil.
Client *client.Client
// WorkerPoolCapacity is the capacity of the worker pool that runs event
// processing. NewListener uses a default capacity if it is zero.
WorkerPoolCapacity int
}
// listener is the Listener implementation created by NewListener.
type listener struct {
// mtx is taken around accesses to the parser and handler maps and by
// EnableNotarySupport.
mtx sync.RWMutex
// startOnce makes Listen/ListenWithError start listening at most once;
// stopOnce does the same for Stop.
startOnce, stopOnce sync.Once
// started is set when listening begins; parsers set afterwards are ignored.
started bool
// notificationParsers holds at most one parser per contract notification
// event (contract script hash + event type).
notificationParsers map[scriptHashWithType]NotificationParser
// notificationHandlers holds the handlers called for a parsed notification
// event, in registration order.
notificationHandlers map[scriptHashWithType][]Handler
// listenNotary is set by EnableNotarySupport; when set, the listener
// subscribes to notary requests and accepts notary parsers and handlers.
listenNotary bool
// notaryEventsPreparator prepares received notary requests before they are
// passed to a parser.
notaryEventsPreparator preparator
// notaryParsers holds at most one parser per notary request kind
// (mempool type + request type + contract script hash).
notaryParsers map[notaryRequestTypes]NotaryParser
// notaryHandlers holds a single handler per notary request kind.
notaryHandlers map[notaryRequestTypes]Handler
notaryMainTXSigner util.Uint160 // filter for notary subscription
// log is the listener's logger.
log *zap.Logger
// cli is the client used for subscriptions and as the source of events.
cli *client.Client
// blockHandlers are called for every block received from the client.
blockHandlers []BlockHandler
// pool runs event processing; tasks are submitted from the listening loop.
pool *ants.Pool
}
// newListenerFailMsg is the prefix of the parameter validation errors
// returned by NewListener.
const newListenerFailMsg = "could not instantiate Listener"
// Parameter validation errors wrapped by NewListener.
var (
// errNilLogger is reported when ListenerParams.Logger is nil.
errNilLogger = errors.New("nil logger")
// errNilSubscriber is reported when ListenerParams.Client is nil.
errNilSubscriber = errors.New("nil event client")
)
// Listen starts listening for events with the registered handlers and blocks
// until the listening routine stops.
//
// The listener is started at most once: any later call (to Listen or
// ListenWithError) does not start it again.
//
// Nothing is returned; a failure of the listening routine is only logged.
func (l *listener) Listen(ctx context.Context) {
	run := func() {
		err := l.listen(ctx)
		if err == nil {
			return
		}

		l.log.Error("could not start listen to events",
			zap.String("error", err.Error()),
		)
	}

	l.startOnce.Do(run)
}
// ListenWithError starts listening for events with the registered handlers
// and blocks until the listening routine stops. If the routine stops with an
// error (e.g. the subscriber channel has been closed), the error is logged
// and then sent to intError.
//
// The listener is started at most once: any later call (to Listen or
// ListenWithError) does not start it again.
//
// Nothing is returned; the send to intError blocks until the error is
// received (or buffered by the channel).
func (l *listener) ListenWithError(ctx context.Context, intError chan<- error) {
	run := func() {
		err := l.listen(ctx)
		if err == nil {
			return
		}

		l.log.Error("could not start listen to events",
			zap.String("error", err.Error()),
		)

		intError <- err
	}

	l.startOnce.Do(run)
}
// listen marks the listener as started, launches the subscription routine and
// blocks in the event loop until it stops.
//
// Returns the error that stopped the loop, or nil if it was stopped by ctx.
func (l *listener) listen(ctx context.Context) error {
	// Mark the listener as started under the lock: the parser setters read
	// this flag while holding the same mutex.
	l.mtx.Lock()
	l.started = true
	l.mtx.Unlock()

	// subscribe reports at most one error and then returns. The one-slot
	// buffer lets it finish even if the loop has already exited (e.g. on
	// context cancellation) instead of blocking on the send forever.
	subErrCh := make(chan error, 1)

	go l.subscribe(subErrCh)

	return l.listenLoop(ctx, subErrCh)
}
// subscribe subscribes the client to notifications of every contract that has
// at least one parser set, and additionally to new blocks (if block handlers
// are registered) and to notary requests (if notary support is enabled).
//
// The first subscription failure is sent to errCh and stops the routine;
// nothing is sent on success.
func (l *listener) subscribe(errCh chan error) {
	// Collect the contracts with set event parsers. Several event types of one
	// contract share the script hash, so each hash is added only once.
	l.mtx.RLock()

	hashes := make([]util.Uint160, 0, len(l.notificationParsers))

parsers:
	for hashType := range l.notificationParsers {
		scHash := hashType.ScriptHash()

		// prevent repetitions: skip the hash if it is already collected
		for i := range hashes {
			if hashes[i].Equals(scHash) {
				continue parsers
			}
		}

		hashes = append(hashes, scHash)
	}

	l.mtx.RUnlock()

	err := l.cli.ReceiveExecutionNotifications(hashes)
	if err != nil {
		errCh <- fmt.Errorf("could not subscribe for notifications: %w", err)
		return
	}

	// blocks are needed only if somebody handles them
	if len(l.blockHandlers) > 0 {
		if err = l.cli.ReceiveBlocks(); err != nil {
			errCh <- fmt.Errorf("could not subscribe for blocks: %w", err)
			return
		}
	}

	// notary requests are filtered by the main TX signer set in EnableNotarySupport
	if l.listenNotary {
		if err = l.cli.ReceiveNotaryRequests(l.notaryMainTXSigner); err != nil {
			errCh <- fmt.Errorf("could not subscribe for notary requests: %w", err)
			return
		}
	}
}
// listenLoop receives notifications, blocks and notary requests from the
// client and hands their processing over to the worker pool until one of the
// stop conditions is met:
//   - an error arrives from subErrCh: that error is returned;
//   - ctx is done: nil is returned;
//   - one of the client channels is closed: a descriptive error is returned.
//
// If the worker pool rejects a task, the event is dropped with a warning.
func (l *listener) listenLoop(ctx context.Context, subErrCh chan error) error {
	notifications, blocks, notaryRequests := l.cli.Notifications()

	// submit passes the task to the worker pool and logs a rejection.
	submit := func(task func()) {
		if err := l.pool.Submit(task); err != nil {
			l.log.Warn("listener worker pool drained",
				zap.Int("capacity", l.pool.Cap()))
		}
	}

	for {
		select {
		case err := <-subErrCh:
			l.log.Error("stop event listener by error", zap.Error(err))

			return err
		case <-ctx.Done():
			l.log.Info("stop event listener by context",
				zap.String("reason", ctx.Err().Error()),
			)

			return nil
		case ev, ok := <-notifications:
			if !ok {
				l.log.Warn("stop event listener by notification channel")

				return errors.New("event subscriber connection has been terminated")
			}

			submit(func() {
				l.parseAndHandleNotification(ev)
			})
		case req, ok := <-notaryRequests:
			if !ok {
				l.log.Warn("stop event listener by notary channel")

				return errors.New("notary event subscriber connection has been terminated")
			}

			submit(func() {
				l.parseAndHandleNotary(req)
			})
		case blk, ok := <-blocks:
			if !ok {
				l.log.Warn("stop event listener by block channel")

				return errors.New("new block notification channel is closed")
			}

			submit(func() {
				for i := range l.blockHandlers {
					l.blockHandlers[i](blk)
				}
			})
		}
	}
}
// parseAndHandleNotification processes a single contract notification: it
// looks up the parser registered for the (script hash, event type) pair,
// parses the notification and passes the result to every registered handler
// in registration order.
//
// The notification is dropped (with a log record) if no parser is set, if
// parsing fails or if no handlers are registered.
func (l *listener) parseAndHandleNotification(notifyEvent *state.ContainedNotificationEvent) {
	log := l.log.With(
		zap.String("script hash LE", notifyEvent.ScriptHash.StringLE()),
		zap.String("event type", notifyEvent.Name),
	)

	// build the lookup key: contract script hash + event type
	var key scriptHashWithType

	key.SetScriptHash(notifyEvent.ScriptHash)
	key.SetType(TypeFromString(notifyEvent.Name))

	// find the parser of the event
	l.mtx.RLock()
	parse, found := l.notificationParsers[key]
	l.mtx.RUnlock()

	if !found {
		log.Debug("event parser not set")
		return
	}

	// parse the notification
	parsed, err := parse(notifyEvent)
	if err != nil {
		log.Warn("could not parse notification event",
			zap.String("error", err.Error()),
		)

		return
	}

	// handle the parsed event
	l.mtx.RLock()
	handlers := l.notificationHandlers[key]
	l.mtx.RUnlock()

	if len(handlers) == 0 {
		log.Info("notification handlers for parsed notification event were not registered",
			zap.Any("event", parsed),
		)

		return
	}

	for i := range handlers {
		handlers[i](parsed)
	}
}
// parseAndHandleNotary processes a single notary request: it prepares the
// request with the notary preparator, looks up the parser registered for the
// (mempool type, request type, script hash) triple, parses the prepared event
// and passes the result to the registered handler.
//
// Requests that fail preparation are dropped: silently for already handled
// and unknown events, with a warning otherwise. The request is also dropped
// (with a log record) if no parser is set, if parsing fails or if no handler
// is registered.
func (l *listener) parseAndHandleNotary(nr *result.NotaryRequestEvent) {
	// prepare the notary event
	prepared, err := l.notaryEventsPreparator.Prepare(nr.NotaryRequest)
	if err != nil {
		// already handled and unknown events are skipped without a log record
		if errors.Is(err, ErrTXAlreadyHandled) || errors.Is(err, ErrUnknownEvent) {
			return
		}

		msg := "could not prepare and validate notary event"
		if errors.Is(err, ErrMainTXExpired) {
			msg = "skip expired main TX notary event"
		}

		l.log.Warn(msg,
			zap.String("error", err.Error()),
		)

		return
	}

	log := l.log.With(
		zap.String("contract", prepared.ScriptHash().StringLE()),
		zap.Stringer("method", prepared.Type()),
	)

	// build the lookup key of the request
	var key notaryRequestTypes

	key.SetMempoolType(nr.Type)
	key.SetRequestType(prepared.Type())
	key.SetScriptHash(prepared.ScriptHash())

	// find the notary parser
	l.mtx.RLock()
	parse, found := l.notaryParsers[key]
	l.mtx.RUnlock()

	if !found {
		log.Debug("notary parser not set")
		return
	}

	// parse the notary event
	parsed, err := parse(prepared)
	if err != nil {
		log.Warn("could not parse notary event",
			zap.String("error", err.Error()),
		)

		return
	}

	// handle the parsed event
	l.mtx.RLock()
	handle, found := l.notaryHandlers[key]
	l.mtx.RUnlock()

	if !found {
		log.Info("notary handlers for parsed notification event were not registered",
			zap.Any("event", parsed),
		)

		return
	}

	handle(parsed)
}
// SetNotificationParser sets the parser of particular contract event.
//
// Ignores nil and already set parsers: the first parser set for an event is
// kept, later ones are dropped with a log record.
// Ignores the parser if listener is started.
func (l *listener) SetNotificationParser(pi NotificationParserInfo) {
	log := l.log.With(
		zap.String("contract", pi.ScriptHash().StringLE()),
		zap.Stringer("event_type", pi.getType()),
	)

	parser := pi.parser()
	if parser == nil {
		log.Info("ignore nil event parser")
		return
	}

	l.mtx.Lock()
	defer l.mtx.Unlock()

	// check if the listener was started
	if l.started {
		log.Warn("listener has been already started, ignore parser")
		return
	}

	// keep the parser that is already set; do not report a registration
	// that did not happen
	if _, ok := l.notificationParsers[pi.scriptHashWithType]; ok {
		log.Debug("ignore already set event parser")
		return
	}

	// add event parser
	l.notificationParsers[pi.scriptHashWithType] = parser

	log.Debug("registered new event parser")
}
// RegisterNotificationHandler appends the handler to the list of handlers of
// a particular contract notification event.
//
// Nil handlers are ignored.
// Handlers of events that have no parser set are ignored.
func (l *listener) RegisterNotificationHandler(hi NotificationHandlerInfo) {
	log := l.log.With(
		zap.String("contract", hi.ScriptHash().StringLE()),
		zap.Stringer("event_type", hi.GetType()),
	)

	if hi.Handler() == nil {
		log.Warn("ignore nil event handler")
		return
	}

	key := hi.scriptHashWithType

	// a handler makes sense only for an event that can be parsed
	l.mtx.RLock()
	_, hasParser := l.notificationParsers[key]
	l.mtx.RUnlock()

	if !hasParser {
		log.Warn("ignore handler of event w/o parser")
		return
	}

	// add the handler to the end of the event's handler list
	l.mtx.Lock()
	l.notificationHandlers[key] = append(l.notificationHandlers[key], hi.Handler())
	l.mtx.Unlock()

	log.Debug("registered new event handler")
}
// EnableNotarySupport turns on listening for notary requests. mainTXSigner is
// used as the filter of the notary subscription: in practice, the listener
// subscribes only to notary requests that are going to be paid with that hash.
// localAcc, alphaKeys and bc are passed to the notary event preparator.
//
// Every call resets the notary parsers and handlers set so far.
//
// Must not be called after Listen or ListenWithError.
func (l *listener) EnableNotarySupport(mainTXSigner util.Uint160, localAcc util.Uint160, alphaKeys client.AlphabetKeys, bc BlockCounter) {
	l.mtx.Lock()
	defer l.mtx.Unlock()

	l.listenNotary, l.notaryMainTXSigner = true, mainTXSigner

	// start with empty handler and parser registries
	l.notaryHandlers, l.notaryParsers = make(map[notaryRequestTypes]Handler), make(map[notaryRequestTypes]NotaryParser)

	l.notaryEventsPreparator = notaryPreparator(localAcc, alphaKeys, bc)
}
// SetNotaryParser sets the parser of particular notary request event and
// allows the event in the notary event preparator.
//
// Ignores nil and already set parsers: the first parser set for an event is
// kept, later ones are dropped with a log record.
// Ignores the parser if listener is started.
// Has no effect if notary support is not enabled.
func (l *listener) SetNotaryParser(pi NotaryParserInfo) {
	if !l.listenNotary {
		return
	}

	log := l.log.With(
		zap.Stringer("mempool_type", pi.GetMempoolType()),
		zap.String("contract", pi.ScriptHash().StringLE()),
		zap.Stringer("notary_type", pi.RequestType()),
	)

	parser := pi.parser()
	if parser == nil {
		log.Info("ignore nil notary event parser")
		return
	}

	l.mtx.Lock()
	defer l.mtx.Unlock()

	// check if the listener was started
	if l.started {
		log.Warn("listener has been already started, ignore notary parser")
		return
	}

	// add event parser unless one is already set
	_, exists := l.notaryParsers[pi.notaryRequestTypes]
	if !exists {
		l.notaryParsers[pi.notaryRequestTypes] = parser
	}

	// the event is allowed in the preparator in both cases
	l.notaryEventsPreparator.allowNotaryEvent(pi.notaryScriptWithHash)

	// do not report a registration that did not happen
	if exists {
		log.Info("ignore already set notary event parser")
		return
	}

	log.Info("registered new event parser")
}
// RegisterNotaryHandler sets the handler of a particular notary request
// event, replacing the handler registered for that event before (if any).
//
// Nil handlers are ignored.
// Handlers of events that have no parser set are ignored.
// Has no effect if notary support is not enabled.
func (l *listener) RegisterNotaryHandler(hi NotaryHandlerInfo) {
	if !l.listenNotary {
		return
	}

	log := l.log.With(
		zap.Stringer("mempool_type", hi.GetMempoolType()),
		zap.String("contract", hi.ScriptHash().StringLE()),
		zap.Stringer("notary type", hi.RequestType()),
	)

	if hi.Handler() == nil {
		log.Warn("ignore nil notary event handler")
		return
	}

	key := hi.notaryRequestTypes

	// a handler makes sense only for an event that can be parsed
	l.mtx.RLock()
	_, hasParser := l.notaryParsers[key]
	l.mtx.RUnlock()

	if !hasParser {
		log.Warn("ignore handler of notary event w/o parser")
		return
	}

	// set the notary event handler
	l.mtx.Lock()
	l.notaryHandlers[key] = hi.Handler()
	l.mtx.Unlock()

	log.Info("registered new event handler")
}
// Stop closes the listener's client, which closes the subscription channel
// with the remote neo node.
//
// The client is closed at most once; repeated calls do nothing.
func (l *listener) Stop() {
	closeClient := func() {
		l.cli.Close()
	}

	l.stopOnce.Do(closeClient)
}
// RegisterBlockHandler appends the handler to the list of handlers called for
// every new block received from the chain.
//
// Nil handlers are ignored with a warning.
func (l *listener) RegisterBlockHandler(handler BlockHandler) {
	if handler != nil {
		l.blockHandlers = append(l.blockHandlers, handler)
		return
	}

	l.log.Warn("ignore nil block handler")
}
// NewListener creates the notification event listener instance and returns
// it as the Listener interface.
//
// Returns an error if the logger or the client is nil, or if the worker pool
// could not be created. A zero WorkerPoolCapacity is replaced with the
// default capacity.
func NewListener(p ListenerParams) (Listener, error) {
	// defaultPoolCap is the worker pool capacity used
	// when none is specified via params
	const defaultPoolCap = 10

	if p.Logger == nil {
		return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilLogger)
	}

	if p.Client == nil {
		return nil, fmt.Errorf("%s: %w", newListenerFailMsg, errNilSubscriber)
	}

	poolCap := defaultPoolCap
	if p.WorkerPoolCapacity != 0 {
		poolCap = p.WorkerPoolCapacity
	}

	// non-blocking pool: task submission fails instead of waiting for a free worker
	pool, err := ants.NewPool(poolCap, ants.WithNonblocking(true))
	if err != nil {
		return nil, fmt.Errorf("could not init worker pool: %w", err)
	}

	l := &listener{
		log:                  p.Logger,
		cli:                  p.Client,
		pool:                 pool,
		notificationParsers:  make(map[scriptHashWithType]NotificationParser),
		notificationHandlers: make(map[scriptHashWithType][]Handler),
	}

	return l, nil
}