-
Notifications
You must be signed in to change notification settings - Fork 199
/
singleDataInterceptor.go
88 lines (73 loc) · 2.16 KB
/
singleDataInterceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
package interceptors
import (
"sync"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
)
// SingleDataInterceptor intercepts messages that each carry a single piece of data.
// For every received message, ProcessReceivedMessage asks the factory to build one
// intercepted data instance from the message payload, validates it and, when it is
// meant for the current shard, hands it to the processor asynchronously.
type SingleDataInterceptor struct {
// factory creates the intercepted data instance from message.Data()
factory process.InterceptedDataFactory
// processor is passed, together with the intercepted data, to processInterceptedData
processor process.InterceptorProcessor
// throttler is given to preProcessMesage before any work is done and has
// EndProcessing called on it once handling of the message is over
// NOTE(review): presumably limits the number of messages handled concurrently - confirm
throttler process.InterceptorThrottler
}
// NewSingleDataInterceptor builds a SingleDataInterceptor out of its three collaborators.
// All of them are mandatory. They are checked in the order factory, processor, throttler
// and the first nil one makes the constructor return a nil interceptor together with
// process.ErrNilInterceptedDataFactory, process.ErrNilInterceptedDataProcessor or
// process.ErrNilInterceptorThrottler respectively.
func NewSingleDataInterceptor(
	factory process.InterceptedDataFactory,
	processor process.InterceptorProcessor,
	throttler process.InterceptorThrottler,
) (*SingleDataInterceptor, error) {
	// cases are evaluated top to bottom, so the check order matches the parameter order
	switch {
	case check.IfNil(factory):
		return nil, process.ErrNilInterceptedDataFactory
	case check.IfNil(processor):
		return nil, process.ErrNilInterceptedDataProcessor
	case check.IfNil(throttler):
		return nil, process.ErrNilInterceptorThrottler
	}

	return &SingleDataInterceptor{
		factory:   factory,
		processor: processor,
		throttler: throttler,
	}, nil
}
// ProcessReceivedMessage is the callback handed to the p2p.Messenger; it is called each time
// a new message is received on the topic this interceptor was registered to.
//
// The message first goes through preProcessMesage (together with the throttler); an error
// from that step is returned as is. The factory then builds the intercepted data from
// message.Data() and its validity is checked; a failure of either step is returned after
// calling EndProcessing on the throttler. Data that is not for the current shard is dropped
// with a trace log and a nil error. Otherwise the data is processed in a separate goroutine
// and nil is returned right away, so errors from that asynchronous step do not reach the caller.
//
// The second parameter (a callback receiving a buffer to send) is not used.
func (sdi *SingleDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P, _ func(buffToSend []byte)) error {
	if err := preProcessMesage(sdi.throttler, message); err != nil {
		return err
	}

	// creation and validation share the same failure handling
	data, err := sdi.factory.Create(message.Data())
	if err == nil {
		err = data.CheckValidity()
	}
	if err != nil {
		sdi.throttler.EndProcessing()
		return err
	}

	if !data.IsForCurrentShard() {
		sdi.throttler.EndProcessing()
		log.Trace("intercepted data is for other shards")
		return nil
	}

	pending := new(sync.WaitGroup)
	pending.Add(1)

	// watcher: signals the throttler only after the wait group has been released
	// NOTE(review): presumably processInterceptedData calls Done on it - confirm in its definition
	go func() {
		pending.Wait()
		sdi.throttler.EndProcessing()
	}()
	go processInterceptedData(sdi.processor, data, pending)

	return nil
}
// IsInterfaceNil returns true if there is no value under the interface, i.e. when the
// receiver is a nil *SingleDataInterceptor. It is safe to call on a nil receiver, which
// lets callers detect an interface value that wraps a typed nil pointer.
func (sdi *SingleDataInterceptor) IsInterfaceNil() bool {
	return sdi == nil
}