Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full archive heartbeat sender + new data pools #5332

Merged
Merged
7 changes: 4 additions & 3 deletions dataRetriever/dataPool/dataPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestNewDataPool_NilPeerAuthenticationsShouldErr(t *testing.T) {
args.PeerAuthentications = nil
tdp, err := dataPool.NewDataPool(args)

assert.Equal(t, dataRetriever.ErrNilPeerAuthenticationPool, err)
assert.True(t, errors.Is(err, dataRetriever.ErrNilPeerAuthenticationPool))
assert.Nil(t, tdp)
}

Expand All @@ -139,7 +139,7 @@ func TestNewDataPool_NilHeartbeatsShouldErr(t *testing.T) {
args.Heartbeats = nil
tdp, err := dataPool.NewDataPool(args)

assert.Equal(t, dataRetriever.ErrNilHeartbeatPool, err)
assert.True(t, errors.Is(err, dataRetriever.ErrNilHeartbeatPool))
assert.Nil(t, tdp)
}

Expand Down Expand Up @@ -249,6 +249,7 @@ func TestNewDataPool_Close(t *testing.T) {

tnExpectedErr := errors.New("tn expected error")
paExpectedErr := errors.New("pa expected error")
faExpectedErr := errors.New("fa expected error")
args := createMockDataPoolArgs()
tnCalled, paCalled := false, false
args.TrieNodes = &testscommon.CacherStub{
Expand All @@ -266,7 +267,7 @@ func TestNewDataPool_Close(t *testing.T) {
tdp, _ := dataPool.NewDataPool(args)
assert.NotNil(t, tdp)
err := tdp.Close()
assert.Equal(t, paExpectedErr, err)
assert.Equal(t, faExpectedErr, err)
assert.True(t, tnCalled)
assert.True(t, paCalled)
})
Expand Down
4 changes: 3 additions & 1 deletion epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/types"
factoryDisabled "github.com/multiversx/mx-chain-go/factory/disabled"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
disabledP2P "github.com/multiversx/mx-chain-go/p2p/disabled"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/process/heartbeat/validator"
Expand Down Expand Up @@ -1305,7 +1306,8 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}
heartbeatCfg := e.generalConfig.HeartbeatV2
argsHeartbeatSender := sender.ArgBootstrapSender{
Messenger: e.messenger,
MainMessenger: e.messenger,
FullArchiveMessenger: disabledP2P.NewNetworkMessenger(), // TODO[Sorin]: pass full archive messenger
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
HeartbeatTopic: heartbeatTopic,
HeartbeatTimeBetweenSends: time.Second * time.Duration(heartbeatCfg.HeartbeatTimeBetweenSendsDuringBootstrapInSec),
Expand Down
130 changes: 93 additions & 37 deletions factory/heartbeat/heartbeatV2Components.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/multiversx/mx-chain-go/heartbeat/processor"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
"github.com/multiversx/mx-chain-go/heartbeat/status"
"github.com/multiversx/mx-chain-go/p2p"
processFactory "github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/process/peer"
"github.com/multiversx/mx-chain-go/update"
Expand Down Expand Up @@ -53,12 +54,13 @@ type heartbeatV2ComponentsFactory struct {
}

type heartbeatV2Components struct {
sender update.Closer
peerAuthRequestsProcessor update.Closer
shardSender update.Closer
monitor factory.HeartbeatV2Monitor
statusHandler update.Closer
directConnectionProcessor update.Closer
sender update.Closer
peerAuthRequestsProcessor update.Closer
shardSender update.Closer
monitor factory.HeartbeatV2Monitor
statusHandler update.Closer
mainDirectConnectionProcessor update.Closer
fullArchiveDirectConnectionProcessor update.Closer
}

// NewHeartbeatV2ComponentsFactory creates a new instance of heartbeatV2ComponentsFactory
Expand Down Expand Up @@ -100,7 +102,10 @@ func checkHeartbeatV2FactoryArgs(args ArgHeartbeatV2ComponentsFactory) error {
return errors.ErrNilNetworkComponentsHolder
}
if check.IfNil(args.NetworkComponents.NetworkMessenger()) {
return errors.ErrNilMessenger
return fmt.Errorf("%w for main", errors.ErrNilMessenger)
}
if check.IfNil(args.NetworkComponents.FullArchiveNetworkMessenger()) {
return fmt.Errorf("%w for full archive", errors.ErrNilMessenger)
}
if check.IfNil(args.CryptoComponents) {
return errors.ErrNilCryptoComponentsHolder
Expand All @@ -120,17 +125,9 @@ func checkHeartbeatV2FactoryArgs(args ArgHeartbeatV2ComponentsFactory) error {

// Create creates the heartbeatV2 components
func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error) {
if !hcf.networkComponents.NetworkMessenger().HasTopic(common.PeerAuthenticationTopic) {
err := hcf.networkComponents.NetworkMessenger().CreateTopic(common.PeerAuthenticationTopic, true)
if err != nil {
return nil, err
}
}
if !hcf.networkComponents.NetworkMessenger().HasTopic(common.HeartbeatV2Topic) {
err := hcf.networkComponents.NetworkMessenger().CreateTopic(common.HeartbeatV2Topic, true)
if err != nil {
return nil, err
}
err := hcf.createTopicsIfNeeded()
if err != nil {
return nil, err
}

cfg := hcf.config.HeartbeatV2
Expand All @@ -157,11 +154,12 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
}

argsSender := sender.ArgSender{
Messenger: hcf.networkComponents.NetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
PeerAuthenticationTopic: common.PeerAuthenticationTopic,
HeartbeatTopic: heartbeatTopic,
PeerAuthenticationTimeBetweenSends: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsInSec),
MainMessenger: hcf.networkComponents.NetworkMessenger(),
FullArchiveMessenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
PeerAuthenticationTopic: common.PeerAuthenticationTopic,
HeartbeatTopic: heartbeatTopic,
PeerAuthenticationTimeBetweenSends: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsInSec),
PeerAuthenticationTimeBetweenSendsWhenError: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsWhenErrorInSec),
PeerAuthenticationTimeThresholdBetweenSends: cfg.PeerAuthenticationTimeThresholdBetweenSends,
HeartbeatTimeBetweenSends: time.Second * time.Duration(cfg.HeartbeatTimeBetweenSendsInSec),
Expand Down Expand Up @@ -209,7 +207,8 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
}

argsPeerShardSender := sender.ArgPeerShardSender{
Messenger: hcf.networkComponents.NetworkMessenger(),
MainMessenger: hcf.networkComponents.NetworkMessenger(),
FullArchiveMessenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
ShardCoordinator: hcf.bootstrapComponents.ShardCoordinator(),
TimeBetweenSends: time.Second * time.Duration(cfg.PeerShardTimeBetweenSendsInSec),
Expand Down Expand Up @@ -247,42 +246,95 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
return nil, err
}

argsDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
argsMainDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
TimeToReadDirectConnections: time.Second * time.Duration(cfg.TimeToReadDirectConnectionsInSec),
Messenger: hcf.networkComponents.NetworkMessenger(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(),
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
BaseIntraShardTopic: common.ConsensusTopic,
BaseCrossShardTopic: processFactory.MiniBlocksTopic,
}
directConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsDirectConnectionProcessor)
mainDirectConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsMainDirectConnectionProcessor)
if err != nil {
return nil, err
}

argsFullArchiveDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
TimeToReadDirectConnections: time.Second * time.Duration(cfg.TimeToReadDirectConnectionsInSec),
Messenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(), // TODO[Sorin]: replace this with the full archive psm
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
BaseIntraShardTopic: common.ConsensusTopic,
BaseCrossShardTopic: processFactory.MiniBlocksTopic,
}
fullArchiveDirectConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsFullArchiveDirectConnectionProcessor)
if err != nil {
return nil, err
}

argsCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
argsMainCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(),
}
crossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsCrossShardPeerTopicNotifier)
mainCrossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsMainCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}
err = hcf.networkComponents.NetworkMessenger().AddPeerTopicNotifier(crossShardPeerTopicNotifier)
err = hcf.networkComponents.NetworkMessenger().AddPeerTopicNotifier(mainCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}

argsFullArchiveCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(), // TODO[Sorin]: replace this with the full archive psm
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
}
fullArchiveCrossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsFullArchiveCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}
err = hcf.networkComponents.FullArchiveNetworkMessenger().AddPeerTopicNotifier(fullArchiveCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}

return &heartbeatV2Components{
sender: heartbeatV2Sender,
peerAuthRequestsProcessor: paRequestsProcessor,
shardSender: shardSender,
monitor: heartbeatsMonitor,
statusHandler: statusHandler,
directConnectionProcessor: directConnectionProcessor,
sender: heartbeatV2Sender,
peerAuthRequestsProcessor: paRequestsProcessor,
shardSender: shardSender,
monitor: heartbeatsMonitor,
statusHandler: statusHandler,
mainDirectConnectionProcessor: mainDirectConnectionProcessor,
fullArchiveDirectConnectionProcessor: fullArchiveDirectConnectionProcessor,
}, nil
}

func (hcf *heartbeatV2ComponentsFactory) createTopicsIfNeeded() error {
err := createTopicIfNeededOnMessenger(hcf.networkComponents.NetworkMessenger())
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return err
}

return createTopicIfNeededOnMessenger(hcf.networkComponents.FullArchiveNetworkMessenger())
}

func createTopicIfNeededOnMessenger(messenger p2p.Messenger) error {
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
if !messenger.HasTopic(common.PeerAuthenticationTopic) {
err := messenger.CreateTopic(common.PeerAuthenticationTopic, true)
if err != nil {
return err
}
}
if !messenger.HasTopic(common.HeartbeatV2Topic) {
err := messenger.CreateTopic(common.HeartbeatV2Topic, true)
if err != nil {
return err
}
}

return nil
}

// Close closes the heartbeat components
func (hc *heartbeatV2Components) Close() error {
log.Debug("calling close on heartbeatV2 components")
Expand All @@ -303,8 +355,12 @@ func (hc *heartbeatV2Components) Close() error {
log.LogIfError(hc.statusHandler.Close())
}

if !check.IfNil(hc.directConnectionProcessor) {
log.LogIfError(hc.directConnectionProcessor.Close())
if !check.IfNil(hc.mainDirectConnectionProcessor) {
log.LogIfError(hc.mainDirectConnectionProcessor.Close())
}

if !check.IfNil(hc.fullArchiveDirectConnectionProcessor) {
log.LogIfError(hc.fullArchiveDirectConnectionProcessor.Close())
}

return nil
Expand Down
20 changes: 18 additions & 2 deletions factory/heartbeat/heartbeatV2Components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ func createMockHeartbeatV2ComponentsFactoryArgs() heartbeatComp.ArgHeartbeatV2Co
BlockChain: &testscommon.ChainHandlerStub{},
},
NetworkComponents: &testsMocks.NetworkComponentsStub{
Messenger: &p2pmocks.MessengerStub{},
Messenger: &p2pmocks.MessengerStub{},
FullArchiveNetworkMessengerField: &p2pmocks.MessengerStub{},
},
CryptoComponents: &testsMocks.CryptoComponentsStub{
PrivKey: &cryptoMocks.PrivateKeyStub{},
Expand Down Expand Up @@ -189,7 +190,19 @@ func TestNewHeartbeatV2ComponentsFactory(t *testing.T) {
}
hcf, err := heartbeatComp.NewHeartbeatV2ComponentsFactory(args)
assert.Nil(t, hcf)
assert.Equal(t, errorsMx.ErrNilMessenger, err)
assert.True(t, errors.Is(err, errorsMx.ErrNilMessenger))
})
t.Run("nil FullArchiveNetworkMessenger should error", func(t *testing.T) {
t.Parallel()

args := createMockHeartbeatV2ComponentsFactoryArgs()
args.NetworkComponents = &testsMocks.NetworkComponentsStub{
Messenger: &p2pmocks.MessengerStub{},
FullArchiveNetworkMessengerField: nil,
}
hcf, err := heartbeatComp.NewHeartbeatV2ComponentsFactory(args)
assert.Nil(t, hcf)
assert.True(t, errors.Is(err, errorsMx.ErrNilMessenger))
})
t.Run("nil CryptoComponents should error", func(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -256,6 +269,7 @@ func TestHeartbeatV2Components_Create(t *testing.T) {
return nil
},
},
FullArchiveNetworkMessengerField: &p2pmocks.MessengerStub{},
}
hcf, err := heartbeatComp.NewHeartbeatV2ComponentsFactory(args)
assert.NotNil(t, hcf)
Expand All @@ -282,6 +296,7 @@ func TestHeartbeatV2Components_Create(t *testing.T) {
return nil
},
},
FullArchiveNetworkMessengerField: &p2pmocks.MessengerStub{},
}
hcf, err := heartbeatComp.NewHeartbeatV2ComponentsFactory(args)
assert.NotNil(t, hcf)
Expand Down Expand Up @@ -439,6 +454,7 @@ func TestHeartbeatV2Components_Create(t *testing.T) {
return expectedErr
},
},
FullArchiveNetworkMessengerField: &p2pmocks.MessengerStub{},
}
hcf, err := heartbeatComp.NewHeartbeatV2ComponentsFactory(args)
assert.NotNil(t, hcf)
Expand Down
3 changes: 3 additions & 0 deletions factory/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ type NetworkComponentsHolder interface {
PreferredPeersHolderHandler() PreferredPeersHolderHandler
PeersRatingHandler() p2p.PeersRatingHandler
PeersRatingMonitor() p2p.PeersRatingMonitor
FullArchiveNetworkMessenger() p2p.Messenger
FullArchivePeersRatingHandler() p2p.PeersRatingHandler
FullArchivePeersRatingMonitor() p2p.PeersRatingMonitor
IsInterfaceNil() bool
}

Expand Down
32 changes: 25 additions & 7 deletions factory/mock/networkComponentsMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@ import (

// NetworkComponentsMock -
type NetworkComponentsMock struct {
Messenger p2p.Messenger
InputAntiFlood factory.P2PAntifloodHandler
OutputAntiFlood factory.P2PAntifloodHandler
PeerBlackList process.PeerBlackListCacher
PreferredPeersHolder factory.PreferredPeersHolderHandler
PeersRatingHandlerField p2p.PeersRatingHandler
PeersRatingMonitorField p2p.PeersRatingMonitor
Messenger p2p.Messenger
InputAntiFlood factory.P2PAntifloodHandler
OutputAntiFlood factory.P2PAntifloodHandler
PeerBlackList process.PeerBlackListCacher
PreferredPeersHolder factory.PreferredPeersHolderHandler
PeersRatingHandlerField p2p.PeersRatingHandler
PeersRatingMonitorField p2p.PeersRatingMonitor
FullArchiveNetworkMessengerField p2p.Messenger
FullArchivePeersRatingHandlerField p2p.PeersRatingHandler
FullArchivePeersRatingMonitorField p2p.PeersRatingMonitor
}

// PubKeyCacher -
Expand Down Expand Up @@ -77,6 +80,21 @@ func (ncm *NetworkComponentsMock) PeersRatingMonitor() p2p.PeersRatingMonitor {
return ncm.PeersRatingMonitorField
}

// FullArchiveNetworkMessenger -
func (ncm *NetworkComponentsMock) FullArchiveNetworkMessenger() p2p.Messenger {
return ncm.FullArchiveNetworkMessengerField
}

// FullArchivePeersRatingHandler -
func (ncm *NetworkComponentsMock) FullArchivePeersRatingHandler() p2p.PeersRatingHandler {
return ncm.FullArchivePeersRatingHandlerField
}

// FullArchivePeersRatingMonitor -
func (ncm *NetworkComponentsMock) FullArchivePeersRatingMonitor() p2p.PeersRatingMonitor {
return ncm.FullArchivePeersRatingMonitorField
}

// IsInterfaceNil -
func (ncm *NetworkComponentsMock) IsInterfaceNil() bool {
return ncm == nil
Expand Down
1 change: 0 additions & 1 deletion factory/network/networkComponentsHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,6 @@ func (mnc *managedNetworkComponents) PeersRatingMonitor() p2p.PeersRatingMonitor
}

// FullArchiveNetworkMessenger returns the p2p messenger of the full archive network
// TODO[Sorin]: add these new methods into the interface
func (mnc *managedNetworkComponents) FullArchiveNetworkMessenger() p2p.Messenger {
mnc.mutNetworkComponents.RLock()
defer mnc.mutNetworkComponents.RUnlock()
Expand Down