-
Notifications
You must be signed in to change notification settings - Fork 199
/
baseDataInterceptor.go
127 lines (107 loc) · 3.67 KB
/
baseDataInterceptor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package interceptors
import (
"bytes"
"sync"
"github.com/ElrondNetwork/elrond-go-core/core"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go/p2p"
"github.com/ElrondNetwork/elrond-go/process"
)
// baseDataInterceptor bundles the collaborators and state used by the
// pre-processing, validation/saving and debug-reporting helpers in this file.
type baseDataInterceptor struct {
// throttler is queried with CanProcess and notified with StartProcessing
// by preProcessMesage.
throttler process.InterceptorThrottler
// antifloodHandler performs the per-message and per-topic checks applied to
// senders that are not exempted by shouldSkipAntifloodChecks.
antifloodHandler process.P2PAntifloodHandler
// topic is forwarded to the antiflood handler, to processor.Save and to the
// debug handler.
topic string
// currentPeerId is compared against the message originator and the
// connected peer in isMessageFromSelfToSelf.
currentPeerId core.PeerID
// processor validates and then saves intercepted data.
processor process.InterceptorProcessor
// mutDebugHandler is held by SetInterceptedDebugHandler while debugHandler
// is replaced.
mutDebugHandler sync.RWMutex
// debugHandler receives the identifiers of received and processed data.
debugHandler process.InterceptedDebugger
// preferredPeersHolder is consulted (Contains) to decide whether a
// connected peer bypasses the antiflood checks.
preferredPeersHolder process.PreferredPeersHolderHandler
}
// preProcessMesage runs the admission checks for an incoming message and, when
// they all pass, marks the start of processing on the throttler.
//
// It returns process.ErrNilMessage for a nil message, process.ErrNilDataToProcess
// when the message carries nil data, the antiflood handler's error when one of
// its checks rejects the message, and process.ErrSystemBusy when the throttler
// reports that it cannot process. On a nil return the throttler's
// StartProcessing has been called.
func (bdi *baseDataInterceptor) preProcessMesage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
	switch {
	case message == nil:
		return process.ErrNilMessage
	case message.Data() == nil:
		return process.ErrNilDataToProcess
	}

	// Exempt senders bypass both the antiflood checks and the throttler
	// capacity check; processing is started unconditionally for them.
	if bdi.shouldSkipAntifloodChecks(fromConnectedPeer, message) {
		bdi.throttler.StartProcessing()
		return nil
	}

	if floodErr := bdi.antifloodHandler.CanProcessMessage(message, fromConnectedPeer); floodErr != nil {
		return floodErr
	}

	// One message of len(Data) bytes is accounted against this topic.
	payloadSize := uint64(len(message.Data()))
	topicErr := bdi.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, bdi.topic, 1, payloadSize, message.SeqNo())
	if topicErr != nil {
		return topicErr
	}

	if !bdi.throttler.CanProcess() {
		return process.ErrSystemBusy
	}

	bdi.throttler.StartProcessing()
	return nil
}
// shouldSkipAntifloodChecks reports whether the sender is exempt from the
// antiflood checks: either the message is a self-to-self message, or the
// connected peer is contained in the preferred peers holder. The preferred
// peers holder is only consulted when the self-to-self test fails.
func (bdi *baseDataInterceptor) shouldSkipAntifloodChecks(fromConnectedPeer core.PeerID, message p2p.MessageP2P) bool {
	return bdi.isMessageFromSelfToSelf(fromConnectedPeer, message) ||
		bdi.preferredPeersHolder.Contains(fromConnectedPeer)
}
// isMessageFromSelfToSelf reports whether all three conditions hold: the
// message signature equals its originator bytes, the originator bytes equal
// the current peer ID, and the connected peer is the current peer.
// NOTE(review): presumably locally originated messages carry the sender ID in
// place of a signature - confirm against the messenger implementation.
func (bdi *baseDataInterceptor) isMessageFromSelfToSelf(fromConnectedPeer core.PeerID, message p2p.MessageP2P) bool {
	if !bytes.Equal(message.Signature(), message.From()) {
		return false
	}
	if !bytes.Equal(message.From(), bdi.currentPeerId.Bytes()) {
		return false
	}

	return fromConnectedPeer == bdi.currentPeerId
}
// processInterceptedData validates the intercepted data and, if valid, saves
// it through the processor. Each outcome (invalid, save failure, success) is
// trace-logged and reported to the debug handler together with the resulting
// error, which is nil on success.
func (bdi *baseDataInterceptor) processInterceptedData(data process.InterceptedData, msg p2p.MessageP2P) {
	if validateErr := bdi.processor.Validate(data, msg.Peer()); validateErr != nil {
		log.Trace("intercepted data is not valid",
			"hash", data.Hash(),
			"type", data.Type(),
			"pid", p2p.MessageOriginatorPid(msg),
			"seq no", p2p.MessageOriginatorSeq(msg),
			"data", data.String(),
			"error", validateErr.Error(),
		)
		bdi.processDebugInterceptedData(data, validateErr)

		return
	}

	if saveErr := bdi.processor.Save(data, msg.Peer(), bdi.topic); saveErr != nil {
		log.Trace("intercepted data can not be processed",
			"hash", data.Hash(),
			"type", data.Type(),
			"pid", p2p.MessageOriginatorPid(msg),
			"seq no", p2p.MessageOriginatorSeq(msg),
			"data", data.String(),
			"error", saveErr.Error(),
		)
		bdi.processDebugInterceptedData(data, saveErr)

		return
	}

	log.Trace("intercepted data is processed",
		"hash", data.Hash(),
		"type", data.Type(),
		"pid", p2p.MessageOriginatorPid(msg),
		"seq no", p2p.MessageOriginatorSeq(msg),
		"data", data.String(),
	)
	// Both steps succeeded, so the debug handler is told there was no error.
	bdi.processDebugInterceptedData(data, nil)
}
// processDebugInterceptedData reports the identifiers of the given intercepted
// data, together with the processing outcome err (nil on success), to the
// current debug handler for this interceptor's topic.
func (bdi *baseDataInterceptor) processDebugInterceptedData(interceptedData process.InterceptedData, err error) {
	identifiers := interceptedData.Identifiers()

	// SetInterceptedDebugHandler replaces debugHandler under mutDebugHandler,
	// so the field must be read under the same mutex to avoid a data race.
	// The handler is snapshotted so the lock is not held during the call.
	bdi.mutDebugHandler.RLock()
	handler := bdi.debugHandler
	bdi.mutDebugHandler.RUnlock()

	handler.LogProcessedHashes(bdi.topic, identifiers, err)
}
// receivedDebugInterceptedData reports the identifiers of the given
// intercepted data as received to the current debug handler for this
// interceptor's topic.
func (bdi *baseDataInterceptor) receivedDebugInterceptedData(interceptedData process.InterceptedData) {
	identifiers := interceptedData.Identifiers()

	// SetInterceptedDebugHandler replaces debugHandler under mutDebugHandler,
	// so the field must be read under the same mutex to avoid a data race.
	// The handler is snapshotted so the lock is not held during the call.
	bdi.mutDebugHandler.RLock()
	handler := bdi.debugHandler
	bdi.mutDebugHandler.RUnlock()

	handler.LogReceivedHashes(bdi.topic, identifiers)
}
// SetInterceptedDebugHandler replaces the interceptor's debug handler with the
// provided one. A nil handler is rejected with process.ErrNilDebugger and the
// current handler is left in place. The swap is done while holding
// mutDebugHandler for writing.
func (bdi *baseDataInterceptor) SetInterceptedDebugHandler(handler process.InterceptedDebugger) error {
	if check.IfNil(handler) {
		return process.ErrNilDebugger
	}

	bdi.mutDebugHandler.Lock()
	defer bdi.mutDebugHandler.Unlock()

	bdi.debugHandler = handler

	return nil
}