Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

En 6833/antiflood observers #2007

Merged
merged 17 commits into from
Jun 26, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
25 changes: 24 additions & 1 deletion cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/ElrondNetwork/elrond-go/process/block/preprocess"
"github.com/ElrondNetwork/elrond-go/process/coordinator"
"github.com/ElrondNetwork/elrond-go/process/economics"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/process/factory/interceptorscontainer"
"github.com/ElrondNetwork/elrond-go/process/factory/metachain"
"github.com/ElrondNetwork/elrond-go/process/factory/shard"
Expand Down Expand Up @@ -1729,7 +1730,24 @@ func newValidatorStatisticsProcessor(
return validatorStatisticsProcessor, nil
}

// PrepareNetworkShardingCollector will create the network sharding collector and apply it to the network messenger
// PrepareOpenTopics will set to the anti flood handler the topics for which
// the node can receive messages from others than validators
func PrepareOpenTopics(
antiflood mainFactory.P2PAntifloodHandler,
shardCoordinator sharding.Coordinator,
) {
selfID := shardCoordinator.SelfId()
if selfID == core.MetachainShardId {
antiflood.SetTopicsForAll(core.HeartbeatTopic)
return
}

selfShardTxTopic := factory.TransactionTopic + core.CommunicationIdentifierBetweenShards(selfID, selfID)
antiflood.SetTopicsForAll(core.HeartbeatTopic, selfShardTxTopic)
}

// PrepareNetworkShardingCollector will create the network sharding collector and apply it to
// the network messenger and antiflood handler
func PrepareNetworkShardingCollector(
network *mainFactory.NetworkComponents,
config *config.Config,
Expand All @@ -1752,6 +1770,11 @@ func PrepareNetworkShardingCollector(
return nil, err
}

err = network.InputAntifloodHandler.SetPeerValidatorMapper(networkShardingCollector)
if err != nil {
return nil, err
}

return networkShardingCollector, nil
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2072,6 +2072,8 @@ func createNode(
return nil, err
}

factory.PrepareOpenTopics(network.InputAntifloodHandler, shardCoordinator)

apiTxsByHashThrottler, err := throttler.NewNumGoRoutinesThrottler(maxNumGoRoutinesTxsByHashApi)
if err != nil {
return nil, err
Expand Down
5 changes: 5 additions & 0 deletions epochStart/bootstrap/disabled/disabledAntiFloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func (a *antiFloodHandler) SetDebugger(_ process.AntifloodDebugger) error {
func (a *antiFloodHandler) BlacklistPeer(_ core.PeerID, _ string, _ time.Duration) {
}

// IsOriginatorEligibleForTopic returns nil
func (a *antiFloodHandler) IsOriginatorEligibleForTopic(_ core.PeerID, _ string) error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (a *antiFloodHandler) IsInterfaceNil() bool {
return a == nil
Expand Down
3 changes: 3 additions & 0 deletions factory/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ type P2PAntifloodHandler interface {
ResetForTopic(topic string)
SetMaxMessagesForTopic(topic string, maxNum uint32)
SetDebugger(debugger process.AntifloodDebugger) error
SetPeerValidatorMapper(validatorMapper process.PeerValidatorMapper) error
SetTopicsForAll(topics ...string)
ApplyConsensusSize(size int)
BlacklistPeer(peer core.PeerID, reason string, duration time.Duration)
IsOriginatorEligibleForTopic(pid core.PeerID, topic string) error
IsInterfaceNil() bool
}
5 changes: 5 additions & 0 deletions integrationTests/mock/nilAntifloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (nah *NilAntifloodHandler) ApplyConsensusSize(_ int) {
func (nah *NilAntifloodHandler) BlacklistPeer(_ core.PeerID, _ string, _ time.Duration) {
}

// IsOriginatorEligibleForTopic returns nil
func (nah *NilAntifloodHandler) IsOriginatorEligibleForTopic(_ core.PeerID, _ string) error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (nah *NilAntifloodHandler) IsInterfaceNil() bool {
return nah == nil
Expand Down
6 changes: 6 additions & 0 deletions process/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -805,3 +805,9 @@ var ErrInvalidUserNameLength = errors.New("invalid user name length")

// ErrTxValueOutOfBounds signals that transaction value is out of bounds
var ErrTxValueOutOfBounds = errors.New("tx value is out of bounds")

// ErrNilPeerValidatorMapper signals that nil peer validator mapper has been provided
var ErrNilPeerValidatorMapper = errors.New("nil peer validator mapper")

// ErrOnlyValidatorsCanUseThisTopic signals that topic can be used by validator only
var ErrOnlyValidatorsCanUseThisTopic = errors.New("only validators can use this topic")
15 changes: 14 additions & 1 deletion process/interceptors/multiDataInterceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
mdi.throttler.EndProcessing()
}()

errOriginator := mdi.antifloodHandler.IsOriginatorEligibleForTopic(message.Peer(), mdi.topic)
allWhiteListed := true
for _, dataBuff := range multiDataBuff {
var interceptedData process.InterceptedData
interceptedData, err = mdi.interceptedData(dataBuff, message.Peer(), fromConnectedPeer)
Expand All @@ -130,8 +132,15 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
continue
}

isForCurrentShard := interceptedData.IsForCurrentShard()
isWhiteListed := mdi.whiteListRequest.IsWhiteListed(interceptedData)
if !isWhiteListed && errOriginator != nil {
lastErrEncountered = errOriginator
allWhiteListed = false
wgProcess.Done()
continue
sasurobert marked this conversation as resolved.
Show resolved Hide resolved
}

isForCurrentShard := interceptedData.IsForCurrentShard()
shouldProcess := isForCurrentShard || isWhiteListed
if !shouldProcess {
log.Trace("intercepted data should not be processed",
Expand All @@ -156,6 +165,10 @@ func (mdi *MultiDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
)
}

if !allWhiteListed && errOriginator != nil {
log.Debug("got message from peer on topic only for validators", "originator", message.Peer(), "topic", mdi.topic, "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be changed to:

log.Debug("got message from peer on topic only for validators", 
"originator", p2p.PeerIdToShortString(message.Peer()), 
"topic", mdi.topic, 
"err", err)

as message.Peer() might contain unprintable characters
I think we should add a String() function on core.PeerID struct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

return lastErrEncountered
}

Expand Down
69 changes: 69 additions & 0 deletions process/interceptors/multiDataInterceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,75 @@ func TestMultiDataInterceptor_SetInterceptedDebugHandlerShouldWork(t *testing.T)
assert.True(t, debugger == mdi.InterceptedDebugHandler()) //pointer testing
}

func TestMultiDataInterceptor_ProcessReceivedMessageIsOriginatorNotOkButWhiteListed(t *testing.T) {
t.Parallel()

buffData := [][]byte{[]byte("buff1"), []byte("buff2")}

marshalizer := &mock.MarshalizerMock{}
checkCalledNum := int32(0)
processCalledNum := int32(0)
throttler := createMockThrottler()
interceptedData := &mock.InterceptedDataStub{
CheckValidityCalled: func() error {
return nil
},
IsForCurrentShardCalled: func() bool {
return false
},
}

whiteListHandler := &mock.WhiteListHandlerStub{
IsWhiteListedCalled: func(interceptedData process.InterceptedData) bool {
return true
},
}
errOriginator := process.ErrOnlyValidatorsCanUseThisTopic
mdi, _ := interceptors.NewMultiDataInterceptor(
testTopic,
marshalizer,
&mock.InterceptedDataFactoryStub{
CreateCalled: func(buff []byte) (data process.InterceptedData, e error) {
return interceptedData, nil
},
},
createMockInterceptorStub(&checkCalledNum, &processCalledNum),
throttler,
&mock.P2PAntifloodHandlerStub{
IsOriginatorEligibleForTopicCalled: func(pid core.PeerID, topic string) error {
return errOriginator
},
},
whiteListHandler,
)

dataField, _ := marshalizer.Marshal(&batch.Batch{Data: buffData})
msg := &mock.P2PMessageMock{
DataField: dataField,
}
err := mdi.ProcessReceivedMessage(msg, fromConnectedPeerId)

time.Sleep(time.Second)

assert.Nil(t, err)
assert.Equal(t, int32(2), atomic.LoadInt32(&checkCalledNum))
assert.Equal(t, int32(2), atomic.LoadInt32(&processCalledNum))
assert.Equal(t, int32(1), throttler.StartProcessingCount())
assert.Equal(t, int32(1), throttler.EndProcessingCount())

whiteListHandler.IsWhiteListedCalled = func(interceptedData process.InterceptedData) bool {
return false
}
err = mdi.ProcessReceivedMessage(msg, fromConnectedPeerId)
time.Sleep(time.Second)

assert.Equal(t, err, errOriginator)
assert.Equal(t, int32(2), atomic.LoadInt32(&checkCalledNum))
assert.Equal(t, int32(2), atomic.LoadInt32(&processCalledNum))
assert.Equal(t, int32(2), throttler.StartProcessingCount())
assert.Equal(t, int32(2), throttler.EndProcessingCount())
}

//------- IsInterfaceNil

func TestMultiDataInterceptor_IsInterfaceNil(t *testing.T) {
Expand Down
9 changes: 8 additions & 1 deletion process/interceptors/singleDataInterceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ func (sdi *SingleDataInterceptor) ProcessReceivedMessage(message p2p.MessageP2P,
return err
}

isForCurrentShard := interceptedData.IsForCurrentShard()
errOriginator := sdi.antifloodHandler.IsOriginatorEligibleForTopic(message.Peer(), sdi.topic)
isWhiteListed := sdi.whiteListRequested.IsWhiteListed(interceptedData)
if !isWhiteListed && errOriginator != nil {
log.Debug("got message from peer on topic only for validators", "originator", message.Peer(), "topic", sdi.topic, "err", err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, please use p2p.PeerIdToShortString function

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

sdi.throttler.EndProcessing()
return errOriginator
}

isForCurrentShard := interceptedData.IsForCurrentShard()
shouldProcess := isForCurrentShard || isWhiteListed
if !shouldProcess {
sdi.throttler.EndProcessing()
Expand Down
65 changes: 65 additions & 0 deletions process/interceptors/singleDataInterceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,71 @@ func TestSingleDataInterceptor_ProcessReceivedMessageWhitelistedShouldWork(t *te
assert.Equal(t, int32(1), throttler.EndProcessingCount())
}

func TestSingleDataInterceptor_ProcessReceivedMessageWithOriginator(t *testing.T) {
t.Parallel()

checkCalledNum := int32(0)
processCalledNum := int32(0)
throttler := createMockThrottler()
interceptedData := &mock.InterceptedDataStub{
CheckValidityCalled: func() error {
return nil
},
IsForCurrentShardCalled: func() bool {
return false
},
}

whiteListHandler := &mock.WhiteListHandlerStub{
IsWhiteListedCalled: func(interceptedData process.InterceptedData) bool {
return true
},
}
sdi, _ := interceptors.NewSingleDataInterceptor(
testTopic,
&mock.InterceptedDataFactoryStub{
CreateCalled: func(buff []byte) (data process.InterceptedData, e error) {
return interceptedData, nil
},
},
createMockInterceptorStub(&checkCalledNum, &processCalledNum),
throttler,
&mock.P2PAntifloodHandlerStub{
IsOriginatorEligibleForTopicCalled: func(pid core.PeerID, topic string) error {
return process.ErrOnlyValidatorsCanUseThisTopic
},
},
whiteListHandler,
)

msg := &mock.P2PMessageMock{
DataField: []byte("data to be processed"),
}
err := sdi.ProcessReceivedMessage(msg, fromConnectedPeerId)

time.Sleep(time.Second)

assert.Nil(t, err)
assert.Equal(t, int32(1), atomic.LoadInt32(&checkCalledNum))
assert.Equal(t, int32(1), atomic.LoadInt32(&processCalledNum))
assert.Equal(t, int32(1), throttler.EndProcessingCount())
assert.Equal(t, int32(1), throttler.EndProcessingCount())

whiteListHandler.IsWhiteListedCalled = func(interceptedData process.InterceptedData) bool {
return false
}

err = sdi.ProcessReceivedMessage(msg, fromConnectedPeerId)

time.Sleep(time.Second)

assert.Equal(t, err, process.ErrOnlyValidatorsCanUseThisTopic)
assert.Equal(t, int32(1), atomic.LoadInt32(&checkCalledNum))
assert.Equal(t, int32(1), atomic.LoadInt32(&processCalledNum))
assert.Equal(t, int32(2), throttler.EndProcessingCount())
assert.Equal(t, int32(2), throttler.EndProcessingCount())
}

//------- debug

func TestSingleDataInterceptor_SetInterceptedDebugHandlerNilShouldErr(t *testing.T) {
Expand Down
7 changes: 7 additions & 0 deletions process/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,13 @@ type P2PAntifloodHandler interface {
ApplyConsensusSize(size int)
SetDebugger(debugger AntifloodDebugger) error
BlacklistPeer(peer core.PeerID, reason string, duration time.Duration)
IsOriginatorEligibleForTopic(pid core.PeerID, topic string) error
IsInterfaceNil() bool
}

// PeerValidatorMapper can determine the peer info from a peer id
type PeerValidatorMapper interface {
GetPeerInfo(pid core.PeerID) core.P2PPeerInfo
IsInterfaceNil() bool
}

Expand Down
19 changes: 14 additions & 5 deletions process/mock/p2pAntifloodHandlerStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,12 @@ import (

// P2PAntifloodHandlerStub -
type P2PAntifloodHandlerStub struct {
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
ApplyConsensusSizeCalled func(size int)
SetDebuggerCalled func(debugger process.AntifloodDebugger) error
BlacklistPeerCalled func(peer core.PeerID, reason string, duration time.Duration)
CanProcessMessageCalled func(message p2p.MessageP2P, fromConnectedPeer core.PeerID) error
CanProcessMessagesOnTopicCalled func(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error
ApplyConsensusSizeCalled func(size int)
SetDebuggerCalled func(debugger process.AntifloodDebugger) error
BlacklistPeerCalled func(peer core.PeerID, reason string, duration time.Duration)
IsOriginatorEligibleForTopicCalled func(pid core.PeerID, topic string) error
}

// CanProcessMessage -
Expand All @@ -25,6 +26,14 @@ func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessage(message p2p.MessageP2P,
return p2pahs.CanProcessMessageCalled(message, fromConnectedPeer)
}

// IsOriginatorEligibleForTopic -
func (p2pahs *P2PAntifloodHandlerStub) IsOriginatorEligibleForTopic(pid core.PeerID, topic string) error {
if p2pahs.IsOriginatorEligibleForTopicCalled != nil {
return p2pahs.IsOriginatorEligibleForTopicCalled(pid, topic)
}
return nil
}

// CanProcessMessagesOnTopic -
func (p2pahs *P2PAntifloodHandlerStub) CanProcessMessagesOnTopic(peer core.PeerID, topic string, numMessages uint32, totalSize uint64, sequence []byte) error {
if p2pahs.CanProcessMessagesOnTopicCalled == nil {
Expand Down
20 changes: 20 additions & 0 deletions process/mock/peerShardResolverStub.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package mock

import (
"github.com/ElrondNetwork/elrond-go/core"
)

// PeerShardResolverStub -
type PeerShardResolverStub struct {
GetPeerInfoCalled func(pid core.PeerID) core.P2PPeerInfo
}

// GetPeerInfo -
func (psrs *PeerShardResolverStub) GetPeerInfo(pid core.PeerID) core.P2PPeerInfo {
return psrs.GetPeerInfoCalled(pid)
}

// IsInterfaceNil -
func (psrs *PeerShardResolverStub) IsInterfaceNil() bool {
return psrs == nil
}
14 changes: 14 additions & 0 deletions process/throttle/antiflood/disabled/antiflood.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,20 @@ func (af *AntiFlood) CanProcessMessage(_ p2p.MessageP2P, _ core.PeerID) error {
return nil
}

// IsOriginatorEligibleForTopic will always return nil
func (af *AntiFlood) IsOriginatorEligibleForTopic(_ core.PeerID, _ string) error {
return nil
sasurobert marked this conversation as resolved.
Show resolved Hide resolved
}

// SetTopicsForAll does nothing
func (af *AntiFlood) SetTopicsForAll(_ ...string) {
}

// SetPeerValidatorMapper does nothing
func (af *AntiFlood) SetPeerValidatorMapper(_ process.PeerValidatorMapper) error {
return nil
}

// CanProcessMessagesOnTopic will always return nil
func (af *AntiFlood) CanProcessMessagesOnTopic(_ core.PeerID, _ string, _ uint32, _ uint64, _ []byte) error {
return nil
Expand Down