Skip to content

Commit

Permalink
Merge c657818 into f05af2b
Browse files Browse the repository at this point in the history
  • Loading branch information
sstanculeanu committed Jun 9, 2023
2 parents f05af2b + c657818 commit c28cf02
Show file tree
Hide file tree
Showing 31 changed files with 798 additions and 283 deletions.
4 changes: 2 additions & 2 deletions dataRetriever/dataPool/dataPool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func TestNewDataPool_NilPeerAuthenticationsShouldErr(t *testing.T) {
args.PeerAuthentications = nil
tdp, err := dataPool.NewDataPool(args)

assert.Equal(t, dataRetriever.ErrNilPeerAuthenticationPool, err)
assert.True(t, errors.Is(err, dataRetriever.ErrNilPeerAuthenticationPool))
assert.Nil(t, tdp)
}

Expand All @@ -139,7 +139,7 @@ func TestNewDataPool_NilHeartbeatsShouldErr(t *testing.T) {
args.Heartbeats = nil
tdp, err := dataPool.NewDataPool(args)

assert.Equal(t, dataRetriever.ErrNilHeartbeatPool, err)
assert.True(t, errors.Is(err, dataRetriever.ErrNilHeartbeatPool))
assert.Nil(t, tdp)
}

Expand Down
4 changes: 3 additions & 1 deletion epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/types"
factoryDisabled "github.com/multiversx/mx-chain-go/factory/disabled"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
disabledP2P "github.com/multiversx/mx-chain-go/p2p/disabled"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/process/heartbeat/validator"
Expand Down Expand Up @@ -1305,7 +1306,8 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}
heartbeatCfg := e.generalConfig.HeartbeatV2
argsHeartbeatSender := sender.ArgBootstrapSender{
Messenger: e.messenger,
MainMessenger: e.messenger,
FullArchiveMessenger: disabledP2P.NewNetworkMessenger(), // TODO[Sorin]: pass full archive messenger
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
HeartbeatTopic: heartbeatTopic,
HeartbeatTimeBetweenSends: time.Second * time.Duration(heartbeatCfg.HeartbeatTimeBetweenSendsDuringBootstrapInSec),
Expand Down
130 changes: 93 additions & 37 deletions factory/heartbeat/heartbeatV2Components.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/multiversx/mx-chain-go/heartbeat/processor"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
"github.com/multiversx/mx-chain-go/heartbeat/status"
"github.com/multiversx/mx-chain-go/p2p"
processFactory "github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/process/peer"
"github.com/multiversx/mx-chain-go/update"
Expand Down Expand Up @@ -53,12 +54,13 @@ type heartbeatV2ComponentsFactory struct {
}

type heartbeatV2Components struct {
sender update.Closer
peerAuthRequestsProcessor update.Closer
shardSender update.Closer
monitor factory.HeartbeatV2Monitor
statusHandler update.Closer
directConnectionProcessor update.Closer
sender update.Closer
peerAuthRequestsProcessor update.Closer
shardSender update.Closer
monitor factory.HeartbeatV2Monitor
statusHandler update.Closer
mainDirectConnectionProcessor update.Closer
fullArchiveDirectConnectionProcessor update.Closer
}

// NewHeartbeatV2ComponentsFactory creates a new instance of heartbeatV2ComponentsFactory
Expand Down Expand Up @@ -100,7 +102,10 @@ func checkHeartbeatV2FactoryArgs(args ArgHeartbeatV2ComponentsFactory) error {
return errors.ErrNilNetworkComponentsHolder
}
if check.IfNil(args.NetworkComponents.NetworkMessenger()) {
return errors.ErrNilMessenger
return fmt.Errorf("%w for main", errors.ErrNilMessenger)
}
if check.IfNil(args.NetworkComponents.FullArchiveNetworkMessenger()) {
return fmt.Errorf("%w for full archive", errors.ErrNilMessenger)
}
if check.IfNil(args.CryptoComponents) {
return errors.ErrNilCryptoComponentsHolder
Expand All @@ -120,17 +125,9 @@ func checkHeartbeatV2FactoryArgs(args ArgHeartbeatV2ComponentsFactory) error {

// Create creates the heartbeatV2 components
func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error) {
if !hcf.networkComponents.NetworkMessenger().HasTopic(common.PeerAuthenticationTopic) {
err := hcf.networkComponents.NetworkMessenger().CreateTopic(common.PeerAuthenticationTopic, true)
if err != nil {
return nil, err
}
}
if !hcf.networkComponents.NetworkMessenger().HasTopic(common.HeartbeatV2Topic) {
err := hcf.networkComponents.NetworkMessenger().CreateTopic(common.HeartbeatV2Topic, true)
if err != nil {
return nil, err
}
err := hcf.createTopicsIfNeeded()
if err != nil {
return nil, err
}

cfg := hcf.config.HeartbeatV2
Expand All @@ -157,11 +154,12 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
}

argsSender := sender.ArgSender{
Messenger: hcf.networkComponents.NetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
PeerAuthenticationTopic: common.PeerAuthenticationTopic,
HeartbeatTopic: heartbeatTopic,
PeerAuthenticationTimeBetweenSends: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsInSec),
MainMessenger: hcf.networkComponents.NetworkMessenger(),
FullArchiveMessenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
PeerAuthenticationTopic: common.PeerAuthenticationTopic,
HeartbeatTopic: heartbeatTopic,
PeerAuthenticationTimeBetweenSends: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsInSec),
PeerAuthenticationTimeBetweenSendsWhenError: time.Second * time.Duration(cfg.PeerAuthenticationTimeBetweenSendsWhenErrorInSec),
PeerAuthenticationTimeThresholdBetweenSends: cfg.PeerAuthenticationTimeThresholdBetweenSends,
HeartbeatTimeBetweenSends: time.Second * time.Duration(cfg.HeartbeatTimeBetweenSendsInSec),
Expand Down Expand Up @@ -209,7 +207,8 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
}

argsPeerShardSender := sender.ArgPeerShardSender{
Messenger: hcf.networkComponents.NetworkMessenger(),
MainMessenger: hcf.networkComponents.NetworkMessenger(),
FullArchiveMessenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
Marshaller: hcf.coreComponents.InternalMarshalizer(),
ShardCoordinator: hcf.bootstrapComponents.ShardCoordinator(),
TimeBetweenSends: time.Second * time.Duration(cfg.PeerShardTimeBetweenSendsInSec),
Expand Down Expand Up @@ -247,42 +246,95 @@ func (hcf *heartbeatV2ComponentsFactory) Create() (*heartbeatV2Components, error
return nil, err
}

argsDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
argsMainDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
TimeToReadDirectConnections: time.Second * time.Duration(cfg.TimeToReadDirectConnectionsInSec),
Messenger: hcf.networkComponents.NetworkMessenger(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(),
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
BaseIntraShardTopic: common.ConsensusTopic,
BaseCrossShardTopic: processFactory.MiniBlocksTopic,
}
directConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsDirectConnectionProcessor)
mainDirectConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsMainDirectConnectionProcessor)
if err != nil {
return nil, err
}

argsFullArchiveDirectConnectionProcessor := processor.ArgsDirectConnectionProcessor{
TimeToReadDirectConnections: time.Second * time.Duration(cfg.TimeToReadDirectConnectionsInSec),
Messenger: hcf.networkComponents.FullArchiveNetworkMessenger(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(), // TODO[Sorin]: replace this with the full archive psm
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
BaseIntraShardTopic: common.ConsensusTopic,
BaseCrossShardTopic: processFactory.MiniBlocksTopic,
}
fullArchiveDirectConnectionProcessor, err := processor.NewDirectConnectionProcessor(argsFullArchiveDirectConnectionProcessor)
if err != nil {
return nil, err
}

argsCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
argsMainCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(),
}
crossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsCrossShardPeerTopicNotifier)
mainCrossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsMainCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}
err = hcf.networkComponents.NetworkMessenger().AddPeerTopicNotifier(crossShardPeerTopicNotifier)
err = hcf.networkComponents.NetworkMessenger().AddPeerTopicNotifier(mainCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}

argsFullArchiveCrossShardPeerTopicNotifier := monitor.ArgsCrossShardPeerTopicNotifier{
ShardCoordinator: hcf.processComponents.ShardCoordinator(),
PeerShardMapper: hcf.processComponents.PeerShardMapper(), // TODO[Sorin]: replace this with the full archive psm
}
fullArchiveCrossShardPeerTopicNotifier, err := monitor.NewCrossShardPeerTopicNotifier(argsFullArchiveCrossShardPeerTopicNotifier)
if err != nil {
return nil, err

Check warning on line 294 in factory/heartbeat/heartbeatV2Components.go

View check run for this annotation

Codecov / codecov/patch

factory/heartbeat/heartbeatV2Components.go#L294

Added line #L294 was not covered by tests
}
err = hcf.networkComponents.FullArchiveNetworkMessenger().AddPeerTopicNotifier(fullArchiveCrossShardPeerTopicNotifier)
if err != nil {
return nil, err
}

return &heartbeatV2Components{
sender: heartbeatV2Sender,
peerAuthRequestsProcessor: paRequestsProcessor,
shardSender: shardSender,
monitor: heartbeatsMonitor,
statusHandler: statusHandler,
directConnectionProcessor: directConnectionProcessor,
sender: heartbeatV2Sender,
peerAuthRequestsProcessor: paRequestsProcessor,
shardSender: shardSender,
monitor: heartbeatsMonitor,
statusHandler: statusHandler,
mainDirectConnectionProcessor: mainDirectConnectionProcessor,
fullArchiveDirectConnectionProcessor: fullArchiveDirectConnectionProcessor,
}, nil
}

func (hcf *heartbeatV2ComponentsFactory) createTopicsIfNeeded() error {
err := createTopicsIfNeededOnMessenger(hcf.networkComponents.NetworkMessenger())
if err != nil {
return err
}

return createTopicsIfNeededOnMessenger(hcf.networkComponents.FullArchiveNetworkMessenger())
}

func createTopicsIfNeededOnMessenger(messenger p2p.Messenger) error {
if !messenger.HasTopic(common.PeerAuthenticationTopic) {
err := messenger.CreateTopic(common.PeerAuthenticationTopic, true)
if err != nil {
return err
}
}
if !messenger.HasTopic(common.HeartbeatV2Topic) {
err := messenger.CreateTopic(common.HeartbeatV2Topic, true)
if err != nil {
return err
}
}

return nil
}

// Close closes the heartbeat components
func (hc *heartbeatV2Components) Close() error {
log.Debug("calling close on heartbeatV2 components")
Expand All @@ -303,8 +355,12 @@ func (hc *heartbeatV2Components) Close() error {
log.LogIfError(hc.statusHandler.Close())
}

if !check.IfNil(hc.directConnectionProcessor) {
log.LogIfError(hc.directConnectionProcessor.Close())
if !check.IfNil(hc.mainDirectConnectionProcessor) {
log.LogIfError(hc.mainDirectConnectionProcessor.Close())
}

if !check.IfNil(hc.fullArchiveDirectConnectionProcessor) {
log.LogIfError(hc.fullArchiveDirectConnectionProcessor.Close())
}

return nil
Expand Down

0 comments on commit c28cf02

Please sign in to comment.