Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EN-6710 antiflood blocks consensus fix #1895

Merged
merged 7 commits into from
Jun 9, 2020
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@
Capacity = 5000
Type = "LRU"
[Antiflood.Topic]
DefaultMaxMessagesPerSec = 10000
MaxMessages = [{ Topic = "heartbeat", NumMessagesPerSec = 10 },
DefaultMaxMessagesPerSec = 15000
MaxMessages = [{ Topic = "heartbeat", NumMessagesPerSec = 20 },
{ Topic = "shardBlocks*", NumMessagesPerSec = 20 },
{ Topic = "metachainBlocks", NumMessagesPerSec = 20 }]
[Antiflood.WebServer]
Expand Down
2 changes: 1 addition & 1 deletion consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type NetworkShardingCollector interface {
// p2p messages
type P2PAntifloodHandler interface {
CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
ResetForTopic(topic string)
SetMaxMessagesForTopic(topic string, maxNum uint32)
IsInterfaceNil() bool
Expand Down
6 changes: 3 additions & 3 deletions consensus/mock/p2pAntifloodHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// P2PAntifloodHandlerStub -
type P2PAntifloodHandlerStub struct {
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
}

// ResetForTopic -
Expand All @@ -29,9 +29,9 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P,
}

// CanProcessMessagesOnTopic -
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
if p2pahs.CanProcessMessagesOnTopicCalled != nil {
return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize)
return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence)
}

return nil
Expand Down
12 changes: 11 additions & 1 deletion consensus/spos/bls/blsWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import (
"github.com/ElrondNetwork/elrond-go/consensus/spos"
)

const peerMaxMessagesPerSec = uint32(2)
// peerMaxMessagesPerSec defines how many messages can be propagated by a pid in a round. The value was chosen by
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

// following the next premises:
// 1. a leader can propagate as maximum as 3 messages per round: proposed header block + proposed body + final info;
// 2. due to the fact that a delayed signature of the proposer (from previous round) can be received in the current round
// adds an extra 1 to the total value, reaching value 4;
// 3. Because the leader might be selected in the next round and might have an empty data pool, it can send the newly
// empty proposed block at the very beginning of the next round. One extra message here, yielding to a total of 5.
// 4. If we consider the forks that can appear on the system wee need to add one more to the value.
// Validators only send one signature message in a round, treating the edge case of a delayed message, will need at most
// 2 messages per round (which is ok as it is below the set value of 5)
const peerMaxMessagesPerSec = uint32(6)

// worker defines the data needed by spos to communicate between nodes which are in the validators group
type worker struct {
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
}

topic := GetConsensusTopicIDFromShardCoordinator(wrk.shardCoordinator)
err = wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data())))
err = wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data())), message.SeqNo())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/spos/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func createMockP2PAntifloodHandler() *mock.P2PAntifloodHandlerStub {
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestWorker_ProcessReceivedMessageShouldErrIfFloodIsDetectedOnTopic(t *testi
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return expectedErr
},
}
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ type RequestedItemsHandler interface {
// p2p messages
type P2PAntifloodHandler interface {
CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, _ uint64) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, _ uint64, sequence []byte) error
IsInterfaceNil() bool
}

Expand Down
6 changes: 3 additions & 3 deletions dataRetriever/mock/p2pAntifloodHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// P2PAntifloodHandlerStub -
type P2PAntifloodHandlerStub struct {
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
}

// CanProcessMessage -
Expand All @@ -21,12 +21,12 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P,
}

// CanProcessMessagesOnTopic -
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
if p2pahs.CanProcessMessagesOnTopicCalled == nil {
return nil
}

return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize)
return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence)
}

// IsInterfaceNil -
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/headerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestHeaderResolver_ProcessReceivedCanProcessMessageErrorsShouldErr(t *testi
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return expectedErr
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/messageProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (mp *messageProcessor) canProcessMessage(message p2p.MessageP2P, fromConnec
if err != nil {
return fmt.Errorf("%w on resolver topic %s", err, mp.topic)
}
err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data())))
err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data())), message.SeqNo())
if err != nil {
return fmt.Errorf("%w on resolver topic %s", err, mp.topic)
}
Expand Down
6 changes: 3 additions & 3 deletions dataRetriever/resolvers/messageProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestMessageProcessor_CanProcessOnTopicErrorsShouldErr(t *testing.T) {
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return expectedErr
},
},
Expand All @@ -60,7 +60,7 @@ func TestMessageProcessor_CanProcessThrottlerNotAllowingShouldErr(t *testing.T)
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
},
Expand All @@ -87,7 +87,7 @@ func TestMessageProcessor_CanProcessShouldWork(t *testing.T) {
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
},
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/transactionResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestTxResolver_ProcessReceivedMessageCanProcessMessageErrorsShouldErr(t *te
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return expectedErr
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/trieNodeResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestTrieNodeResolver_ProcessReceivedAntiflooderCanProcessMessageErrShouldEr
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return expectedErr
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down
31 changes: 29 additions & 2 deletions debug/antiflood/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package antiflood

import (
"context"
"encoding/binary"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand All @@ -20,11 +22,14 @@ const sizeUint64 = 8
const sizeBool = 1
const newLineChar = "\n"
const minIntervalInSeconds = 1
const maxSequencesToPrint = 5
const moreSequencesPresent = "..."

var log = logger.GetOrCreate("debug/antiflood")

type event struct {
pid core.PeerID
sequences map[uint64]struct{}
topic string
numRejected uint32
sizeRejected uint64
Expand All @@ -37,8 +42,21 @@ func (ev *event) Size() int {
}

func (ev *event) String() string {
return fmt.Sprintf("pid: %s, topic: %s, num rejected: %d, size rejected: %d, is blacklisted: %v",
ev.pid.Pretty(), ev.topic, ev.numRejected, ev.sizeRejected, ev.isBlackListed)
sequences := make([]string, 0, len(ev.sequences))
for seq := range ev.sequences {
sequences = append(sequences, fmt.Sprintf("%d", seq))
}

sort.Slice(sequences, func(i, j int) bool {
return sequences[i] < sequences[j]
})

if len(sequences) > maxSequencesToPrint {
sequences = append([]string{moreSequencesPresent}, sequences[len(sequences)-maxSequencesToPrint:]...)
}

return fmt.Sprintf("pid: %s; topic: %s; num rejected: %d; size rejected: %d; seqences: %s; is blacklisted: %v",
ev.pid.Pretty(), ev.topic, ev.numRejected, ev.sizeRejected, strings.Join(sequences, ", "), ev.isBlackListed)
}

type debugger struct {
Expand Down Expand Up @@ -78,10 +96,16 @@ func (d *debugger) AddData(
topic string,
numRejected uint32,
sizeRejected uint64,
sequence []byte,
isBlacklisted bool,
) {
identifier := d.computeIdentifier(pid, topic)

seqVal := uint64(0)
if len(sequence) >= 8 {
seqVal = binary.BigEndian.Uint64(sequence)
}

d.mut.Lock()
defer d.mut.Unlock()

Expand All @@ -93,6 +117,7 @@ func (d *debugger) AddData(
numRejected: 0,
sizeRejected: 0,
isBlackListed: false,
sequences: map[uint64]struct{}{seqVal: {}},
}
}

Expand All @@ -104,12 +129,14 @@ func (d *debugger) AddData(
numRejected: 0,
sizeRejected: 0,
isBlackListed: false,
sequences: map[uint64]struct{}{seqVal: {}},
}
}

ev.numRejected += numRejected
ev.sizeRejected += sizeRejected
ev.isBlackListed = isBlacklisted
ev.sequences[seqVal] = struct{}{}

d.cache.Put(identifier, ev, ev.Size())
}
Expand Down
78 changes: 72 additions & 6 deletions debug/antiflood/debugger_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package antiflood

import (
"encoding/binary"
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -65,7 +67,7 @@ func TestAntifloodDebugger_AddDataNotExistingShouldAdd(t *testing.T) {
topic := "topic"
numRejected := uint32(272)
sizeRejected := uint64(7272)
d.AddData(pid, topic, numRejected, sizeRejected, true)
d.AddData(pid, topic, numRejected, sizeRejected, make([]byte, 8), true)

assert.Equal(t, 1, d.cache.Len())
ev := d.GetData([]byte(string(pid) + topic))
Expand All @@ -89,8 +91,8 @@ func TestAntifloodDebugger_AddDataExistingShouldChange(t *testing.T) {
topic := "topic"
numRejected := uint32(272)
sizeRejected := uint64(7272)
d.AddData(pid, topic, numRejected, sizeRejected, true)
d.AddData(pid, topic, numRejected, sizeRejected, false)
d.AddData(pid, topic, numRejected, sizeRejected, nil, true)
d.AddData(pid, topic, numRejected, sizeRejected, nil, false)

assert.Equal(t, 1, d.cache.Len())
ev := d.GetData([]byte(string(pid) + topic))
Expand Down Expand Up @@ -126,8 +128,8 @@ func TestAntifloodDebugger_PrintShouldWork(t *testing.T) {
topic := "topic"
numRejected := uint32(272)
sizeRejected := uint64(7272)
d.AddData(pid1, topic, numRejected, sizeRejected, true)
d.AddData(pid2, topic, numRejected, sizeRejected, false)
d.AddData(pid1, topic, numRejected, sizeRejected, nil, true)
d.AddData(pid2, topic, numRejected, sizeRejected, nil, false)

time.Sleep(time.Millisecond * 1500)

Expand Down Expand Up @@ -164,7 +166,7 @@ func TestAntifloodDebugger_CloseShouldWork(t *testing.T) {
d.printEventFunc = func(data string) {
atomic.AddInt32(&numPrinted, 1)
}
d.AddData("", "", 0, 0, true)
d.AddData("", "", 0, 0, nil, true)

time.Sleep(time.Millisecond * 2500)
assert.True(t, atomic.LoadInt32(&numPrinted) > 0)
Expand All @@ -178,3 +180,67 @@ func TestAntifloodDebugger_CloseShouldWork(t *testing.T) {

assert.Equal(t, int32(0), atomic.LoadInt32(&numPrinted))
}

func TestAntifloodDebugger_DifferentSequenceShouldAppend(t *testing.T) {
t.Parallel()

d, _ := NewAntifloodDebugger(config.AntifloodDebugConfig{
CacheSize: 100,
IntervalAutoPrintInSeconds: 1,
})

seq1 := uint64(2347234)
seq1Buff := make([]byte, 8)
binary.BigEndian.PutUint64(seq1Buff, seq1)

seq2 := uint64(110)
seq2Buff := make([]byte, 8)
binary.BigEndian.PutUint64(seq2Buff, seq2)

pid := core.PeerID("pid")
topic := "topic"
d.AddData(pid, topic, 0, 0, seq1Buff, true)
d.AddData(pid, topic, 0, 0, seq2Buff, true)

ev := d.GetData(d.computeIdentifier(pid, topic))
evLine := ev.String()
fmt.Println(evLine)
assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", seq1)))
assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", seq2)))
}

func TestAntifloodDebugger_MoreSequencesShouldTrim(t *testing.T) {
t.Parallel()

d, _ := NewAntifloodDebugger(config.AntifloodDebugConfig{
CacheSize: 100,
IntervalAutoPrintInSeconds: 1,
})

pid := core.PeerID("pid")
topic := "topic"
startValue := uint64(33946384)
numVals := 10
for i := 0; i < numVals; i++ {
seq := startValue + uint64(i)
seqBuff := make([]byte, 8)
binary.BigEndian.PutUint64(seqBuff, seq)

d.AddData(pid, topic, 0, 0, seqBuff, true)
}

ev := d.GetData(d.computeIdentifier(pid, topic))
evLine := ev.String()
fmt.Println(evLine)
for i := 0; i < maxSequencesToPrint; i++ {
val := startValue + uint64(i)
assert.False(t, strings.Contains(evLine, fmt.Sprintf("%d", val)))
}

for i := maxSequencesToPrint; i < numVals; i++ {
val := startValue + uint64(i)
assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", val)))
}

assert.True(t, strings.Contains(evLine, moreSequencesPresent))
}
2 changes: 1 addition & 1 deletion epochStart/bootstrap/disabled/disabledAntiFloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (a *antiFloodHandler) CanProcessMessage(_ p2p.MessageP2P, _ core.PeerID) er
}

// CanProcessMessagesOnTopic returns nil regardless of the input
func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64) error {
func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte) error {
return nil
}

Expand Down