Skip to content

Commit

Permalink
Merge pull request #1895 from ElrondNetwork/EN-6710-antiflood-blocks-…
Browse files Browse the repository at this point in the history
…consensus

EN-6710 antiflood blocks consensus fix
  • Loading branch information
LucianMincu committed Jun 9, 2020
2 parents 2dd5f7a + 1b66268 commit a6e14df
Show file tree
Hide file tree
Showing 35 changed files with 167 additions and 62 deletions.
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,8 @@
Capacity = 5000
Type = "LRU"
[Antiflood.Topic]
DefaultMaxMessagesPerSec = 10000
MaxMessages = [{ Topic = "heartbeat", NumMessagesPerSec = 10 },
DefaultMaxMessagesPerSec = 15000
MaxMessages = [{ Topic = "heartbeat", NumMessagesPerSec = 20 },
{ Topic = "shardBlocks*", NumMessagesPerSec = 20 },
{ Topic = "metachainBlocks", NumMessagesPerSec = 20 }]
[Antiflood.WebServer]
Expand Down
2 changes: 1 addition & 1 deletion consensus/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ type NetworkShardingCollector interface {
// p2p messages
type P2PAntifloodHandler interface {
CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
ResetForTopic(topic string)
SetMaxMessagesForTopic(topic string, maxNum uint32)
IsInterfaceNil() bool
Expand Down
6 changes: 3 additions & 3 deletions consensus/mock/p2pAntifloodHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// P2PAntifloodHandlerStub -
type P2PAntifloodHandlerStub struct {
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
}

// ResetForTopic -
Expand All @@ -29,9 +29,9 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P,
}

// CanProcessMessagesOnTopic -
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
if p2pahs.CanProcessMessagesOnTopicCalled != nil {
return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize)
return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence)
}

return nil
Expand Down
12 changes: 11 additions & 1 deletion consensus/spos/bls/blsWorker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,17 @@ import (
"github.com/ElrondNetwork/elrond-go/consensus/spos"
)

const peerMaxMessagesPerSec = uint32(2)
// peerMaxMessagesPerSec defines how many messages can be propagated by a pid in a round. The value was chosen by
// following the next premises:
// 1. a leader can propagate as maximum as 3 messages per round: proposed header block + proposed body + final info;
// 2. due to the fact that a delayed signature of the proposer (from previous round) can be received in the current round
// adds an extra 1 to the total value, reaching value 4;
// 3. Because the leader might be selected in the next round and might have an empty data pool, it can send the newly
// empty proposed block at the very beginning of the next round. One extra message here, yielding to a total of 5.
// 4. If we consider the forks that can appear on the system wee need to add one more to the value.
// Validators only send one signature message in a round, treating the edge case of a delayed message, will need at most
// 2 messages per round (which is ok as it is below the set value of 5)
const peerMaxMessagesPerSec = uint32(6)

// worker defines the data needed by spos to communicate between nodes which are in the validators group
type worker struct {
Expand Down
2 changes: 1 addition & 1 deletion consensus/spos/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ func (wrk *Worker) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedP
}

topic := GetConsensusTopicIDFromShardCoordinator(wrk.shardCoordinator)
err = wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data())))
err = wrk.antifloodHandler.CanProcessMessagesOnTopic(message.Peer(), topic, 1, uint64(len(message.Data())), message.SeqNo())
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions consensus/spos/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func createMockP2PAntifloodHandler() *mock.P2PAntifloodHandlerStub {
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down Expand Up @@ -395,7 +395,7 @@ func TestWorker_ProcessReceivedMessageShouldErrIfFloodIsDetectedOnTopic(t *testi
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return expectedErr
},
}
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ type RequestedItemsHandler interface {
// p2p messages
type P2PAntifloodHandler interface {
CanProcessMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, _ uint64) error
CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, _ uint64, sequence []byte) error
IsInterfaceNil() bool
}

Expand Down
6 changes: 3 additions & 3 deletions dataRetriever/mock/p2pAntifloodHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
// P2PAntifloodHandlerStub -
type P2PAntifloodHandlerStub struct {
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
}

// CanProcessMessage -
Expand All @@ -21,12 +21,12 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P,
}

// CanProcessMessagesOnTopic -
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
if p2pahs.CanProcessMessagesOnTopicCalled == nil {
return nil
}

return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize)
return p2pahs.CanProcessMessagesOnTopicCalled(peer, topic, numMessages, totalSize, sequence)
}

// IsInterfaceNil -
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/headerResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ func TestHeaderResolver_ProcessReceivedCanProcessMessageErrorsShouldErr(t *testi
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return expectedErr
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/messageProcessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (mp *messageProcessor) canProcessMessage(message p2p.MessageP2P, fromConnec
if err != nil {
return fmt.Errorf("%w on resolver topic %s", err, mp.topic)
}
err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data())))
err = mp.antifloodHandler.CanProcessMessagesOnTopic(fromConnectedPeer, mp.topic, 1, uint64(len(message.Data())), message.SeqNo())
if err != nil {
return fmt.Errorf("%w on resolver topic %s", err, mp.topic)
}
Expand Down
6 changes: 3 additions & 3 deletions dataRetriever/resolvers/messageProcessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func TestMessageProcessor_CanProcessOnTopicErrorsShouldErr(t *testing.T) {
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return expectedErr
},
},
Expand All @@ -60,7 +60,7 @@ func TestMessageProcessor_CanProcessThrottlerNotAllowingShouldErr(t *testing.T)
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
},
Expand All @@ -87,7 +87,7 @@ func TestMessageProcessor_CanProcessShouldWork(t *testing.T) {
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return nil
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
},
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/transactionResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func TestTxResolver_ProcessReceivedMessageCanProcessMessageErrorsShouldErr(t *te
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return expectedErr
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down
2 changes: 1 addition & 1 deletion dataRetriever/resolvers/trieNodeResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestTrieNodeResolver_ProcessReceivedAntiflooderCanProcessMessageErrShouldEr
CanProcessMessageCalled: func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error {
return expectedErr
},
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64) error {
CanProcessMessagesOnTopicCalled: func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
return nil
},
}
Expand Down
31 changes: 29 additions & 2 deletions debug/antiflood/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package antiflood

import (
"context"
"encoding/binary"
"fmt"
"sort"
"strings"
"sync"
"time"
Expand All @@ -20,11 +22,14 @@ const sizeUint64 = 8
const sizeBool = 1
const newLineChar = "\n"
const minIntervalInSeconds = 1
const maxSequencesToPrint = 5
const moreSequencesPresent = "..."

var log = logger.GetOrCreate("debug/antiflood")

type event struct {
pid core.PeerID
sequences map[uint64]struct{}
topic string
numRejected uint32
sizeRejected uint64
Expand All @@ -37,8 +42,21 @@ func (ev *event) Size() int {
}

func (ev *event) String() string {
return fmt.Sprintf("pid: %s, topic: %s, num rejected: %d, size rejected: %d, is blacklisted: %v",
ev.pid.Pretty(), ev.topic, ev.numRejected, ev.sizeRejected, ev.isBlackListed)
sequences := make([]string, 0, len(ev.sequences))
for seq := range ev.sequences {
sequences = append(sequences, fmt.Sprintf("%d", seq))
}

sort.Slice(sequences, func(i, j int) bool {
return sequences[i] < sequences[j]
})

if len(sequences) > maxSequencesToPrint {
sequences = append([]string{moreSequencesPresent}, sequences[len(sequences)-maxSequencesToPrint:]...)
}

return fmt.Sprintf("pid: %s; topic: %s; num rejected: %d; size rejected: %d; seqences: %s; is blacklisted: %v",
ev.pid.Pretty(), ev.topic, ev.numRejected, ev.sizeRejected, strings.Join(sequences, ", "), ev.isBlackListed)
}

type debugger struct {
Expand Down Expand Up @@ -78,10 +96,16 @@ func (d *debugger) AddData(
topic string,
numRejected uint32,
sizeRejected uint64,
sequence []byte,
isBlacklisted bool,
) {
identifier := d.computeIdentifier(pid, topic)

seqVal := uint64(0)
if len(sequence) >= 8 {
seqVal = binary.BigEndian.Uint64(sequence)
}

d.mut.Lock()
defer d.mut.Unlock()

Expand All @@ -93,6 +117,7 @@ func (d *debugger) AddData(
numRejected: 0,
sizeRejected: 0,
isBlackListed: false,
sequences: map[uint64]struct{}{seqVal: {}},
}
}

Expand All @@ -104,12 +129,14 @@ func (d *debugger) AddData(
numRejected: 0,
sizeRejected: 0,
isBlackListed: false,
sequences: map[uint64]struct{}{seqVal: {}},
}
}

ev.numRejected += numRejected
ev.sizeRejected += sizeRejected
ev.isBlackListed = isBlacklisted
ev.sequences[seqVal] = struct{}{}

d.cache.Put(identifier, ev, ev.Size())
}
Expand Down
78 changes: 72 additions & 6 deletions debug/antiflood/debugger_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package antiflood

import (
"encoding/binary"
"errors"
"fmt"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -65,7 +67,7 @@ func TestAntifloodDebugger_AddDataNotExistingShouldAdd(t *testing.T) {
topic := "topic"
numRejected := uint32(272)
sizeRejected := uint64(7272)
d.AddData(pid, topic, numRejected, sizeRejected, true)
d.AddData(pid, topic, numRejected, sizeRejected, make([]byte, 8), true)

assert.Equal(t, 1, d.cache.Len())
ev := d.GetData([]byte(string(pid) + topic))
Expand All @@ -89,8 +91,8 @@ func TestAntifloodDebugger_AddDataExistingShouldChange(t *testing.T) {
topic := "topic"
numRejected := uint32(272)
sizeRejected := uint64(7272)
d.AddData(pid, topic, numRejected, sizeRejected, true)
d.AddData(pid, topic, numRejected, sizeRejected, false)
d.AddData(pid, topic, numRejected, sizeRejected, nil, true)
d.AddData(pid, topic, numRejected, sizeRejected, nil, false)

assert.Equal(t, 1, d.cache.Len())
ev := d.GetData([]byte(string(pid) + topic))
Expand Down Expand Up @@ -126,8 +128,8 @@ func TestAntifloodDebugger_PrintShouldWork(t *testing.T) {
topic := "topic"
numRejected := uint32(272)
sizeRejected := uint64(7272)
d.AddData(pid1, topic, numRejected, sizeRejected, true)
d.AddData(pid2, topic, numRejected, sizeRejected, false)
d.AddData(pid1, topic, numRejected, sizeRejected, nil, true)
d.AddData(pid2, topic, numRejected, sizeRejected, nil, false)

time.Sleep(time.Millisecond * 1500)

Expand Down Expand Up @@ -164,7 +166,7 @@ func TestAntifloodDebugger_CloseShouldWork(t *testing.T) {
d.printEventFunc = func(data string) {
atomic.AddInt32(&numPrinted, 1)
}
d.AddData("", "", 0, 0, true)
d.AddData("", "", 0, 0, nil, true)

time.Sleep(time.Millisecond * 2500)
assert.True(t, atomic.LoadInt32(&numPrinted) > 0)
Expand All @@ -178,3 +180,67 @@ func TestAntifloodDebugger_CloseShouldWork(t *testing.T) {

assert.Equal(t, int32(0), atomic.LoadInt32(&numPrinted))
}

func TestAntifloodDebugger_DifferentSequenceShouldAppend(t *testing.T) {
t.Parallel()

d, _ := NewAntifloodDebugger(config.AntifloodDebugConfig{
CacheSize: 100,
IntervalAutoPrintInSeconds: 1,
})

seq1 := uint64(2347234)
seq1Buff := make([]byte, 8)
binary.BigEndian.PutUint64(seq1Buff, seq1)

seq2 := uint64(110)
seq2Buff := make([]byte, 8)
binary.BigEndian.PutUint64(seq2Buff, seq2)

pid := core.PeerID("pid")
topic := "topic"
d.AddData(pid, topic, 0, 0, seq1Buff, true)
d.AddData(pid, topic, 0, 0, seq2Buff, true)

ev := d.GetData(d.computeIdentifier(pid, topic))
evLine := ev.String()
fmt.Println(evLine)
assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", seq1)))
assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", seq2)))
}

func TestAntifloodDebugger_MoreSequencesShouldTrim(t *testing.T) {
t.Parallel()

d, _ := NewAntifloodDebugger(config.AntifloodDebugConfig{
CacheSize: 100,
IntervalAutoPrintInSeconds: 1,
})

pid := core.PeerID("pid")
topic := "topic"
startValue := uint64(33946384)
numVals := 10
for i := 0; i < numVals; i++ {
seq := startValue + uint64(i)
seqBuff := make([]byte, 8)
binary.BigEndian.PutUint64(seqBuff, seq)

d.AddData(pid, topic, 0, 0, seqBuff, true)
}

ev := d.GetData(d.computeIdentifier(pid, topic))
evLine := ev.String()
fmt.Println(evLine)
for i := 0; i < maxSequencesToPrint; i++ {
val := startValue + uint64(i)
assert.False(t, strings.Contains(evLine, fmt.Sprintf("%d", val)))
}

for i := maxSequencesToPrint; i < numVals; i++ {
val := startValue + uint64(i)
assert.True(t, strings.Contains(evLine, fmt.Sprintf("%d", val)))
}

assert.True(t, strings.Contains(evLine, moreSequencesPresent))
}
2 changes: 1 addition & 1 deletion epochStart/bootstrap/disabled/disabledAntiFloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (a *antiFloodHandler) CanProcessMessage(_ p2p.MessageP2P, _ core.PeerID) er
}

// CanProcessMessagesOnTopic returns nil regardless of the input
func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64) error {
func (a *antiFloodHandler) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte) error {
return nil
}

Expand Down
Loading

0 comments on commit a6e14df

Please sign in to comment.