Skip to content

Commit

Permalink
Merge pull request #2007 from ElrondNetwork/EN-6833/Antiflood-observers
Browse files Browse the repository at this point in the history
En 6833/antiflood observers
  • Loading branch information
LucianMincu committed Jun 26, 2020
2 parents 8230210 + a721a94 commit 1d88984
Show file tree
Hide file tree
Showing 57 changed files with 482 additions and 182 deletions.
31 changes: 26 additions & 5 deletions cmd/node/factory/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/ElrondNetwork/elrond-go/process/block/preprocess"
"github.com/ElrondNetwork/elrond-go/process/coordinator"
"github.com/ElrondNetwork/elrond-go/process/economics"
"github.com/ElrondNetwork/elrond-go/process/factory"
"github.com/ElrondNetwork/elrond-go/process/factory/interceptorscontainer"
"github.com/ElrondNetwork/elrond-go/process/factory/metachain"
"github.com/ElrondNetwork/elrond-go/process/factory/shard"
Expand Down Expand Up @@ -765,7 +766,6 @@ func newShardInterceptorContainerFactory(
WhiteListHandler: whiteListHandler,
WhiteListerVerifiedTxs: whiteListerVerifiedTxs,
AntifloodHandler: network.InputAntifloodHandler,
NonceConverter: dataCore.Uint64ByteSliceConverter,
}
interceptorContainerFactory, err := interceptorscontainer.NewShardInterceptorsContainerFactory(shardInterceptorsContainerFactoryArgs)
if err != nil {
Expand Down Expand Up @@ -820,7 +820,6 @@ func newMetaInterceptorContainerFactory(
WhiteListHandler: whiteListHandler,
WhiteListerVerifiedTxs: whiteListerVerifiedTxs,
AntifloodHandler: network.InputAntifloodHandler,
NonceConverter: dataCore.Uint64ByteSliceConverter,
}
interceptorContainerFactory, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(metaInterceptorsContainerFactoryArgs)
if err != nil {
Expand Down Expand Up @@ -1725,17 +1724,34 @@ func newValidatorStatisticsProcessor(
return validatorStatisticsProcessor, nil
}

// PrepareNetworkShardingCollector will create the network sharding collector and apply it to the network messenger
// PrepareOpenTopics will set to the anti flood handler the topics for which
// the node can receive messages from others than validators
func PrepareOpenTopics(
antiflood mainFactory.P2PAntifloodHandler,
shardCoordinator sharding.Coordinator,
) {
selfID := shardCoordinator.SelfId()
if selfID == core.MetachainShardId {
antiflood.SetTopicsForAll(core.HeartbeatTopic)
return
}

selfShardTxTopic := factory.TransactionTopic + core.CommunicationIdentifierBetweenShards(selfID, selfID)
antiflood.SetTopicsForAll(core.HeartbeatTopic, selfShardTxTopic)
}

// PrepareNetworkShardingCollector will create the network sharding collector and apply it to
// the network messenger and antiflood handler
func PrepareNetworkShardingCollector(
network *mainFactory.NetworkComponents,
config *config.Config,
nodesCoordinator sharding.NodesCoordinator,
coordinator sharding.Coordinator,
epochStartRegistrationHandler epochStart.RegistrationHandler,
epochShard uint32,
epochStart uint32,
) (*networksharding.PeerShardMapper, error) {

networkShardingCollector, err := createNetworkShardingCollector(config, nodesCoordinator, epochStartRegistrationHandler, epochShard)
networkShardingCollector, err := createNetworkShardingCollector(config, nodesCoordinator, epochStartRegistrationHandler, epochStart)
if err != nil {
return nil, err
}
Expand All @@ -1748,6 +1764,11 @@ func PrepareNetworkShardingCollector(
return nil, err
}

err = network.InputAntifloodHandler.SetPeerValidatorMapper(networkShardingCollector)
if err != nil {
return nil, err
}

return networkShardingCollector, nil
}

Expand Down
4 changes: 3 additions & 1 deletion cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2035,12 +2035,14 @@ func createNode(
nodesCoordinator,
shardCoordinator,
epochStartRegistrationHandler,
process.EpochStartTrigger.Epoch(),
process.EpochStartTrigger.MetaEpoch(),
)
if err != nil {
return nil, err
}

factory.PrepareOpenTopics(network.InputAntifloodHandler, shardCoordinator)

apiTxsByHashThrottler, err := throttler.NewNumGoRoutinesThrottler(maxNumGoRoutinesTxsByHashApi)
if err != nil {
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion consensus/mock/nodesCoordinatorMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func (ncm *NodesCoordinatorMock) ComputeAdditionalLeaving([]*state.ShardValidato
}

// GetValidatorWithPublicKey -
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(_ []byte, _ uint32) (sharding.Validator, uint32, error) {
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(_ []byte) (sharding.Validator, uint32, error) {
panic("implement me")
}

Expand Down
6 changes: 3 additions & 3 deletions core/mock/nodesCoordinatorMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type NodesCoordinatorMock struct {
GetValidatorsRewardsAddressesCalled func(randomness []byte, round uint64, shardId uint32, epoch uint32) ([]string, error)
SetNodesPerShardsCalled func(nodes map[uint32][]sharding.Validator, epoch uint32) error
ComputeValidatorsGroupCalled func(randomness []byte, round uint64, shardId uint32, epoch uint32) (validatorsGroup []sharding.Validator, err error)
GetValidatorWithPublicKeyCalled func(publicKey []byte, epoch uint32) (validator sharding.Validator, shardId uint32, err error)
GetValidatorWithPublicKeyCalled func(publicKey []byte) (validator sharding.Validator, shardId uint32, err error)
GetAllEligibleValidatorsPublicKeysCalled func(epoch uint32) (map[uint32][][]byte, error)
GetAllWaitingValidatorsPublicKeysCalled func() (map[uint32][][]byte, error)
ConsensusGroupSizeCalled func(uint32) int
Expand Down Expand Up @@ -170,9 +170,9 @@ func (ncm *NodesCoordinatorMock) ConsensusGroupSize(shardId uint32) int {
}

// GetValidatorWithPublicKey -
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(publicKey []byte, epoch uint32) (sharding.Validator, uint32, error) {
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(publicKey []byte) (sharding.Validator, uint32, error) {
if ncm.GetValidatorWithPublicKeyCalled != nil {
return ncm.GetValidatorWithPublicKeyCalled(publicKey, epoch)
return ncm.GetValidatorWithPublicKeyCalled(publicKey)
}

if publicKey == nil {
Expand Down
2 changes: 2 additions & 0 deletions dataRetriever/requestHandlers/requestHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var log = logger.GetOrCreate("dataretriever/requesthandlers")
const minHashesToRequest = 10
const timeToAccumulateTrieHashes = 100 * time.Millisecond

//TODO move the keys definitions that are whitelisted in core and use them in InterceptedData implementations, Identifiers() function

type resolverRequestHandler struct {
epoch uint32
shardID uint32
Expand Down
5 changes: 5 additions & 0 deletions epochStart/bootstrap/disabled/disabledAntiFloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ func (a *antiFloodHandler) SetDebugger(_ process.AntifloodDebugger) error {
func (a *antiFloodHandler) BlacklistPeer(_ core.PeerID, _ string, _ time.Duration) {
}

// IsOriginatorEligibleForTopic returns nil
func (a *antiFloodHandler) IsOriginatorEligibleForTopic(_ core.PeerID, _ string) error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (a *antiFloodHandler) IsInterfaceNil() bool {
return a == nil
Expand Down
2 changes: 1 addition & 1 deletion epochStart/bootstrap/disabled/disabledNodesCoordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (n *nodesCoordinator) ComputeConsensusGroup(_ []byte, _ uint64, _ uint32, _
}

// GetValidatorWithPublicKey -
func (n *nodesCoordinator) GetValidatorWithPublicKey(_ []byte, _ uint32) (validator sharding.Validator, shardId uint32, err error) {
func (n *nodesCoordinator) GetValidatorWithPublicKey(_ []byte) (validator sharding.Validator, shardId uint32, err error) {
return nil, 0, nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
WhiteListHandler: args.WhiteListHandler,
WhiteListerVerifiedTxs: args.WhiteListerVerifiedTxs,
AntifloodHandler: antiFloodHandler,
NonceConverter: args.NonceConverter,
}

interceptorsContainerFactory, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(containerFactoryArgs)
Expand Down
1 change: 0 additions & 1 deletion epochStart/bootstrap/syncEpochStartMeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ func NewEpochStartMetaSyncer(args ArgsNewEpochStartMetaSyncer) (*epochStartMetaS
HeaderIntegrityVerifier: headerIntegrityVerifier,
ValidityAttester: disabled.NewValidityAttester(),
EpochStartTrigger: disabled.NewEpochStartTrigger(),
NonceConverter: args.NonceConverter,
}

interceptedMetaHdrDataFactory, err := interceptorsFactory.NewInterceptedMetaHeaderDataFactory(&argsInterceptedDataFactory)
Expand Down
4 changes: 2 additions & 2 deletions epochStart/mock/nodesCoordinatorStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ func (ncm *NodesCoordinatorStub) GetSelectedPublicKeys(_ []byte, _ uint32, _ uin
}

// GetValidatorWithPublicKey -
func (ncm *NodesCoordinatorStub) GetValidatorWithPublicKey(address []byte, _ uint32) (sharding.Validator, uint32, error) {
func (ncm *NodesCoordinatorStub) GetValidatorWithPublicKey(publicKey []byte) (sharding.Validator, uint32, error) {
if ncm.GetValidatorWithPublicKeyCalled != nil {
return ncm.GetValidatorWithPublicKeyCalled(address)
return ncm.GetValidatorWithPublicKeyCalled(publicKey)
}
return nil, 0, nil
}
Expand Down
3 changes: 3 additions & 0 deletions factory/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ type P2PAntifloodHandler interface {
ResetForTopic(topic string)
SetMaxMessagesForTopic(topic string, maxNum uint32)
SetDebugger(debugger process.AntifloodDebugger) error
SetPeerValidatorMapper(validatorMapper process.PeerValidatorMapper) error
SetTopicsForAll(topics ...string)
ApplyConsensusSize(size int)
BlacklistPeer(peer core.PeerID, reason string, duration time.Duration)
IsOriginatorEligibleForTopic(pid core.PeerID, topic string) error
IsInterfaceNil() bool
}
2 changes: 1 addition & 1 deletion heartbeat/mock/nodesCoordinatorMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (ncm *NodesCoordinatorMock) GetSelectedPublicKeys(_ []byte, _ uint32, _ uin
}

// GetValidatorWithPublicKey -
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(_ []byte, _ uint32) (sharding.Validator, uint32, error) {
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(_ []byte) (sharding.Validator, uint32, error) {
panic("implement me")
}

Expand Down
5 changes: 5 additions & 0 deletions integrationTests/mock/nilAntifloodHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ func (nah *NilAntifloodHandler) ApplyConsensusSize(_ int) {
func (nah *NilAntifloodHandler) BlacklistPeer(_ core.PeerID, _ string, _ time.Duration) {
}

// IsOriginatorEligibleForTopic returns nil
func (nah *NilAntifloodHandler) IsOriginatorEligibleForTopic(_ core.PeerID, _ string) error {
return nil
}

// IsInterfaceNil returns true if there is no value under the interface
func (nah *NilAntifloodHandler) IsInterfaceNil() bool {
return nah == nil
Expand Down
4 changes: 2 additions & 2 deletions integrationTests/mock/nodesCoordinatorMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,9 +139,9 @@ func (ncm *NodesCoordinatorMock) GetSelectedPublicKeys(_ []byte, _ uint32, _ uin
}

// GetValidatorWithPublicKey -
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(address []byte, _ uint32) (sharding.Validator, uint32, error) {
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(publicKey []byte) (sharding.Validator, uint32, error) {
if ncm.GetValidatorWithPublicKeyCalled != nil {
return ncm.GetValidatorWithPublicKeyCalled(address)
return ncm.GetValidatorWithPublicKeyCalled(publicKey)
}
return nil, 0, nil
}
Expand Down
2 changes: 0 additions & 2 deletions integrationTests/testProcessorNode.go
Original file line number Diff line number Diff line change
Expand Up @@ -688,7 +688,6 @@ func (tpn *TestProcessorNode) initInterceptors() {
WhiteListHandler: tpn.WhiteListHandler,
WhiteListerVerifiedTxs: tpn.WhiteListerVerifiedTxs,
AntifloodHandler: &mock.NilAntifloodHandler{},
NonceConverter: TestUint64Converter,
}
interceptorContainerFactory, _ := interceptorscontainer.NewMetaInterceptorsContainerFactory(metaIntercContFactArgs)

Expand Down Expand Up @@ -747,7 +746,6 @@ func (tpn *TestProcessorNode) initInterceptors() {
WhiteListHandler: tpn.WhiteListHandler,
WhiteListerVerifiedTxs: tpn.WhiteListerVerifiedTxs,
AntifloodHandler: &mock.NilAntifloodHandler{},
NonceConverter: TestUint64Converter,
}
interceptorContainerFactory, _ := interceptorscontainer.NewShardInterceptorsContainerFactory(shardInterContFactArgs)

Expand Down
2 changes: 1 addition & 1 deletion node/mock/nodesCoordinatorMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (ncm *NodesCoordinatorMock) GetSelectedPublicKeys(_ []byte, _ uint32, _ uin
}

// GetValidatorWithPublicKey -
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(_ []byte, _ uint32) (sharding.Validator, uint32, error) {
func (ncm *NodesCoordinatorMock) GetValidatorWithPublicKey(_ []byte) (sharding.Validator, uint32, error) {
panic("implement me")
}

Expand Down
2 changes: 0 additions & 2 deletions process/block/interceptedBlocks/argInterceptedBlockHeader.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package interceptedBlocks

import (
"github.com/ElrondNetwork/elrond-go/data/typeConverters"
"github.com/ElrondNetwork/elrond-go/hashing"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
Expand All @@ -18,5 +17,4 @@ type ArgInterceptedBlockHeader struct {
HeaderIntegrityVerifier process.InterceptedHeaderIntegrityVerifier
ValidityAttester process.ValidityAttester
EpochStartTrigger process.EpochStartTriggerHandler
NonceConverter typeConverters.Uint64ByteSliceConverter
}
3 changes: 0 additions & 3 deletions process/block/interceptedBlocks/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,6 @@ func checkBlockHeaderArgument(arg *ArgInterceptedBlockHeader) error {
if check.IfNil(arg.ValidityAttester) {
return process.ErrNilValidityAttester
}
if check.IfNil(arg.NonceConverter) {
return process.ErrNilUint64Converter
}

return nil
}
Expand Down
12 changes: 0 additions & 12 deletions process/block/interceptedBlocks/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func createDefaultBlockHeaderArgument() *ArgInterceptedBlockHeader {
HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{},
ValidityAttester: &mock.ValidityAttesterStub{},
EpochStartTrigger: &mock.EpochStartTriggerStub{},
NonceConverter: mock.NewNonceHashConverterMock(),
}

return arg
Expand Down Expand Up @@ -135,17 +134,6 @@ func TestCheckBlockHeaderArgument_NilValidityAttesterShouldErr(t *testing.T) {
assert.Equal(t, process.ErrNilValidityAttester, err)
}

func TestCheckBlockHeaderArgument_NilNonceConverterShouldErr(t *testing.T) {
t.Parallel()

arg := createDefaultBlockHeaderArgument()
arg.NonceConverter = nil

err := checkBlockHeaderArgument(arg)

assert.Equal(t, process.ErrNilUint64Converter, err)
}

func TestCheckBlockHeaderArgument_ShouldWork(t *testing.T) {
t.Parallel()

Expand Down
8 changes: 3 additions & 5 deletions process/block/interceptedBlocks/interceptedBlockHeader.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/block"
"github.com/ElrondNetwork/elrond-go/data/typeConverters"
"github.com/ElrondNetwork/elrond-go/hashing"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
Expand All @@ -29,7 +28,6 @@ type InterceptedHeader struct {
isForCurrentShard bool
validityAttester process.ValidityAttester
epochStartTrigger process.EpochStartTriggerHandler
nonceConverter typeConverters.Uint64ByteSliceConverter
}

// NewInterceptedHeader creates a new instance of InterceptedHeader struct
Expand All @@ -52,7 +50,6 @@ func NewInterceptedHeader(arg *ArgInterceptedBlockHeader) (*InterceptedHeader, e
shardCoordinator: arg.ShardCoordinator,
validityAttester: arg.ValidityAttester,
epochStartTrigger: arg.EpochStartTrigger,
nonceConverter: arg.NonceConverter,
}
inHdr.processFields(arg.HdrBuff)

Expand Down Expand Up @@ -192,9 +189,10 @@ func (inHdr *InterceptedHeader) String() string {

// Identifiers returns the identifiers used in requests
func (inHdr *InterceptedHeader) Identifiers() [][]byte {
nonceBytes := inHdr.nonceConverter.ToByteSlice(inHdr.hdr.Nonce)
keyNonce := []byte(fmt.Sprintf("%d-%d", inHdr.hdr.ShardID, inHdr.hdr.Nonce))
keyEpoch := []byte(core.EpochStartIdentifier(inHdr.hdr.Epoch))

return [][]byte{inHdr.hash, nonceBytes}
return [][]byte{inHdr.hash, keyNonce, keyEpoch}
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ func createDefaultShardArgument() *interceptedBlocks.ArgInterceptedBlockHeader {
HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{},
ValidityAttester: &mock.ValidityAttesterStub{},
EpochStartTrigger: &mock.EpochStartTriggerStub{},
NonceConverter: mock.NewNonceHashConverterMock(),
}

hdr := createMockShardHeader()
Expand Down
9 changes: 4 additions & 5 deletions process/block/interceptedBlocks/interceptedMetaBlockHeader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ package interceptedBlocks
import (
"fmt"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/block"
"github.com/ElrondNetwork/elrond-go/data/typeConverters"
"github.com/ElrondNetwork/elrond-go/hashing"
"github.com/ElrondNetwork/elrond-go/marshal"
"github.com/ElrondNetwork/elrond-go/process"
Expand All @@ -25,7 +25,6 @@ type InterceptedMetaHeader struct {
hash []byte
validityAttester process.ValidityAttester
epochStartTrigger process.EpochStartTriggerHandler
nonceConverter typeConverters.Uint64ByteSliceConverter
}

// NewInterceptedMetaHeader creates a new instance of InterceptedMetaHeader struct
Expand All @@ -48,7 +47,6 @@ func NewInterceptedMetaHeader(arg *ArgInterceptedBlockHeader) (*InterceptedMetaH
shardCoordinator: arg.ShardCoordinator,
validityAttester: arg.ValidityAttester,
epochStartTrigger: arg.EpochStartTrigger,
nonceConverter: arg.NonceConverter,
}
inHdr.processFields(arg.HdrBuff)

Expand Down Expand Up @@ -147,9 +145,10 @@ func (imh *InterceptedMetaHeader) String() string {

// Identifiers returns the identifiers used in requests
func (imh *InterceptedMetaHeader) Identifiers() [][]byte {
nonceBytes := imh.nonceConverter.ToByteSlice(imh.hdr.Nonce)
keyNonce := []byte(fmt.Sprintf("%d-%d", core.MetachainShardId, imh.hdr.Nonce))
keyEpoch := []byte(core.EpochStartIdentifier(imh.hdr.Epoch))

return [][]byte{imh.hash, nonceBytes}
return [][]byte{imh.hash, keyNonce, keyEpoch}
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ func createDefaultMetaArgument() *interceptedBlocks.ArgInterceptedBlockHeader {
HeaderIntegrityVerifier: &mock.HeaderIntegrityVerifierStub{},
ValidityAttester: &mock.ValidityAttesterStub{},
EpochStartTrigger: &mock.EpochStartTriggerStub{},
NonceConverter: mock.NewNonceHashConverterMock(),
}

hdr := createMockMetaHeader()
Expand Down
6 changes: 6 additions & 0 deletions process/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -829,3 +829,9 @@ var ErrInvalidUnitValue = errors.New("invalid unit value")

// ErrInvalidBadPeerThreshold signals that an invalid bad peer threshold has been provided
var ErrInvalidBadPeerThreshold = errors.New("invalid bad peer threshold")

// ErrNilPeerValidatorMapper signals that nil peer validator mapper has been provided
var ErrNilPeerValidatorMapper = errors.New("nil peer validator mapper")

// ErrOnlyValidatorsCanUseThisTopic signals that topic can be used by validator only
var ErrOnlyValidatorsCanUseThisTopic = errors.New("only validators can use this topic")

0 comments on commit 1d88984

Please sign in to comment.