diff --git a/cmd/node/config/config.toml b/cmd/node/config/config.toml index 3d704abc8a5..f55492e3698 100644 --- a/cmd/node/config/config.toml +++ b/cmd/node/config/config.toml @@ -343,8 +343,8 @@ Capacity = 5000 Type = "LRU" [Antiflood.Topic] - DefaultMaxMessagesPerSec = 10000 - MaxMessages = [{ Topic = "heartbeat", NumMessagesPerSec = 10 }, + DefaultMaxMessagesPerSec = 15000 + MaxMessages = [{ Topic = "heartbeat", NumMessagesPerSec = 20 }, { Topic = "shardBlocks*", NumMessagesPerSec = 20 }, { Topic = "metachainBlocks", NumMessagesPerSec = 20 }] [Antiflood.WebServer] diff --git a/consensus/interface.go b/consensus/interface.go index 5f54f272d40..211ac18aa20 100644 --- a/consensus/interface.go +++ b/consensus/interface.go @@ -84,7 +84,7 @@ type NetworkShardingCollector interface { // p2p messages type P2PAntifloodHandler interface { CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error ResetForTopic(topic string) SetMaxMessagesForTopic(topic string, maxNum uint32) IsInterfaceNil() bool diff --git a/consensus/mock/p2pAntifloodHandlerStub.go b/consensus/mock/p2pAntifloodHandlerStub.go index 15f11a92a99..b6f57e91ae1 100644 --- a/consensus/mock/p2pAntifloodHandlerStub.go +++ b/consensus/mock/p2pAntifloodHandlerStub.go @@ -8,7 +8,7 @@ import ( // P2PAntifloodHandlerStub - type P2PAntifloodHandlerStub struct { CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error } // ResetForTopic - @@ -29,9 +29,9 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, } // CanProcessMessagesOnTopic - -func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { if p2pahs.CanProcessMessagesOnTopicCalled != nil { - return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize) + return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence) } return nil diff --git a/consensus/spos/bls/blsWorker.go b/consensus/spos/bls/blsWorker.go index 8746fd52833..63295062be7 100644 --- a/consensus/spos/bls/blsWorker.go +++ b/consensus/spos/bls/blsWorker.go @@ -5,7 +5,17 @@ import ( "github.com/ElrondNetwork/elrond-go/consensus/spos" ) -const peerMaxMessagesPerSec = uint32(2) +// peerMaxMessagesPerSec defines how many messages can be propagated by a pid in a round. The value was chosen by +// following the next premises: +// 1. a leader can propagate as maximum as 3 messages per round: proposed header block + proposed body + final info; +// 2. due to the fact that a delayed signature of the proposer (from previous round) can be received in the current round +// adds an extra 1 to the total value, reaching value 4; +// 3. Because the leader might be selected in the next round and might have an empty data pool, it can send the newly +// empty proposed block at the very beginning of the next round. One extra message here, yielding to a total of 5. +// 4. If we consider the forks that can appear on the system wee need to add one more to the value. +// Validators only send one signature message in a round, treating the edge case of a delayed message, will need at most +// 2 messages per round (which is ok as it is below the set value of 5) +const peerMaxMessagesPerSec = uint32(6) // worker defines the data needed by spos to communicate between nodes which are in the validators group type worker struct { diff --git a/consensus/spos/worker.go b/consensus/spos/worker.go index 47cf0648b50..f2a39f7a341 100644 --- a/consensus/spos/worker.go +++ b/consensus/spos/worker.go @@ -317,7 +317,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP } topic := GetConsensusTopicIDFromShardCoordinator(wrk.shardCoordinator) - err = wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data()))) + err = wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data())), message.SeqNo()) if err != nil { return err } diff --git a/consensus/spos/worker_test.go b/consensus/spos/worker_test.go index 192c56cacca..4a00b313c10 100644 --- a/consensus/spos/worker_test.go +++ b/consensus/spos/worker_test.go @@ -112,7 +112,7 @@ func createMockP2PAntifloodHandler() *mock.P2PAntifloodHandlerStub { CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return nil }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, } @@ -395,7 +395,7 @@ func TestWorker_ProcessReceivedMessageShouldErrIfFloodIsDetectedOnTopic(t *testi CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return nil }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return expectedErr }, } diff --git a/dataRetriever/interface.go b/dataRetriever/interface.go index 55cdcfb7b98..dcae97223c7 100644 --- a/dataRetriever/interface.go +++ b/dataRetriever/interface.go @@ -347,7 +347,7 @@ type RequestedItemsHandler interface { // p2p messages type P2PAntifloodHandler interface { CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, _ uint64) error + CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, _ uint64, sequence []byte) error IsInterfaceNil() bool } diff --git a/dataRetriever/mock/p2pAntifloodHandlerStub.go b/dataRetriever/mock/p2pAntifloodHandlerStub.go index 7fbef89cd35..fb79be5277a 100644 --- a/dataRetriever/mock/p2pAntifloodHandlerStub.go +++ b/dataRetriever/mock/p2pAntifloodHandlerStub.go @@ -8,7 +8,7 @@ import ( // P2PAntifloodHandlerStub - type P2PAntifloodHandlerStub struct { CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error } // CanProcessMessage - @@ -21,12 +21,12 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, } // CanProcessMessagesOnTopic - -func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { if p2pahs.CanProcessMessagesOnTopicCalled == nil { return nil } - return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize) + return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence) } // IsInterfaceNil - diff --git a/dataRetriever/resolvers/headerResolver_test.go b/dataRetriever/resolvers/headerResolver_test.go index b143c86031c..dd006ca16a9 100644 --- a/dataRetriever/resolvers/headerResolver_test.go +++ b/dataRetriever/resolvers/headerResolver_test.go @@ -153,7 +153,7 @@ func TestHeaderResolver_ProcessReceivedCanProcessMessageErrorsShouldErr(t *testi CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return expectedErr }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, } diff --git a/dataRetriever/resolvers/messageProcessor.go b/dataRetriever/resolvers/messageProcessor.go index 1a60e4bac86..edcc578d3a9 100644 --- a/dataRetriever/resolvers/messageProcessor.go +++ b/dataRetriever/resolvers/messageProcessor.go @@ -26,7 +26,7 @@ func (mp *messageProcessor) canProcessMessage(message p2p.MessageP2P, fromConnec if err != nil { return fmt.Errorf("%w on resolver topic %s", err, mp.topic) } - err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data()))) + err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data())), message.SeqNo()) if err != nil { return fmt.Errorf("%w on resolver topic %s", err, mp.topic) } diff --git a/dataRetriever/resolvers/messageProcessor_test.go b/dataRetriever/resolvers/messageProcessor_test.go index e6a9805a3d5..8efa485c1e3 100644 --- a/dataRetriever/resolvers/messageProcessor_test.go +++ b/dataRetriever/resolvers/messageProcessor_test.go @@ -40,7 +40,7 @@ func TestMessageProcessor_CanProcessOnTopicErrorsShouldErr(t *testing.T) { CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return nil }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return expectedErr }, }, @@ -60,7 +60,7 @@ func TestMessageProcessor_CanProcessThrottlerNotAllowingShouldErr(t *testing.T) CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return nil }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, }, @@ -87,7 +87,7 @@ func TestMessageProcessor_CanProcessShouldWork(t *testing.T) { CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return nil }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, }, diff --git a/dataRetriever/resolvers/transactionResolver_test.go b/dataRetriever/resolvers/transactionResolver_test.go index 5f638fb3d4d..30ed1fede95 100644 --- a/dataRetriever/resolvers/transactionResolver_test.go +++ b/dataRetriever/resolvers/transactionResolver_test.go @@ -131,7 +131,7 @@ func TestTxResolver_ProcessReceivedMessageCanProcessMessageErrorsShouldErr(t *te CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return expectedErr }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, } diff --git a/dataRetriever/resolvers/trieNodeResolver_test.go b/dataRetriever/resolvers/trieNodeResolver_test.go index 274e24e4a6b..7a29a2ad6ab 100644 --- a/dataRetriever/resolvers/trieNodeResolver_test.go +++ b/dataRetriever/resolvers/trieNodeResolver_test.go @@ -102,7 +102,7 @@ func TestTrieNodeResolver_ProcessReceivedAntiflooderCanProcessMessageErrShouldEr CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return expectedErr }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, } diff --git a/debug/antiflood/debugger.go b/debug/antiflood/debugger.go index 4e20371ae97..afdf11549b6 100644 --- a/debug/antiflood/debugger.go +++ b/debug/antiflood/debugger.go @@ -2,7 +2,9 @@ package antiflood import ( "context" + "encoding/binary" "fmt" + "sort" "strings" "sync" "time" @@ -20,11 +22,14 @@ const sizeUint64 = 8 const sizeBool = 1 const newLineChar = "\n" const minIntervalInSeconds = 1 +const maxSequencesToPrint = 5 +const moreSequencesPresent = "..." var log = logger.GetOrCreate("debug/antiflood") type event struct { pid core.PeerID + sequences map[uint64]struct{} topic string numRejected uint32 sizeRejected uint64 @@ -37,8 +42,21 @@ func (ev *event) Size() int { } func (ev *event) String() string { - return fmt.Sprintf("pid: %s, topic: %s, num rejected: %d, size rejected: %d, is blacklisted: %v", - ev.pid.Pretty(), ev.topic, ev.numRejected, ev.sizeRejected, ev.isBlackListed) + sequences := make([]string, 0, len(ev.sequences)) + for seq := range ev.sequences { + sequences = append(sequences, fmt.Sprintf("%d", seq)) + } + + sort.Slice(sequences, func(i, j int) bool { + return sequences[i] < sequences[j] + }) + + if len(sequences) > maxSequencesToPrint { + sequences = append([]string{moreSequencesPresent}, sequences[len(sequences)-maxSequencesToPrint:]...) + } + + return fmt.Sprintf("pid: %s; topic: %s; num rejected: %d; size rejected: %d; seqences: %s; is blacklisted: %v", + ev.pid.Pretty(), ev.topic, ev.numRejected, ev.sizeRejected, strings.Join(sequences, ", "), ev.isBlackListed) } type debugger struct { @@ -78,10 +96,16 @@ func (d *debugger) AddData( topic string, numRejected uint32, sizeRejected uint64, + sequence []byte, isBlacklisted bool, ) { identifier := d.computeIdentifier(pid, topic) + seqVal := uint64(0) + if len(sequence) >= 8 { + seqVal = binary.BigEndian.Uint64(sequence) + } + d.mut.Lock() defer d.mut.Unlock() @@ -93,6 +117,7 @@ func (d *debugger) AddData( numRejected: 0, sizeRejected: 0, isBlackListed: false, + sequences: map[uint64]struct{}{seqVal: {}}, } } @@ -104,12 +129,14 @@ func (d *debugger) AddData( numRejected: 0, sizeRejected: 0, isBlackListed: false, + sequences: map[uint64]struct{}{seqVal: {}}, } } ev.numRejected += numRejected ev.sizeRejected += sizeRejected ev.isBlackListed = isBlacklisted + ev.sequences[seqVal] = struct{}{} d.cache.Put(identifier, ev, ev.Size()) } diff --git a/debug/antiflood/debugger_test.go b/debug/antiflood/debugger_test.go index 6811aaf6507..d9eae1b75e1 100644 --- a/debug/antiflood/debugger_test.go +++ b/debug/antiflood/debugger_test.go @@ -1,7 +1,9 @@ package antiflood import ( + "encoding/binary" "errors" + "fmt" "strings" "sync/atomic" "testing" @@ -65,7 +67,7 @@ func TestAntifloodDebugger_AddDataNotExistingShouldAdd(t *testing.T) { topic := "topic" numRejected := uint32(272) sizeRejected := uint64(7272) - d.AddData(pid, topic, numRejected, sizeRejected, true) + d.AddData(pid, topic, numRejected, sizeRejected, make([]byte, 8), true) assert.Equal(t, 1, d.cache.Len()) ev := d.GetData([]byte(string(pid) + topic)) @@ -89,8 +91,8 @@ func TestAntifloodDebugger_AddDataExistingShouldChange(t *testing.T) { topic := "topic" numRejected := uint32(272) sizeRejected := uint64(7272) - d.AddData(pid, topic, numRejected, sizeRejected, true) - d.AddData(pid, topic, numRejected, sizeRejected, false) + d.AddData(pid, topic, numRejected, sizeRejected, nil, true) + d.AddData(pid, topic, numRejected, sizeRejected, nil, false) assert.Equal(t, 1, d.cache.Len()) ev := d.GetData([]byte(string(pid) + topic)) @@ -126,8 +128,8 @@ func TestAntifloodDebugger_PrintShouldWork(t *testing.T) { topic := "topic" numRejected := uint32(272) sizeRejected := uint64(7272) - d.AddData(pid1, topic, numRejected, sizeRejected, true) - d.AddData(pid2, topic, numRejected, sizeRejected, false) + d.AddData(pid1, topic, numRejected, sizeRejected, nil, true) + d.AddData(pid2, topic, numRejected, sizeRejected, nil, false) time.Sleep(time.Millisecond * 1500) @@ -164,7 +166,7 @@ func TestAntifloodDebugger_CloseShouldWork(t *testing.T) { d.printEventFunc = func(data string) { atomic.AddInt32(&numPrinted, 1) } - d.AddData("", "", 0, 0, true) + d.AddData("", "", 0, 0, nil, true) time.Sleep(time.Millisecond * 2500) assert.True(t, atomic.LoadInt32(&numPrinted) > 0) @@ -178,3 +180,67 @@ func TestAntifloodDebugger_CloseShouldWork(t *testing.T) { assert.Equal(t, int32(0), atomic.LoadInt32(&numPrinted)) } + +func TestAntifloodDebugger_DifferentSequenceShouldAppend(t *testing.T) { + t.Parallel() + + d, _ := NewAntifloodDebugger(config.AntifloodDebugConfig{ + CacheSize: 100, + IntervalAutoPrintInSeconds: 1, + }) + + seq1 := uint64(2347234) + seq1Buff := make([]byte, 8) + binary.BigEndian.PutUint64(seq1Buff, seq1) + + seq2 := uint64(110) + seq2Buff := make([]byte, 8) + binary.BigEndian.PutUint64(seq2Buff, seq2) + + pid := core.PeerID("pid") + topic := "topic" + d.AddData(pid, topic, 0, 0, seq1Buff, true) + d.AddData(pid, topic, 0, 0, seq2Buff, true) + + ev := d.GetData(d.computeIdentifier(pid, topic)) + evLine := ev.String() + fmt.Println(evLine) + assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", seq1))) + assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", seq2))) +} + +func TestAntifloodDebugger_MoreSequencesShouldTrim(t *testing.T) { + t.Parallel() + + d, _ := NewAntifloodDebugger(config.AntifloodDebugConfig{ + CacheSize: 100, + IntervalAutoPrintInSeconds: 1, + }) + + pid := core.PeerID("pid") + topic := "topic" + startValue := uint64(33946384) + numVals := 10 + for i := 0; i < numVals; i++ { + seq := startValue + uint64(i) + seqBuff := make([]byte, 8) + binary.BigEndian.PutUint64(seqBuff, seq) + + d.AddData(pid, topic, 0, 0, seqBuff, true) + } + + ev := d.GetData(d.computeIdentifier(pid, topic)) + evLine := ev.String() + fmt.Println(evLine) + for i := 0; i < maxSequencesToPrint; i++ { + val := startValue + uint64(i) + assert.False(t, strings.Contains(evLine, fmt.Sprintf("%d", val))) + } + + for i := maxSequencesToPrint; i < numVals; i++ { + val := startValue + uint64(i) + assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", val))) + } + + assert.True(t, strings.Contains(evLine, moreSequencesPresent)) +} diff --git a/epochStart/bootstrap/disabled/disabledAntiFloodHandler.go b/epochStart/bootstrap/disabled/disabledAntiFloodHandler.go index dc692f26d4d..c3b56ee0af8 100644 --- a/epochStart/bootstrap/disabled/disabledAntiFloodHandler.go +++ b/epochStart/bootstrap/disabled/disabledAntiFloodHandler.go @@ -23,7 +23,7 @@ func (a *antiFloodHandler) CanProcessMessage(_ p2p.MessageP2P, _ core.PeerID) er } // CanProcessMessagesOnTopic returns nil regardless of the input -func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64) error { +func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte) error { return nil } diff --git a/factory/interface.go b/factory/interface.go index 038675b53eb..c0ab4a3a04e 100644 --- a/factory/interface.go +++ b/factory/interface.go @@ -28,7 +28,7 @@ type NodesSetupHandler interface { // p2p messages type P2PAntifloodHandler interface { CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error ResetForTopic(topic string) SetMaxMessagesForTopic(topic string, maxNum uint32) SetDebugger(debugger process.AntifloodDebugger) error diff --git a/heartbeat/interface.go b/heartbeat/interface.go index 0544b92197f..9a1ff9aab75 100644 --- a/heartbeat/interface.go +++ b/heartbeat/interface.go @@ -70,7 +70,7 @@ type NetworkShardingCollector interface { // p2p messages type P2PAntifloodHandler interface { CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error IsInterfaceNil() bool } diff --git a/heartbeat/mock/p2pAntifloodHandlerStub.go b/heartbeat/mock/p2pAntifloodHandlerStub.go index 98d7621f5e9..cfe40875afa 100644 --- a/heartbeat/mock/p2pAntifloodHandlerStub.go +++ b/heartbeat/mock/p2pAntifloodHandlerStub.go @@ -8,7 +8,7 @@ import ( // P2PAntifloodHandlerStub - type P2PAntifloodHandlerStub struct { CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error } // ResetForTopic - @@ -29,12 +29,12 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, } // CanProcessMessagesOnTopic - -func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { if p2pahs.CanProcessMessagesOnTopicCalled == nil { return nil } - return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize) + return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence) } // IsInterfaceNil - diff --git a/heartbeat/process/monitor.go b/heartbeat/process/monitor.go index 5e452032614..0f774c2075a 100644 --- a/heartbeat/process/monitor.go +++ b/heartbeat/process/monitor.go @@ -264,7 +264,7 @@ func (m *Monitor) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPe if err != nil { return err } - err = m.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, core.HeartbeatTopic, 1, uint64(len(message.Data()))) + err = m.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, core.HeartbeatTopic, 1, uint64(len(message.Data())), message.SeqNo()) if err != nil { return err } diff --git a/heartbeat/process/monitor_test.go b/heartbeat/process/monitor_test.go index 27e7190e819..1c50c221b5b 100644 --- a/heartbeat/process/monitor_test.go +++ b/heartbeat/process/monitor_test.go @@ -25,7 +25,7 @@ func createMockP2PAntifloodHandler() *mock.P2PAntifloodHandlerStub { CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { return nil }, - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return nil }, } diff --git a/integrationTests/mock/nilAntifloodHandler.go b/integrationTests/mock/nilAntifloodHandler.go index 0524c2ce3bc..271d9fd1fcf 100644 --- a/integrationTests/mock/nilAntifloodHandler.go +++ b/integrationTests/mock/nilAntifloodHandler.go @@ -25,7 +25,7 @@ func (nah *NilAntifloodHandler) CanProcessMessage(_ p2p.MessageP2P, _ core.PeerI } // CanProcessMessagesOnTopic will always return nil, allowing messages to go to interceptors -func (nah *NilAntifloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64) error { +func (nah *NilAntifloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte) error { return nil } diff --git a/integrationTests/mock/p2pAntifloodHandlerStub.go b/integrationTests/mock/p2pAntifloodHandlerStub.go index 98d7621f5e9..cfe40875afa 100644 --- a/integrationTests/mock/p2pAntifloodHandlerStub.go +++ b/integrationTests/mock/p2pAntifloodHandlerStub.go @@ -8,7 +8,7 @@ import ( // P2PAntifloodHandlerStub - type P2PAntifloodHandlerStub struct { CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error } // ResetForTopic - @@ -29,12 +29,12 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, } // CanProcessMessagesOnTopic - -func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { if p2pahs.CanProcessMessagesOnTopicCalled == nil { return nil } - return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize) + return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence) } // IsInterfaceNil - diff --git a/node/interface.go b/node/interface.go index e5077ab9162..a4da9132712 100644 --- a/node/interface.go +++ b/node/interface.go @@ -37,7 +37,7 @@ type NetworkShardingCollector interface { // p2p messages type P2PAntifloodHandler interface { CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error ResetForTopic(topic string) SetMaxMessagesForTopic(topic string, maxNum uint32) ApplyConsensusSize(size int) diff --git a/node/mock/p2pAntifloodHandlerStub.go b/node/mock/p2pAntifloodHandlerStub.go index 3ef48391737..3a037308d93 100644 --- a/node/mock/p2pAntifloodHandlerStub.go +++ b/node/mock/p2pAntifloodHandlerStub.go @@ -8,7 +8,7 @@ import ( // P2PAntifloodHandlerStub - type P2PAntifloodHandlerStub struct { CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error ApplyConsensusSizeCalled func(size int) } @@ -37,12 +37,12 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, } // CanProcessMessagesOnTopic - -func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { if p2pahs.CanProcessMessagesOnTopicCalled == nil { return nil } - return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize) + return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence) } // IsInterfaceNil - diff --git a/process/interceptors/common.go b/process/interceptors/common.go index 1dbd19877ca..40ee7aaae23 100644 --- a/process/interceptors/common.go +++ b/process/interceptors/common.go @@ -26,7 +26,7 @@ func preProcessMesage( if err != nil { return err } - err = antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, topic, 1, uint64(len(message.Data()))) + err = antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, topic, 1, uint64(len(message.Data())), message.SeqNo()) if err != nil { return err } diff --git a/process/interceptors/common_test.go b/process/interceptors/common_test.go index 57a88105f47..c7d8e1811ae 100644 --- a/process/interceptors/common_test.go +++ b/process/interceptors/common_test.go @@ -70,7 +70,7 @@ func TestPreProcessMessage_AntifloodTopicCanNotProcessShouldErr(t *testing.T) { } expectedErr := errors.New("expected error") antifloodHandler := &mock.P2PAntifloodHandlerStub{ - CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { + CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { return expectedErr }, } diff --git a/process/interceptors/multiDataInterceptor.go b/process/interceptors/multiDataInterceptor.go index b316c723cb9..a6935d6703e 100644 --- a/process/interceptors/multiDataInterceptor.go +++ b/process/interceptors/multiDataInterceptor.go @@ -100,6 +100,7 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P, mdi.topic, uint32(lenMultiData), uint64(len(message.Data())), + message.SeqNo(), ) if err != nil { return err diff --git a/process/interface.go b/process/interface.go index 96d4c316b44..9c83c4af4a1 100644 --- a/process/interface.go +++ b/process/interface.go @@ -708,7 +708,7 @@ type TopicFloodPreventer interface { // p2p messages type P2PAntifloodHandler interface { CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopic(pid core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopic(pid core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error ApplyConsensusSize(size int) SetDebugger(debugger AntifloodDebugger) error IsInterfaceNil() bool @@ -852,7 +852,7 @@ type InterceptedDebugger interface { // AntifloodDebugger defines an interface for debugging the antiflood behavior type AntifloodDebugger interface { - AddData(pid core.PeerID, topic string, numRejected uint32, sizeRejected uint64, isBlacklisted bool) + AddData(pid core.PeerID, topic string, numRejected uint32, sizeRejected uint64, sequence []byte, isBlacklisted bool) Close() error IsInterfaceNil() bool } diff --git a/process/mock/p2pAntifloodHandlerStub.go b/process/mock/p2pAntifloodHandlerStub.go index 175a761908d..a1cbfd5a912 100644 --- a/process/mock/p2pAntifloodHandlerStub.go +++ b/process/mock/p2pAntifloodHandlerStub.go @@ -9,7 +9,7 @@ import ( // P2PAntifloodHandlerStub - type P2PAntifloodHandlerStub struct { CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error - CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error + CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error ApplyConsensusSizeCalled func(size int) SetDebuggerCalled func(debugger process.AntifloodDebugger) error } @@ -23,11 +23,11 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P, } // CanProcessMessagesOnTopic - -func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { if p2pahs.CanProcessMessagesOnTopicCalled == nil { return nil } - return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize) + return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence) } // ApplyConsensusSize - diff --git a/process/throttle/antiflood/disabled/antiflood.go b/process/throttle/antiflood/disabled/antiflood.go index 3e3857aed62..5e7c9632686 100644 --- a/process/throttle/antiflood/disabled/antiflood.go +++ b/process/throttle/antiflood/disabled/antiflood.go @@ -27,7 +27,7 @@ func (af *AntiFlood) CanProcessMessage(_ p2p.MessageP2P, _ core.PeerID) error { } // CanProcessMessagesOnTopic will always return nil -func (af *AntiFlood) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64) error { +func (af *AntiFlood) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte) error { return nil } diff --git a/process/throttle/antiflood/disabled/antifloodDebugger.go b/process/throttle/antiflood/disabled/antifloodDebugger.go index 81a388bd392..3d33c11f75d 100644 --- a/process/throttle/antiflood/disabled/antifloodDebugger.go +++ b/process/throttle/antiflood/disabled/antifloodDebugger.go @@ -7,7 +7,7 @@ type AntifloodDebugger struct { } // AddData does nothing -func (ad *AntifloodDebugger) AddData(_ core.PeerID, _ string, _ uint32, _ uint64, _ bool) { +func (ad *AntifloodDebugger) AddData(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte, _ bool) { } // Close returns nil diff --git a/process/throttle/antiflood/disabled/antiflood_test.go b/process/throttle/antiflood/disabled/antiflood_test.go index 737e91289bd..ec6d09186e9 100644 --- a/process/throttle/antiflood/disabled/antiflood_test.go +++ b/process/throttle/antiflood/disabled/antiflood_test.go @@ -22,6 +22,6 @@ func TestAntiFlood_ShouldNotPanic(t *testing.T) { daf.SetMaxMessagesForTopic("test", 10) daf.ResetForTopic("test") daf.ApplyConsensusSize(0) - _ = daf.CanProcessMessagesOnTopic(core.PeerID(1), "test", 1, 0) + _ = daf.CanProcessMessagesOnTopic(core.PeerID(1), "test", 1, 0, nil) _ = daf.CanProcessMessage(nil, core.PeerID(2)) } diff --git a/process/throttle/antiflood/p2pAntiflood.go b/process/throttle/antiflood/p2pAntiflood.go index 808c9d44a1c..3f0bfffefe9 100644 --- a/process/throttle/antiflood/p2pAntiflood.go +++ b/process/throttle/antiflood/p2pAntiflood.go @@ -71,6 +71,7 @@ func (af *p2pAntiflood) CanProcessMessage(message p2p.MessageP2P, fromConnectedP message.Topics(), 1, uint64(len(message.Data())), + message.SeqNo(), af.blacklistHandler.Has(fromConnectedPeer), ) @@ -79,14 +80,14 @@ func (af *p2pAntiflood) CanProcessMessage(message p2p.MessageP2P, fromConnectedP originatorIsBlacklisted := af.blacklistHandler.Has(message.Peer()) if originatorIsBlacklisted { - af.recordDebugEvent(message.Peer(), message.Topics(), 1, uint64(len(message.Data())), true) + af.recordDebugEvent(message.Peer(), message.Topics(), 1, uint64(len(message.Data())), message.SeqNo(), true) return fmt.Errorf("%w for pid %s", process.ErrOriginatorIsBlacklisted, message.Peer().Pretty()) } return nil } -func (af *p2pAntiflood) recordDebugEvent(pid core.PeerID, topics []string, numRejected uint32, sizeRejected uint64, isBlacklisted bool) { +func (af *p2pAntiflood) recordDebugEvent(pid core.PeerID, topics []string, numRejected uint32, sizeRejected uint64, sequence []byte, isBlacklisted bool) { if len(topics) == 0 { topics = []string{unidentifiedTopic} } @@ -94,7 +95,7 @@ func (af *p2pAntiflood) recordDebugEvent(pid core.PeerID, topics []string, numRe af.mutDebugger.RLock() defer af.mutDebugger.RUnlock() - af.debugger.AddData(pid, topics[0], numRejected, sizeRejected, isBlacklisted) + af.debugger.AddData(pid, topics[0], numRejected, sizeRejected, sequence, isBlacklisted) } func (af *p2pAntiflood) canProcessMessage(fp process.FloodPreventer, message p2p.MessageP2P, fromConnectedPeer core.PeerID) error { @@ -132,7 +133,7 @@ func (af *p2pAntiflood) canProcessMessage(fp process.FloodPreventer, message p2p } // CanProcessMessagesOnTopic signals if a p2p message can be processed or not for a given topic -func (af *p2pAntiflood) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error { +func (af *p2pAntiflood) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error { err := af.topicPreventer.IncreaseLoad(peer, topic, numMessages) if err != nil { log.Trace("topicFloodPreventer.Accumulate peer", @@ -141,7 +142,7 @@ func (af *p2pAntiflood) CanProcessMessagesOnTopic(peer core.PeerID, topic string "topic", topic, ) - af.recordDebugEvent(peer, []string{topic}, numMessages, totalSize, af.blacklistHandler.Has(peer)) + af.recordDebugEvent(peer, []string{topic}, numMessages, totalSize, sequence, af.blacklistHandler.Has(peer)) return fmt.Errorf("%w in p2pAntiflood for connected peer %s", err, diff --git a/process/throttle/antiflood/p2pAntiflood_test.go b/process/throttle/antiflood/p2pAntiflood_test.go index 6095f5f0095..49b50de1629 100644 --- a/process/throttle/antiflood/p2pAntiflood_test.go +++ b/process/throttle/antiflood/p2pAntiflood_test.go @@ -210,7 +210,7 @@ func TestP2pAntiflood_CanProcessMessagesOnTopicCanNotAccumulateShouldError(t *te &mock.FloodPreventerStub{}, ) - err := afm.CanProcessMessagesOnTopic(identifierCall, topicCall, numMessagesCall, 0) + err := afm.CanProcessMessagesOnTopic(identifierCall, topicCall, numMessagesCall, 0, nil) assert.True(t, errors.Is(err, process.ErrSystemBusy)) } @@ -235,7 +235,7 @@ func TestP2pAntiflood_CanProcessMessagesOnTopicCanAccumulateShouldWork(t *testin &mock.FloodPreventerStub{}, ) - err := afm.CanProcessMessagesOnTopic(identifierCall, topicCall, numMessagesCall, 0) + err := afm.CanProcessMessagesOnTopic(identifierCall, topicCall, numMessagesCall, 0, nil) assert.Nil(t, err) }