Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full archive heartbeat sender + new data pools #5332

Merged
Merged
4 changes: 2 additions & 2 deletions dataRetriever/dataPool/dataPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestNewDataPool_NilPeerAuthenticationsShouldErr(t *testing.T) {
args.PeerAuthentications = nil
tdp, err := dataPool.NewDataPool(args)

assert.Equal(t, dataRetriever.ErrNilPeerAuthenticationPool, err)
assert.True(t, errors.Is(err, dataRetriever.ErrNilPeerAuthenticationPool))
assert.Nil(t, tdp)
}

Expand All @@ -139,7 +139,7 @@ func TestNewDataPool_NilHeartbeatsShouldErr(t *testing.T) {
args.Heartbeats = nil
tdp, err := dataPool.NewDataPool(args)

assert.Equal(t, dataRetriever.ErrNilHeartbeatPool, err)
assert.True(t, errors.Is(err, dataRetriever.ErrNilHeartbeatPool))
assert.Nil(t, tdp)
}

Expand Down
4 changes: 3 additions & 1 deletion epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/types"
factoryDisabled "github.com/multiversx/mx-chain-go/factory/disabled"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
disabledP2P "github.com/multiversx/mx-chain-go/p2p/disabled"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/process/heartbeat/validator"
Expand Down Expand Up @@ -1305,7 +1306,8 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}
heartbeatCfg := e.generalConfig.HeartbeatV2
argsHeartbeatSender := sender.ArgBootstrapSender{
Messenger: e.messenger,
MainMessenger: e.messenger,
FullArchiveMessenger: disabledP2P.NewNetworkMessenger(), // TODO[Sorin]: pass full archive messenger
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
HeartbeatTopic: heartbeatTopic,
HeartbeatTimeBetweenSends: time.Second * time.Duration(heartbeatCfg.HeartbeatTimeBetweenSendsDuringBootstrapInSec),
Expand Down
130 changes: 93 additions & 37 deletions factory/heartbeat/heartbeatV2Components.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/multiversx/mx-chain-go/heartbeat/processor"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
"github.com/multiversx/mx-chain-go/heartbeat/status"
"github.com/multiversx/mx-chain-go/p2p"
processFactory "github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/process/peer"
"github.com/multiversx/mx-chain-go/update"
Expand Down Expand Up @@ -53,12 +54,13 @@ type heartbeatV2ComponentsFactory struct {
}

type heartbeatV2Components struct {
sender update.Closer
peerAuthRequestsProcessor update.Closer
shardSender update.Closer
monitor factory.HeartbeatV2Monitor
statusHandler update.Closer
directConnectionProcessor update.Closer
sender update.Closer
peerAuthRequestsProcessor update.Closer
shardSender update.Closer
monitor factory.HeartbeatV2Monitor
statusHandler update.Closer
mainDirectConnectionProcessor update.Closer
fullArchiveDirectConnectionProcessor update.Closer
}

// NewHeartbeatV2ComponentsFactory creates a new instance of heartbeatV2ComponentsFactory
Expand Down Expand Up @@ -100,7 +102,10 @@ func checkHeartbeatV2FactoryArgs(args ArgHeartbeatV2ComponentsFactory) error {
return errors.ErrNilNetworkComponentsHolder
}
if check.IfNil(args.NetworkComponents.NetworkMessenger()) {
return errors.ErrNilMessenger
return fmt.Errorf("%w for main", errors.ErrNilMessenger)
}
if check.IfNil(args.NetworkComponents.FullArchiveNetworkMessenger()) {
return fmt.Errorf("%w for full archive", errors.ErrNilMessenger)
}
if check.IfNil(args.CryptoComponents) {
return errors.ErrNilCryptoComponentsHolder
Expand All @@ -120,17 +125,9 @@ func checkHeartbeatV2FactoryArgs(args ArgHeartbeatV2ComponentsFactory) error {

// Create creates the heartbeatV2 components
func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error) {
if !hcf.networkComponents.NetworkMessenger().HasTopic(common.PeerAuthenticationTopic) {
err := hcf.networkComponents.NetworkMessenger().CreateTopic(common.PeerAuthenticationTopic, true)
if err != nil {
return nil, err
}
}
if !hcf.networkComponents.NetworkMessenger().HasTopic(common.HeartbeatV2Topic) {
err := hcf.networkComponents.NetworkMessenger().CreateTopic(common.HeartbeatV2Topic, true)
if err != nil {
return nil, err
}
err := hcf.createTopicsIfNeeded()
if err != nil {
return nil, err
}

cfg := hcf.config.HeartbeatV2
Expand All @@ -157,11 +154,12 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
}

argsSender := sender.ArgSender{
Messenger: hcf.networkComponents.NetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
PeerAuthenticationTopic: common.PeerAuthenticationTopic,
HeartbeatTopic: heartbeatTopic,
PeerAuthenticationTimeBetweenSends: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsInSec),
MainMessenger: hcf.networkComponents.NetworkMessenger(),
FullArchiveMessenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
PeerAuthenticationTopic: common.PeerAuthenticationTopic,
HeartbeatTopic: heartbeatTopic,
PeerAuthenticationTimeBetweenSends: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsInSec),
PeerAuthenticationTimeBetweenSendsWhenError: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsWhenErrorInSec),
PeerAuthenticationTimeThresholdBetweenSends: cfg.PeerAuthenticationTimeThresholdBetweenSends,
HeartbeatTimeBetweenSends: time.Second * time.Duration(cfg.HeartbeatTimeBetweenSendsInSec),
Expand Down Expand Up @@ -209,7 +207,8 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
}

argsPeerShardSender := sender.ArgPeerShardSender{
Messenger: hcf.networkComponents.NetworkMessenger(),
MainMessenger: hcf.networkComponents.NetworkMessenger(),
FullArchiveMessenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
ShardCoordinator: hcf.bootstrapComponents.ShardCoordinator(),
TimeBetweenSends: time.Second * time.Duration(cfg.PeerShardTimeBetweenSendsInSec),
Expand Down Expand Up @@ -247,42 +246,95 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
return nil, err
}

argsDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
argsMainDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
TimeToReadDirectConnections: time.Second * time.Duration(cfg.TimeToReadDirectConnectionsInSec),
Messenger: hcf.networkComponents.NetworkMessenger(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(),
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
BaseIntraShardTopic: common.ConsensusTopic,
BaseCrossShardTopic: processFactory.MiniBlocksTopic,
}
directConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsDirectConnectionProcessor)
mainDirectConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsMainDirectConnectionProcessor)
if err != nil {
return nil, err
}

argsFullArchiveDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
TimeToReadDirectConnections: time.Second * time.Duration(cfg.TimeToReadDirectConnectionsInSec),
Messenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(), // TODO[Sorin]: replace this with the full archive psm
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
BaseIntraShardTopic: common.ConsensusTopic,
BaseCrossShardTopic: processFactory.MiniBlocksTopic,
}
fullArchiveDirectConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsFullArchiveDirectConnectionProcessor)
if err != nil {
return nil, err
}

argsCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
argsMainCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(),
}
crossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsCrossShardPeerTopicNotifier)
mainCrossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsMainCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}
err = hcf.networkComponents.NetworkMessenger().AddPeerTopicNotifier(crossShardPeerTopicNotifier)
err = hcf.networkComponents.NetworkMessenger().AddPeerTopicNotifier(mainCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}

argsFullArchiveCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(), // TODO[Sorin]: replace this with the full archive psm
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
}
fullArchiveCrossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsFullArchiveCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}
err = hcf.networkComponents.FullArchiveNetworkMessenger().AddPeerTopicNotifier(fullArchiveCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}

return &heartbeatV2Components{
sender: heartbeatV2Sender,
peerAuthRequestsProcessor: paRequestsProcessor,
shardSender: shardSender,
monitor: heartbeatsMonitor,
statusHandler: statusHandler,
directConnectionProcessor: directConnectionProcessor,
sender: heartbeatV2Sender,
peerAuthRequestsProcessor: paRequestsProcessor,
shardSender: shardSender,
monitor: heartbeatsMonitor,
statusHandler: statusHandler,
mainDirectConnectionProcessor: mainDirectConnectionProcessor,
fullArchiveDirectConnectionProcessor: fullArchiveDirectConnectionProcessor,
}, nil
}

func (hcf *heartbeatV2ComponentsFactory) createTopicsIfNeeded() error {
err := createTopicsIfNeededOnMessenger(hcf.networkComponents.NetworkMessenger())
if err != nil {
return err
}

return createTopicsIfNeededOnMessenger(hcf.networkComponents.FullArchiveNetworkMessenger())
}

func createTopicsIfNeededOnMessenger(messenger p2p.Messenger) error {
if !messenger.HasTopic(common.PeerAuthenticationTopic) {
err := messenger.CreateTopic(common.PeerAuthenticationTopic, true)
if err != nil {
return err
}
}
if !messenger.HasTopic(common.HeartbeatV2Topic) {
err := messenger.CreateTopic(common.HeartbeatV2Topic, true)
if err != nil {
return err
}
}

return nil
}

// Close closes the heartbeat components
func (hc *heartbeatV2Components) Close() error {
log.Debug("calling close on heartbeatV2 components")
Expand All @@ -303,8 +355,12 @@ func (hc *heartbeatV2Components) Close() error {
log.LogIfError(hc.statusHandler.Close())
}

if !check.IfNil(hc.directConnectionProcessor) {
log.LogIfError(hc.directConnectionProcessor.Close())
if !check.IfNil(hc.mainDirectConnectionProcessor) {
log.LogIfError(hc.mainDirectConnectionProcessor.Close())
}

if !check.IfNil(hc.fullArchiveDirectConnectionProcessor) {
log.LogIfError(hc.fullArchiveDirectConnectionProcessor.Close())
}

return nil
Expand Down