-
Notifications
You must be signed in to change notification settings - Fork 201
/
p2pBlackListProcessor.go
155 lines (135 loc) · 4.58 KB
/
p2pBlackListProcessor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package blackList
import (
"fmt"
"time"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/storage"
"github.com/multiversx/mx-chain-logger-go"
)
var log = logger.GetOrCreate("process/throttle/antiflood/blacklist")
const minBanDuration = time.Second
const minFloodingRounds = 2
const sizeBlacklistInfo = 4
type p2pBlackListProcessor struct {
thresholdNumReceivedFlood uint32
numFloodingRounds uint32
thresholdSizeReceivedFlood uint64
cacher storage.Cacher
peerBlacklistCacher process.PeerBlackListCacher
banDuration time.Duration
selfPid core.PeerID
name string
}
// NewP2PBlackListProcessor creates a new instance of p2pQuotaBlacklistProcessor able to determine
// a flooding peer and mark it accordingly
// TODO use argument on constructor
func NewP2PBlackListProcessor(
cacher storage.Cacher,
peerBlacklistCacher process.PeerBlackListCacher,
thresholdNumReceivedFlood uint32,
thresholdSizeReceivedFlood uint64,
numFloodingRounds uint32,
banDuration time.Duration,
name string,
selfPid core.PeerID,
) (*p2pBlackListProcessor, error) {
if check.IfNil(cacher) {
return nil, fmt.Errorf("%w, NewP2PBlackListProcessor", process.ErrNilCacher)
}
if check.IfNil(peerBlacklistCacher) {
return nil, fmt.Errorf("%w, NewP2PBlackListProcessor", process.ErrNilBlackListCacher)
}
if thresholdNumReceivedFlood == 0 {
return nil, fmt.Errorf("%w, thresholdNumReceivedFlood == 0", process.ErrInvalidValue)
}
if thresholdSizeReceivedFlood == 0 {
return nil, fmt.Errorf("%w, thresholdSizeReceivedFlood == 0", process.ErrInvalidValue)
}
if numFloodingRounds < minFloodingRounds {
return nil, fmt.Errorf("%w, numFloodingRounds < %d", process.ErrInvalidValue, minFloodingRounds)
}
if banDuration < minBanDuration {
return nil, fmt.Errorf("%w for ban duration in NewP2PBlackListProcessor", process.ErrInvalidValue)
}
return &p2pBlackListProcessor{
cacher: cacher,
peerBlacklistCacher: peerBlacklistCacher,
thresholdNumReceivedFlood: thresholdNumReceivedFlood,
thresholdSizeReceivedFlood: thresholdSizeReceivedFlood,
numFloodingRounds: numFloodingRounds,
banDuration: banDuration,
selfPid: selfPid,
name: name,
}, nil
}
// ResetStatistics checks if an identifier reached its maximum flooding rounds. If it did, it will remove its
// cached information and adds it to the black list handler
func (pbp *p2pBlackListProcessor) ResetStatistics() {
keys := pbp.cacher.Keys()
for _, key := range keys {
val, ok := pbp.getFloodingValue(key)
if !ok {
pbp.cacher.Remove(key)
continue
}
if val >= pbp.numFloodingRounds-1 { //-1 because the reset function is called before the AddQuota
pbp.cacher.Remove(key)
pid := core.PeerID(key)
log.Debug("added new peer to black list",
"peer ID", pid.Pretty(),
"ban period", pbp.banDuration,
)
err := pbp.peerBlacklistCacher.Upsert(pid, pbp.banDuration)
if err != nil {
log.Warn("error adding peer id in peer ids cache", ""+
"pid", p2p.PeerIdToShortString(pid),
"error", err,
)
}
}
}
}
func (pbp *p2pBlackListProcessor) getFloodingValue(key []byte) (uint32, bool) {
obj, ok := pbp.cacher.Peek(key)
if !ok {
return 0, false
}
val, ok := obj.(uint32)
return val, ok
}
// AddQuota checks if the received quota for an identifier has exceeded the set thresholds
func (pbp *p2pBlackListProcessor) AddQuota(pid core.PeerID, numReceived uint32, sizeReceived uint64, _ uint32, _ uint64) {
isFloodingPeer := numReceived >= pbp.thresholdNumReceivedFlood || sizeReceived >= pbp.thresholdSizeReceivedFlood
if !isFloodingPeer {
return
}
if pid == pbp.selfPid {
log.Warn("current peer should have been blacklisted",
"name", pbp.name,
"total num messages", numReceived,
"total size", sizeReceived,
)
return
}
pbp.incrementStatsFloodingPeer(pid)
}
func (pbp *p2pBlackListProcessor) incrementStatsFloodingPeer(pid core.PeerID) {
obj, ok := pbp.cacher.Get(pid.Bytes())
if !ok {
pbp.cacher.Put(pid.Bytes(), uint32(1), sizeBlacklistInfo)
return
}
val, ok := obj.(uint32)
if !ok {
pbp.cacher.Put(pid.Bytes(), uint32(1), sizeBlacklistInfo)
return
}
pbp.cacher.Put(pid.Bytes(), val+1, sizeBlacklistInfo)
}
// IsInterfaceNil returns true if there is no value under the interface
func (pbp *p2pBlackListProcessor) IsInterfaceNil() bool {
return pbp == nil
}