Skip to content

Commit

Permalink
Merge pull request #5337 from multiversx/process_components_for_fulla…
Browse files Browse the repository at this point in the history
…rchive_messenger

Full archive peer shard mapper
  • Loading branch information
sstanculeanu committed Jun 16, 2023
2 parents a6975f8 + 11cdfc9 commit 1e22b2d
Show file tree
Hide file tree
Showing 61 changed files with 1,680 additions and 1,286 deletions.
7 changes: 5 additions & 2 deletions epochStart/bootstrap/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ func checkArguments(args ArgsEpochStartBootstrap) error {
if check.IfNil(args.GenesisShardCoordinator) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilShardCoordinator)
}
if check.IfNil(args.Messenger) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilMessenger)
if check.IfNil(args.MainMessenger) {
return fmt.Errorf("%s on main network: %w", baseErrorMessage, epochStart.ErrNilMessenger)
}
if check.IfNil(args.FullArchiveMessenger) {
return fmt.Errorf("%s on full archive network: %w", baseErrorMessage, epochStart.ErrNilMessenger)
}
if check.IfNil(args.EconomicsData) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilEconomicsData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/disabled"
disabledFactory "github.com/multiversx/mx-chain-go/factory/disabled"
disabledGenesis "github.com/multiversx/mx-chain-go/genesis/process/disabled"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/factory/interceptorscontainer"
"github.com/multiversx/mx-chain-go/sharding"
Expand All @@ -29,7 +30,8 @@ type ArgsEpochStartInterceptorContainer struct {
CryptoComponents process.CryptoComponentsHolder
Config config.Config
ShardCoordinator sharding.Coordinator
Messenger process.TopicHandler
MainMessenger process.TopicHandler
FullArchiveMessenger process.TopicHandler
DataPool dataRetriever.PoolsHolder
WhiteListHandler update.WhiteListHandler
WhiteListerVerifiedTxs update.WhiteListHandler
Expand All @@ -40,24 +42,25 @@ type ArgsEpochStartInterceptorContainer struct {
HeaderIntegrityVerifier process.HeaderIntegrityVerifier
RequestHandler process.RequestHandler
SignaturesHandler process.SignaturesHandler
NodeOperationMode p2p.NodeOperation
}

// NewEpochStartInterceptorsContainer will return a real interceptors container factory, but with many disabled components
func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer) (process.InterceptorsContainer, error) {
func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer) (process.InterceptorsContainer, process.InterceptorsContainer, error) {
if check.IfNil(args.CoreComponents) {
return nil, epochStart.ErrNilCoreComponentsHolder
return nil, nil, epochStart.ErrNilCoreComponentsHolder
}
if check.IfNil(args.CryptoComponents) {
return nil, epochStart.ErrNilCryptoComponentsHolder
return nil, nil, epochStart.ErrNilCryptoComponentsHolder
}
if check.IfNil(args.CoreComponents.AddressPubKeyConverter()) {
return nil, epochStart.ErrNilPubkeyConverter
return nil, nil, epochStart.ErrNilPubkeyConverter
}

cryptoComponents := args.CryptoComponents.Clone().(process.CryptoComponentsHolder)
err := cryptoComponents.SetMultiSignerContainer(disabled.NewMultiSignerContainer())
if err != nil {
return nil, err
return nil, nil, err
}

nodesCoordinator := disabled.NewNodesCoordinator()
Expand All @@ -72,6 +75,7 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
epochStartTrigger := disabled.NewEpochStartTrigger()
// TODO: move the peerShardMapper creation before boostrapComponents
peerShardMapper := disabled.NewPeerShardMapper()
fullArchivePeerShardMapper := disabled.NewPeerShardMapper()
hardforkTrigger := disabledFactory.HardforkTrigger()

containerFactoryArgs := interceptorscontainer.CommonInterceptorsContainerFactoryArgs{
Expand All @@ -80,7 +84,8 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
Accounts: accountsAdapter,
ShardCoordinator: args.ShardCoordinator,
NodesCoordinator: nodesCoordinator,
Messenger: args.Messenger,
MainMessenger: args.MainMessenger,
FullArchiveMessenger: args.FullArchiveMessenger,
Store: storer,
DataPool: args.DataPool,
MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed,
Expand All @@ -100,24 +105,33 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
PeerSignatureHandler: cryptoComponents.PeerSignatureHandler(),
SignaturesHandler: args.SignaturesHandler,
HeartbeatExpiryTimespanInSec: args.Config.HeartbeatV2.HeartbeatExpiryTimespanInSec,
PeerShardMapper: peerShardMapper,
MainPeerShardMapper: peerShardMapper,
FullArchivePeerShardMapper: fullArchivePeerShardMapper,
HardforkTrigger: hardforkTrigger,
NodeOperationMode: args.NodeOperationMode,
}

interceptorsContainerFactory, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(containerFactoryArgs)
if err != nil {
return nil, err
return nil, nil, err
}

container, err := interceptorsContainerFactory.Create()
mainContainer, fullArchiveContainer, err := interceptorsContainerFactory.Create()
if err != nil {
return nil, err
return nil, nil, err
}

err = interceptorsContainerFactory.AddShardTrieNodeInterceptors(container)
err = interceptorsContainerFactory.AddShardTrieNodeInterceptors(mainContainer)
if err != nil {
return nil, err
return nil, nil, err
}

return container, nil
if args.NodeOperationMode == p2p.FullArchiveMode {
err = interceptorsContainerFactory.AddShardTrieNodeInterceptors(fullArchiveContainer)
if err != nil {
return nil, nil, err
}
}

return mainContainer, fullArchiveContainer, nil
}
11 changes: 8 additions & 3 deletions epochStart/bootstrap/fromLocalStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,13 @@ func (e *epochStartBootstrap) prepareEpochFromStorage() (Parameters, error) {
}

defer func() {
errClose := e.interceptorContainer.Close()
errClose := e.mainInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage interceptorContainer.Close()", "error", errClose)
log.Warn("prepareEpochFromStorage mainInterceptorContainer.Close()", "error", errClose)
}
errClose = e.fullArchiveInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage fullArchiveInterceptorContainer.Close()", "error", errClose)
}
}()

Expand All @@ -161,7 +165,8 @@ func (e *epochStartBootstrap) prepareEpochFromStorage() (Parameters, error) {
return Parameters{}, err
}

err = e.messenger.CreateTopic(common.ConsensusTopic+e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId()), true)
consensusTopic := common.ConsensusTopic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
err = e.mainMessenger.CreateTopic(consensusTopic, true)
if err != nil {
return Parameters{}, err
}
Expand Down
104 changes: 67 additions & 37 deletions epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/types"
factoryDisabled "github.com/multiversx/mx-chain-go/factory/disabled"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
disabledP2P "github.com/multiversx/mx-chain-go/p2p/disabled"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/process/heartbeat/validator"
Expand Down Expand Up @@ -93,7 +93,8 @@ type epochStartBootstrap struct {
destinationShardAsObserver uint32
coreComponentsHolder process.CoreComponentsHolder
cryptoComponentsHolder process.CryptoComponentsHolder
messenger Messenger
mainMessenger Messenger
fullArchiveMessenger Messenger
generalConfig config.Config
prefsConfig config.PreferencesConfig
flagsConfig config.ContextFlagsConfig
Expand All @@ -117,24 +118,25 @@ type epochStartBootstrap struct {
bootstrapHeartbeatSender update.Closer
trieSyncStatisticsProvider common.SizeSyncStatisticsHandler
nodeProcessingMode common.NodeProcessingMode

nodeOperationMode p2p.NodeOperation
// created components
requestHandler process.RequestHandler
interceptorContainer process.InterceptorsContainer
dataPool dataRetriever.PoolsHolder
miniBlocksSyncer epochStart.PendingMiniBlocksSyncHandler
headersSyncer epochStart.HeadersByHashSyncer
txSyncerForScheduled update.TransactionsSyncHandler
epochStartMetaBlockSyncer epochStart.StartOfEpochMetaSyncer
nodesConfigHandler StartOfEpochNodesConfigHandler
whiteListHandler update.WhiteListHandler
whiteListerVerifiedTxs update.WhiteListHandler
storageOpenerHandler storage.UnitOpenerHandler
latestStorageDataProvider storage.LatestStorageDataProviderHandler
argumentsParser process.ArgumentsParser
dataSyncerFactory types.ScheduledDataSyncerCreator
dataSyncerWithScheduled types.ScheduledDataSyncer
storageService dataRetriever.StorageService
requestHandler process.RequestHandler
mainInterceptorContainer process.InterceptorsContainer
fullArchiveInterceptorContainer process.InterceptorsContainer
dataPool dataRetriever.PoolsHolder
miniBlocksSyncer epochStart.PendingMiniBlocksSyncHandler
headersSyncer epochStart.HeadersByHashSyncer
txSyncerForScheduled update.TransactionsSyncHandler
epochStartMetaBlockSyncer epochStart.StartOfEpochMetaSyncer
nodesConfigHandler StartOfEpochNodesConfigHandler
whiteListHandler update.WhiteListHandler
whiteListerVerifiedTxs update.WhiteListHandler
storageOpenerHandler storage.UnitOpenerHandler
latestStorageDataProvider storage.LatestStorageDataProviderHandler
argumentsParser process.ArgumentsParser
dataSyncerFactory types.ScheduledDataSyncerCreator
dataSyncerWithScheduled types.ScheduledDataSyncer
storageService dataRetriever.StorageService

// gathered data
epochStartMeta data.MetaHeaderHandler
Expand Down Expand Up @@ -162,7 +164,8 @@ type ArgsEpochStartBootstrap struct {
CoreComponentsHolder process.CoreComponentsHolder
CryptoComponentsHolder process.CryptoComponentsHolder
DestinationShardAsObserver uint32
Messenger Messenger
MainMessenger Messenger
FullArchiveMessenger Messenger
GeneralConfig config.Config
PrefsConfig config.PreferencesConfig
FlagsConfig config.ContextFlagsConfig
Expand Down Expand Up @@ -201,7 +204,8 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap,
epochStartProvider := &epochStartBootstrap{
coreComponentsHolder: args.CoreComponentsHolder,
cryptoComponentsHolder: args.CryptoComponentsHolder,
messenger: args.Messenger,
mainMessenger: args.MainMessenger,
fullArchiveMessenger: args.FullArchiveMessenger,
generalConfig: args.GeneralConfig,
prefsConfig: args.PrefsConfig,
flagsConfig: args.FlagsConfig,
Expand All @@ -228,6 +232,11 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap,
shardCoordinator: args.GenesisShardCoordinator,
trieSyncStatisticsProvider: args.TrieSyncStatisticsProvider,
nodeProcessingMode: args.NodeProcessingMode,
nodeOperationMode: p2p.NormalOperation,
}

if epochStartProvider.prefsConfig.FullArchive {
epochStartProvider.nodeOperationMode = p2p.FullArchiveMode
}

whiteListCache, err := storageunit.NewCache(storageFactory.GetCacherFromConfig(epochStartProvider.generalConfig.WhiteListPool))
Expand Down Expand Up @@ -371,9 +380,14 @@ func (e *epochStartBootstrap) Bootstrap() (Parameters, error) {
}

defer func() {
errClose := e.interceptorContainer.Close()
errClose := e.mainInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage mainInterceptorContainer.Close()", "error", errClose)
}

errClose = e.fullArchiveInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage interceptorContainer.Close()", "error", errClose)
log.Warn("prepareEpochFromStorage fullArchiveInterceptorContainer.Close()", "error", errClose)
}
}()

Expand Down Expand Up @@ -418,10 +432,16 @@ func (e *epochStartBootstrap) bootstrapFromLocalStorage() (Parameters, error) {

func (e *epochStartBootstrap) cleanupOnBootstrapFinish() {
log.Debug("unregistering all message processor and un-joining all topics")
errMessenger := e.messenger.UnregisterAllMessageProcessors()
errMessenger := e.mainMessenger.UnregisterAllMessageProcessors()
log.LogIfError(errMessenger)

errMessenger = e.mainMessenger.UnJoinAllTopics()
log.LogIfError(errMessenger)

errMessenger = e.messenger.UnJoinAllTopics()
errMessenger = e.fullArchiveMessenger.UnregisterAllMessageProcessors()
log.LogIfError(errMessenger)

errMessenger = e.fullArchiveMessenger.UnJoinAllTopics()
log.LogIfError(errMessenger)

e.closeTrieNodes()
Expand Down Expand Up @@ -511,7 +531,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {

epochStartConfig := e.generalConfig.EpochStartConfig
metaBlockProcessor, err := NewEpochStartMetaBlockProcessor(
e.messenger,
e.mainMessenger,
e.requestHandler,
e.coreComponentsHolder.InternalMarshalizer(),
e.coreComponentsHolder.Hasher(),
Expand All @@ -527,7 +547,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {
CoreComponentsHolder: e.coreComponentsHolder,
CryptoComponentsHolder: e.cryptoComponentsHolder,
RequestHandler: e.requestHandler,
Messenger: e.messenger,
Messenger: e.mainMessenger,
ShardCoordinator: e.shardCoordinator,
EconomicsData: e.economicsData,
WhitelistHandler: e.whiteListHandler,
Expand All @@ -550,17 +570,19 @@ func (e *epochStartBootstrap) createSyncers() error {
CryptoComponents: e.cryptoComponentsHolder,
Config: e.generalConfig,
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
DataPool: e.dataPool,
WhiteListHandler: e.whiteListHandler,
WhiteListerVerifiedTxs: e.whiteListerVerifiedTxs,
ArgumentsParser: e.argumentsParser,
HeaderIntegrityVerifier: e.headerIntegrityVerifier,
RequestHandler: e.requestHandler,
SignaturesHandler: e.messenger,
SignaturesHandler: e.mainMessenger,
NodeOperationMode: e.nodeOperationMode,
}

e.interceptorContainer, err = factoryInterceptors.NewEpochStartInterceptorsContainer(args)
e.mainInterceptorContainer, e.fullArchiveInterceptorContainer, err = factoryInterceptors.NewEpochStartInterceptorsContainer(args)
if err != nil {
return err
}
Expand Down Expand Up @@ -672,7 +694,8 @@ func (e *epochStartBootstrap) requestAndProcessing() (Parameters, error) {
}
log.Debug("start in epoch bootstrap: shardCoordinator", "numOfShards", e.baseData.numberOfShards, "shardId", e.baseData.shardId)

err = e.messenger.CreateTopic(common.ConsensusTopic+e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId()), true)
consensusTopic := common.ConsensusTopic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
err = e.mainMessenger.CreateTopic(consensusTopic, true)
if err != nil {
return Parameters{}, err
}
Expand Down Expand Up @@ -1191,7 +1214,7 @@ func (e *epochStartBootstrap) createResolversContainer() error {
log.Debug("epochStartBootstrap.createRequestHandler", "shard", e.shardCoordinator.SelfId())
resolversContainerArgs := resolverscontainer.FactoryArgs{
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
Messenger: e.mainMessenger,
Store: storageService,
Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
DataPools: e.dataPool,
Expand Down Expand Up @@ -1222,7 +1245,7 @@ func (e *epochStartBootstrap) createRequestHandler() error {
requestersContainerArgs := requesterscontainer.FactoryArgs{
RequesterConfig: e.generalConfig.Requesters,
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
Messenger: e.mainMessenger,
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
Expand Down Expand Up @@ -1293,8 +1316,15 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}

heartbeatTopic := common.HeartbeatV2Topic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
if !e.messenger.HasTopic(heartbeatTopic) {
err = e.messenger.CreateTopic(heartbeatTopic, true)
if !e.mainMessenger.HasTopic(heartbeatTopic) {
err = e.mainMessenger.CreateTopic(heartbeatTopic, true)
if err != nil {
return err
}
}

if !e.fullArchiveMessenger.HasTopic(heartbeatTopic) {
err = e.fullArchiveMessenger.CreateTopic(heartbeatTopic, true)
if err != nil {
return err
}
Expand All @@ -1306,8 +1336,8 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}
heartbeatCfg := e.generalConfig.HeartbeatV2
argsHeartbeatSender := sender.ArgBootstrapSender{
MainMessenger: e.messenger,
FullArchiveMessenger: disabledP2P.NewNetworkMessenger(), // TODO[Sorin]: pass full archive messenger
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
HeartbeatTopic: heartbeatTopic,
HeartbeatTimeBetweenSends: time.Second * time.Duration(heartbeatCfg.HeartbeatTimeBetweenSendsDuringBootstrapInSec),
Expand Down

0 comments on commit 1e22b2d

Please sign in to comment.