Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full archive peer shard mapper #5337

Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 5 additions & 2 deletions epochStart/bootstrap/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ func checkArguments(args ArgsEpochStartBootstrap) error {
if check.IfNil(args.GenesisShardCoordinator) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilShardCoordinator)
}
if check.IfNil(args.Messenger) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilMessenger)
if check.IfNil(args.MainMessenger) {
return fmt.Errorf("%s on main network: %w", baseErrorMessage, epochStart.ErrNilMessenger)
}
if check.IfNil(args.FullArchiveMessenger) {
return fmt.Errorf("%s on full archive network: %w", baseErrorMessage, epochStart.ErrNilMessenger)
}
if check.IfNil(args.EconomicsData) {
return fmt.Errorf("%s: %w", baseErrorMessage, epochStart.ErrNilEconomicsData)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/disabled"
disabledFactory "github.com/multiversx/mx-chain-go/factory/disabled"
disabledGenesis "github.com/multiversx/mx-chain-go/genesis/process/disabled"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/factory/interceptorscontainer"
"github.com/multiversx/mx-chain-go/sharding"
Expand All @@ -29,7 +30,8 @@ type ArgsEpochStartInterceptorContainer struct {
CryptoComponents process.CryptoComponentsHolder
Config config.Config
ShardCoordinator sharding.Coordinator
Messenger process.TopicHandler
MainMessenger process.TopicHandler
FullArchiveMessenger process.TopicHandler
DataPool dataRetriever.PoolsHolder
WhiteListHandler update.WhiteListHandler
WhiteListerVerifiedTxs update.WhiteListHandler
Expand All @@ -40,24 +42,25 @@ type ArgsEpochStartInterceptorContainer struct {
HeaderIntegrityVerifier process.HeaderIntegrityVerifier
RequestHandler process.RequestHandler
SignaturesHandler process.SignaturesHandler
NodeOperationMode p2p.NodeOperation
}

// NewEpochStartInterceptorsContainer will return a real interceptors container factory, but with many disabled components
func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer) (process.InterceptorsContainer, error) {
func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer) (process.InterceptorsContainer, process.InterceptorsContainer, error) {
if check.IfNil(args.CoreComponents) {
return nil, epochStart.ErrNilCoreComponentsHolder
return nil, nil, epochStart.ErrNilCoreComponentsHolder
}
if check.IfNil(args.CryptoComponents) {
return nil, epochStart.ErrNilCryptoComponentsHolder
return nil, nil, epochStart.ErrNilCryptoComponentsHolder
}
if check.IfNil(args.CoreComponents.AddressPubKeyConverter()) {
return nil, epochStart.ErrNilPubkeyConverter
return nil, nil, epochStart.ErrNilPubkeyConverter
}

cryptoComponents := args.CryptoComponents.Clone().(process.CryptoComponentsHolder)
err := cryptoComponents.SetMultiSignerContainer(disabled.NewMultiSignerContainer())
if err != nil {
return nil, err
return nil, nil, err
}

nodesCoordinator := disabled.NewNodesCoordinator()
Expand All @@ -72,6 +75,7 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
epochStartTrigger := disabled.NewEpochStartTrigger()
// TODO: move the peerShardMapper creation before boostrapComponents
peerShardMapper := disabled.NewPeerShardMapper()
fullArchivePeerShardMapper := disabled.NewPeerShardMapper()
hardforkTrigger := disabledFactory.HardforkTrigger()

containerFactoryArgs := interceptorscontainer.CommonInterceptorsContainerFactoryArgs{
Expand All @@ -80,7 +84,8 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
Accounts: accountsAdapter,
ShardCoordinator: args.ShardCoordinator,
NodesCoordinator: nodesCoordinator,
Messenger: args.Messenger,
MainMessenger: args.MainMessenger,
FullArchiveMessenger: args.FullArchiveMessenger,
Store: storer,
DataPool: args.DataPool,
MaxTxNonceDeltaAllowed: common.MaxTxNonceDeltaAllowed,
Expand All @@ -100,24 +105,33 @@ func NewEpochStartInterceptorsContainer(args ArgsEpochStartInterceptorContainer)
PeerSignatureHandler: cryptoComponents.PeerSignatureHandler(),
SignaturesHandler: args.SignaturesHandler,
HeartbeatExpiryTimespanInSec: args.Config.HeartbeatV2.HeartbeatExpiryTimespanInSec,
PeerShardMapper: peerShardMapper,
MainPeerShardMapper: peerShardMapper,
FullArchivePeerShardMapper: fullArchivePeerShardMapper,
HardforkTrigger: hardforkTrigger,
NodeOperationMode: args.NodeOperationMode,
}

interceptorsContainerFactory, err := interceptorscontainer.NewMetaInterceptorsContainerFactory(containerFactoryArgs)
if err != nil {
return nil, err
return nil, nil, err
}

container, err := interceptorsContainerFactory.Create()
mainContainer, fullArchiveContainer, err := interceptorsContainerFactory.Create()
if err != nil {
return nil, err
return nil, nil, err
}

err = interceptorsContainerFactory.AddShardTrieNodeInterceptors(container)
err = interceptorsContainerFactory.AddShardTrieNodeInterceptors(mainContainer)
if err != nil {
return nil, err
return nil, nil, err
}

return container, nil
if args.NodeOperationMode == p2p.FullArchiveMode {
err = interceptorsContainerFactory.AddShardTrieNodeInterceptors(fullArchiveContainer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be left as it is but we do not support trie sync in an arbitrary epoch. Debatable if we will support at a certain time.

if err != nil {
return nil, nil, err
}
}

return mainContainer, fullArchiveContainer, nil
}
11 changes: 8 additions & 3 deletions epochStart/bootstrap/fromLocalStorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,13 @@ func (e *epochStartBootstrap) prepareEpochFromStorage() (Parameters, error) {
}

defer func() {
errClose := e.interceptorContainer.Close()
errClose := e.mainInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage interceptorContainer.Close()", "error", errClose)
log.Warn("prepareEpochFromStorage mainInterceptorContainer.Close()", "error", errClose)
}
errClose = e.fullArchiveInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage fullArchiveInterceptorContainer.Close()", "error", errClose)
}
}()

Expand All @@ -161,7 +165,8 @@ func (e *epochStartBootstrap) prepareEpochFromStorage() (Parameters, error) {
return Parameters{}, err
}

err = e.messenger.CreateTopic(common.ConsensusTopic+e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId()), true)
consensusTopic := common.ConsensusTopic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
err = e.mainMessenger.CreateTopic(consensusTopic, true)
if err != nil {
return Parameters{}, err
}
Expand Down
104 changes: 67 additions & 37 deletions epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
"github.com/multiversx/mx-chain-go/epochStart/bootstrap/types"
factoryDisabled "github.com/multiversx/mx-chain-go/factory/disabled"
"github.com/multiversx/mx-chain-go/heartbeat/sender"
disabledP2P "github.com/multiversx/mx-chain-go/p2p/disabled"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/block/preprocess"
"github.com/multiversx/mx-chain-go/process/heartbeat/validator"
Expand Down Expand Up @@ -93,7 +93,8 @@ type epochStartBootstrap struct {
destinationShardAsObserver uint32
coreComponentsHolder process.CoreComponentsHolder
cryptoComponentsHolder process.CryptoComponentsHolder
messenger Messenger
mainMessenger Messenger
fullArchiveMessenger Messenger
generalConfig config.Config
prefsConfig config.PreferencesConfig
flagsConfig config.ContextFlagsConfig
Expand All @@ -117,24 +118,25 @@ type epochStartBootstrap struct {
bootstrapHeartbeatSender update.Closer
trieSyncStatisticsProvider common.SizeSyncStatisticsHandler
nodeProcessingMode common.NodeProcessingMode

nodeOperationMode p2p.NodeOperation
// created components
requestHandler process.RequestHandler
interceptorContainer process.InterceptorsContainer
dataPool dataRetriever.PoolsHolder
miniBlocksSyncer epochStart.PendingMiniBlocksSyncHandler
headersSyncer epochStart.HeadersByHashSyncer
txSyncerForScheduled update.TransactionsSyncHandler
epochStartMetaBlockSyncer epochStart.StartOfEpochMetaSyncer
nodesConfigHandler StartOfEpochNodesConfigHandler
whiteListHandler update.WhiteListHandler
whiteListerVerifiedTxs update.WhiteListHandler
storageOpenerHandler storage.UnitOpenerHandler
latestStorageDataProvider storage.LatestStorageDataProviderHandler
argumentsParser process.ArgumentsParser
dataSyncerFactory types.ScheduledDataSyncerCreator
dataSyncerWithScheduled types.ScheduledDataSyncer
storageService dataRetriever.StorageService
requestHandler process.RequestHandler
mainInterceptorContainer process.InterceptorsContainer
fullArchiveInterceptorContainer process.InterceptorsContainer
dataPool dataRetriever.PoolsHolder
miniBlocksSyncer epochStart.PendingMiniBlocksSyncHandler
headersSyncer epochStart.HeadersByHashSyncer
txSyncerForScheduled update.TransactionsSyncHandler
epochStartMetaBlockSyncer epochStart.StartOfEpochMetaSyncer
nodesConfigHandler StartOfEpochNodesConfigHandler
whiteListHandler update.WhiteListHandler
whiteListerVerifiedTxs update.WhiteListHandler
storageOpenerHandler storage.UnitOpenerHandler
latestStorageDataProvider storage.LatestStorageDataProviderHandler
argumentsParser process.ArgumentsParser
dataSyncerFactory types.ScheduledDataSyncerCreator
dataSyncerWithScheduled types.ScheduledDataSyncer
storageService dataRetriever.StorageService

// gathered data
epochStartMeta data.MetaHeaderHandler
Expand Down Expand Up @@ -162,7 +164,8 @@ type ArgsEpochStartBootstrap struct {
CoreComponentsHolder process.CoreComponentsHolder
CryptoComponentsHolder process.CryptoComponentsHolder
DestinationShardAsObserver uint32
Messenger Messenger
MainMessenger Messenger
FullArchiveMessenger Messenger
GeneralConfig config.Config
PrefsConfig config.PreferencesConfig
FlagsConfig config.ContextFlagsConfig
Expand Down Expand Up @@ -201,7 +204,8 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap,
epochStartProvider := &epochStartBootstrap{
coreComponentsHolder: args.CoreComponentsHolder,
cryptoComponentsHolder: args.CryptoComponentsHolder,
messenger: args.Messenger,
mainMessenger: args.MainMessenger,
fullArchiveMessenger: args.FullArchiveMessenger,
generalConfig: args.GeneralConfig,
prefsConfig: args.PrefsConfig,
flagsConfig: args.FlagsConfig,
Expand All @@ -228,6 +232,11 @@ func NewEpochStartBootstrap(args ArgsEpochStartBootstrap) (*epochStartBootstrap,
shardCoordinator: args.GenesisShardCoordinator,
trieSyncStatisticsProvider: args.TrieSyncStatisticsProvider,
nodeProcessingMode: args.NodeProcessingMode,
nodeOperationMode: p2p.NormalOperation,
}

if epochStartProvider.prefsConfig.FullArchive {
epochStartProvider.nodeOperationMode = p2p.FullArchiveMode
}

whiteListCache, err := storageunit.NewCache(storageFactory.GetCacherFromConfig(epochStartProvider.generalConfig.WhiteListPool))
Expand Down Expand Up @@ -371,9 +380,14 @@ func (e *epochStartBootstrap) Bootstrap() (Parameters, error) {
}

defer func() {
errClose := e.interceptorContainer.Close()
errClose := e.mainInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage mainInterceptorContainer.Close()", "error", errClose)
}

errClose = e.fullArchiveInterceptorContainer.Close()
if errClose != nil {
log.Warn("prepareEpochFromStorage interceptorContainer.Close()", "error", errClose)
log.Warn("prepareEpochFromStorage fullArchiveInterceptorContainer.Close()", "error", errClose)
}
}()

Expand Down Expand Up @@ -418,10 +432,16 @@ func (e *epochStartBootstrap) bootstrapFromLocalStorage() (Parameters, error) {

func (e *epochStartBootstrap) cleanupOnBootstrapFinish() {
log.Debug("unregistering all message processor and un-joining all topics")
errMessenger := e.messenger.UnregisterAllMessageProcessors()
errMessenger := e.mainMessenger.UnregisterAllMessageProcessors()
log.LogIfError(errMessenger)

errMessenger = e.mainMessenger.UnJoinAllTopics()
log.LogIfError(errMessenger)

errMessenger = e.messenger.UnJoinAllTopics()
errMessenger = e.fullArchiveMessenger.UnregisterAllMessageProcessors()
log.LogIfError(errMessenger)

errMessenger = e.fullArchiveMessenger.UnJoinAllTopics()
log.LogIfError(errMessenger)

e.closeTrieNodes()
Expand Down Expand Up @@ -511,7 +531,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {

epochStartConfig := e.generalConfig.EpochStartConfig
metaBlockProcessor, err := NewEpochStartMetaBlockProcessor(
e.messenger,
e.mainMessenger,
e.requestHandler,
e.coreComponentsHolder.InternalMarshalizer(),
e.coreComponentsHolder.Hasher(),
Expand All @@ -527,7 +547,7 @@ func (e *epochStartBootstrap) prepareComponentsToSyncFromNetwork() error {
CoreComponentsHolder: e.coreComponentsHolder,
CryptoComponentsHolder: e.cryptoComponentsHolder,
RequestHandler: e.requestHandler,
Messenger: e.messenger,
Messenger: e.mainMessenger,
ShardCoordinator: e.shardCoordinator,
EconomicsData: e.economicsData,
WhitelistHandler: e.whiteListHandler,
Expand All @@ -550,17 +570,19 @@ func (e *epochStartBootstrap) createSyncers() error {
CryptoComponents: e.cryptoComponentsHolder,
Config: e.generalConfig,
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
DataPool: e.dataPool,
WhiteListHandler: e.whiteListHandler,
WhiteListerVerifiedTxs: e.whiteListerVerifiedTxs,
ArgumentsParser: e.argumentsParser,
HeaderIntegrityVerifier: e.headerIntegrityVerifier,
RequestHandler: e.requestHandler,
SignaturesHandler: e.messenger,
SignaturesHandler: e.mainMessenger,
NodeOperationMode: e.nodeOperationMode,
}

e.interceptorContainer, err = factoryInterceptors.NewEpochStartInterceptorsContainer(args)
e.mainInterceptorContainer, e.fullArchiveInterceptorContainer, err = factoryInterceptors.NewEpochStartInterceptorsContainer(args)
if err != nil {
return err
}
Expand Down Expand Up @@ -672,7 +694,8 @@ func (e *epochStartBootstrap) requestAndProcessing() (Parameters, error) {
}
log.Debug("start in epoch bootstrap: shardCoordinator", "numOfShards", e.baseData.numberOfShards, "shardId", e.baseData.shardId)

err = e.messenger.CreateTopic(common.ConsensusTopic+e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId()), true)
consensusTopic := common.ConsensusTopic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
err = e.mainMessenger.CreateTopic(consensusTopic, true)
if err != nil {
return Parameters{}, err
}
Expand Down Expand Up @@ -1191,7 +1214,7 @@ func (e *epochStartBootstrap) createResolversContainer() error {
log.Debug("epochStartBootstrap.createRequestHandler", "shard", e.shardCoordinator.SelfId())
resolversContainerArgs := resolverscontainer.FactoryArgs{
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
Messenger: e.mainMessenger,
Store: storageService,
Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
DataPools: e.dataPool,
Expand Down Expand Up @@ -1222,7 +1245,7 @@ func (e *epochStartBootstrap) createRequestHandler() error {
requestersContainerArgs := requesterscontainer.FactoryArgs{
RequesterConfig: e.generalConfig.Requesters,
ShardCoordinator: e.shardCoordinator,
Messenger: e.messenger,
Messenger: e.mainMessenger,
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
Expand Down Expand Up @@ -1293,8 +1316,15 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}

heartbeatTopic := common.HeartbeatV2Topic + e.shardCoordinator.CommunicationIdentifier(e.shardCoordinator.SelfId())
if !e.messenger.HasTopic(heartbeatTopic) {
err = e.messenger.CreateTopic(heartbeatTopic, true)
if !e.mainMessenger.HasTopic(heartbeatTopic) {
err = e.mainMessenger.CreateTopic(heartbeatTopic, true)
if err != nil {
return err
}
}

if !e.fullArchiveMessenger.HasTopic(heartbeatTopic) {
err = e.fullArchiveMessenger.CreateTopic(heartbeatTopic, true)
if err != nil {
return err
}
Expand All @@ -1306,8 +1336,8 @@ func (e *epochStartBootstrap) createHeartbeatSender() error {
}
heartbeatCfg := e.generalConfig.HeartbeatV2
argsHeartbeatSender := sender.ArgBootstrapSender{
MainMessenger: e.messenger,
FullArchiveMessenger: disabledP2P.NewNetworkMessenger(), // TODO[Sorin]: pass full archive messenger
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
Marshaller: e.coreComponentsHolder.InternalMarshalizer(),
HeartbeatTopic: heartbeatTopic,
HeartbeatTimeBetweenSends: time.Second * time.Duration(heartbeatCfg.HeartbeatTimeBetweenSendsDuringBootstrapInSec),
Expand Down