Performance degradation for v1.6.15 fix #5899

Merged: 4 commits, Feb 14, 2024
1 change: 1 addition & 0 deletions cmd/node/config/config.toml
@@ -519,6 +519,7 @@
[Antiflood]
Enabled = true
NumConcurrentResolverJobs = 50
+ NumConcurrentResolvingTrieNodesJobs = 3
[Antiflood.FastReacting]
IntervalInSeconds = 1
ReservedPercent = 20.0
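The new NumConcurrentResolvingTrieNodesJobs key caps how many trie-node resolving jobs may run concurrently, independently of the wider NumConcurrentResolverJobs limit. As a minimal sketch of how such a key decodes into the Go struct updated below (this uses the generic BurntSushi/toml decoder purely for illustration; it is not the node's actual config loader):

package main

import (
	"fmt"

	"github.com/BurntSushi/toml"
)

// antifloodConfig mirrors the relevant fields of config.AntifloodConfig.
type antifloodConfig struct {
	Enabled                             bool
	NumConcurrentResolverJobs           int32
	NumConcurrentResolvingTrieNodesJobs int32
}

func main() {
	raw := `
[Antiflood]
Enabled = true
NumConcurrentResolverJobs = 50
NumConcurrentResolvingTrieNodesJobs = 3
`
	var cfg struct{ Antiflood antifloodConfig }
	if _, err := toml.Decode(raw, &cfg); err != nil {
		panic(err)
	}
	// Prints: 50 3
	fmt.Println(cfg.Antiflood.NumConcurrentResolverJobs, cfg.Antiflood.NumConcurrentResolvingTrieNodesJobs)
}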
19 changes: 10 additions & 9 deletions config/config.go
@@ -362,15 +362,16 @@ type TxAccumulatorConfig struct {

// AntifloodConfig will hold all p2p antiflood parameters
type AntifloodConfig struct {
- Enabled bool
- NumConcurrentResolverJobs int32
- OutOfSpecs FloodPreventerConfig
- FastReacting FloodPreventerConfig
- SlowReacting FloodPreventerConfig
- PeerMaxOutput AntifloodLimitsConfig
- Cache CacheConfig
- Topic TopicAntifloodConfig
- TxAccumulator TxAccumulatorConfig
+ Enabled bool
+ NumConcurrentResolverJobs int32
+ NumConcurrentResolvingTrieNodesJobs int32
+ OutOfSpecs FloodPreventerConfig
+ FastReacting FloodPreventerConfig
+ SlowReacting FloodPreventerConfig
+ PeerMaxOutput AntifloodLimitsConfig
+ Cache CacheConfig
+ Topic TopicAntifloodConfig
+ TxAccumulator TxAccumulatorConfig
}

// FloodPreventerConfig will hold all flood preventer parameters
35 changes: 18 additions & 17 deletions dataRetriever/factory/resolverscontainer/args.go
@@ -11,21 +11,22 @@ import (

// FactoryArgs will hold the arguments for ResolversContainerFactory for both shard and meta
type FactoryArgs struct {
- NumConcurrentResolvingJobs int32
- ShardCoordinator sharding.Coordinator
- MainMessenger p2p.Messenger
- FullArchiveMessenger p2p.Messenger
- Store dataRetriever.StorageService
- Marshalizer marshal.Marshalizer
- DataPools dataRetriever.PoolsHolder
- Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
- DataPacker dataRetriever.DataPacker
- TriesContainer common.TriesHolder
- InputAntifloodHandler dataRetriever.P2PAntifloodHandler
- OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
- MainPreferredPeersHolder p2p.PreferredPeersHolderHandler
- FullArchivePreferredPeersHolder p2p.PreferredPeersHolderHandler
- SizeCheckDelta uint32
- IsFullHistoryNode bool
- PayloadValidator dataRetriever.PeerAuthenticationPayloadValidator
+ NumConcurrentResolvingJobs int32
+ NumConcurrentResolvingTrieNodesJobs int32
+ ShardCoordinator sharding.Coordinator
+ MainMessenger p2p.Messenger
+ FullArchiveMessenger p2p.Messenger
+ Store dataRetriever.StorageService
+ Marshalizer marshal.Marshalizer
+ DataPools dataRetriever.PoolsHolder
+ Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
+ DataPacker dataRetriever.DataPacker
+ TriesContainer common.TriesHolder
+ InputAntifloodHandler dataRetriever.P2PAntifloodHandler
+ OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
+ MainPreferredPeersHolder p2p.PreferredPeersHolderHandler
+ FullArchivePreferredPeersHolder p2p.PreferredPeersHolderHandler
+ SizeCheckDelta uint32
+ IsFullHistoryNode bool
+ PayloadValidator dataRetriever.PeerAuthenticationPayloadValidator
}
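FactoryArgs gains the matching field so both the shard and meta resolver factories receive the limit. The hand-off from AntifloodConfig to FactoryArgs happens at call sites outside this diff, so the sketch below is hypothetical (trimmed stand-in types, not the project's real ones):

package main

import "fmt"

// Stand-ins for config.AntifloodConfig and resolverscontainer.FactoryArgs.
type antifloodConfig struct {
	NumConcurrentResolverJobs           int32
	NumConcurrentResolvingTrieNodesJobs int32
}

type factoryArgs struct {
	NumConcurrentResolvingJobs          int32
	NumConcurrentResolvingTrieNodesJobs int32
}

func main() {
	cfg := antifloodConfig{NumConcurrentResolverJobs: 50, NumConcurrentResolvingTrieNodesJobs: 3}
	// Each throttle limit flows from config into the factory arguments.
	args := factoryArgs{
		NumConcurrentResolvingJobs:          cfg.NumConcurrentResolverJobs,
		NumConcurrentResolvingTrieNodesJobs: cfg.NumConcurrentResolvingTrieNodesJobs,
	}
	fmt.Printf("%+v\n", args)
}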
dataRetriever/factory/resolverscontainer/baseResolversContainerFactory.go
@@ -36,6 +36,7 @@
inputAntifloodHandler dataRetriever.P2PAntifloodHandler
outputAntifloodHandler dataRetriever.P2PAntifloodHandler
throttler dataRetriever.ResolverThrottler
+ trieNodesThrottler dataRetriever.ResolverThrottler
intraShardTopic string
isFullHistoryNode bool
mainPreferredPeersHolder dataRetriever.PreferredPeersHolderHandler
@@ -78,7 +79,10 @@
return fmt.Errorf("%w for output", dataRetriever.ErrNilAntifloodHandler)
}
if check.IfNil(brcf.throttler) {
- return dataRetriever.ErrNilThrottler
+ return fmt.Errorf("%w for the main throttler", dataRetriever.ErrNilThrottler)
[Codecov: added line #L82 was not covered by tests]
}
+ if check.IfNil(brcf.trieNodesThrottler) {
+ return fmt.Errorf("%w for the trie nodes throttler", dataRetriever.ErrNilThrottler)
[Codecov: added line #L85 was not covered by tests]
+ }
if check.IfNil(brcf.mainPreferredPeersHolder) {
return fmt.Errorf("%w for main network", dataRetriever.ErrNilPreferredPeersHolder)
@@ -351,7 +355,7 @@
SenderResolver: resolverSender,
Marshaller: brcf.marshalizer,
AntifloodHandler: brcf.inputAntifloodHandler,
- Throttler: brcf.throttler,
+ Throttler: brcf.trieNodesThrottler,
},
TrieDataGetter: trie,
}
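The base factory now validates and stores two throttlers, and the trie-node resolver above is handed the dedicated trieNodesThrottler instead of the shared one. A minimal sketch of the gating pattern a resolver applies per request, assuming the usual CanProcess/StartProcessing/EndProcessing methods on dataRetriever.ResolverThrottler (names simplified here):

package resolvers

import "errors"

// resolverThrottler mirrors the methods assumed on dataRetriever.ResolverThrottler.
type resolverThrottler interface {
	CanProcess() bool
	StartProcessing()
	EndProcessing()
}

var errSystemBusy = errors.New("system busy")

// processIfAllowed gates a unit of resolver work on its throttler: the request
// is rejected outright when the concurrency budget is exhausted.
func processIfAllowed(thr resolverThrottler, work func() error) error {
	if !thr.CanProcess() {
		return errSystemBusy
	}
	thr.StartProcessing()
	defer thr.EndProcessing()
	return work()
}

With NumConcurrentResolvingTrieNodesJobs = 3, at most three trie-node requests pass this gate at once, while the remaining resolvers keep their own, wider limit.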
@@ -27,7 +27,12 @@ func NewMetaResolversContainerFactory(
args.Marshalizer = marshal.NewSizeCheckUnmarshalizer(args.Marshalizer, args.SizeCheckDelta)
}

- thr, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
+ mainThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
if err != nil {
return nil, err
}

+ trieNodesThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingTrieNodesJobs)
+ if err != nil {
+ return nil, err
+ }
@@ -46,7 +51,8 @@ func NewMetaResolversContainerFactory(
triesContainer: args.TriesContainer,
inputAntifloodHandler: args.InputAntifloodHandler,
outputAntifloodHandler: args.OutputAntifloodHandler,
- throttler: thr,
+ throttler: mainThrottler,
+ trieNodesThrottler: trieNodesThrottler,
isFullHistoryNode: args.IsFullHistoryNode,
mainPreferredPeersHolder: args.MainPreferredPeersHolder,
fullArchivePreferredPeersHolder: args.FullArchivePreferredPeersHolder,
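Both container factories build the two throttlers the same way. A hedged, self-contained sketch of that setup, assuming the throttler package path from mx-chain-core-go (the rejection of non-positive values is exactly what the updated tests below assert):

package main

import (
	"fmt"

	"github.com/multiversx/mx-chain-core-go/core/throttler"
)

func main() {
	// A wide throttler for general resolver jobs...
	mainThrottler, err := throttler.NewNumGoRoutinesThrottler(50)
	if err != nil {
		panic(err)
	}
	// ...and a much tighter one dedicated to trie-node resolving.
	trieNodesThrottler, err := throttler.NewNumGoRoutinesThrottler(3)
	if err != nil {
		panic(err)
	}
	fmt.Println(mainThrottler.CanProcess(), trieNodesThrottler.CanProcess()) // true true

	// Non-positive limits are rejected with core.ErrNotPositiveValue.
	_, err = throttler.NewNumGoRoutinesThrottler(0)
	fmt.Println(err)
}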
@@ -94,8 +94,15 @@ func TestNewMetaResolversContainerFactory_NewNumGoRoutinesThrottlerFailsShouldEr

args := getArgumentsMeta()
args.NumConcurrentResolvingJobs = 0

rcf, err := resolverscontainer.NewMetaResolversContainerFactory(args)
assert.Nil(t, rcf)
assert.Equal(t, core.ErrNotPositiveValue, err)

+ args.NumConcurrentResolvingJobs = 10
+ args.NumConcurrentResolvingTrieNodesJobs = 0

+ rcf, err = resolverscontainer.NewMetaResolversContainerFactory(args)
+ assert.Nil(t, rcf)
+ assert.Equal(t, core.ErrNotPositiveValue, err)
}
@@ -357,21 +364,22 @@ func TestMetaResolversContainerFactory_IsInterfaceNil(t *testing.T) {

func getArgumentsMeta() resolverscontainer.FactoryArgs {
return resolverscontainer.FactoryArgs{
- ShardCoordinator: mock.NewOneShardCoordinatorMock(),
- MainMessenger: createStubMessengerForMeta("", ""),
- FullArchiveMessenger: createStubMessengerForMeta("", ""),
- Store: createStoreForMeta(),
- Marshalizer: &mock.MarshalizerMock{},
- DataPools: createDataPoolsForMeta(),
- Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
- DataPacker: &mock.DataPackerStub{},
- TriesContainer: createTriesHolderForMeta(),
- SizeCheckDelta: 0,
- InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
- OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
- NumConcurrentResolvingJobs: 10,
- MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
- FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
- PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
+ ShardCoordinator: mock.NewOneShardCoordinatorMock(),
+ MainMessenger: createStubMessengerForMeta("", ""),
+ FullArchiveMessenger: createStubMessengerForMeta("", ""),
+ Store: createStoreForMeta(),
+ Marshalizer: &mock.MarshalizerMock{},
+ DataPools: createDataPoolsForMeta(),
+ Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
+ DataPacker: &mock.DataPackerStub{},
+ TriesContainer: createTriesHolderForMeta(),
+ SizeCheckDelta: 0,
+ InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
+ OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
+ NumConcurrentResolvingJobs: 10,
+ NumConcurrentResolvingTrieNodesJobs: 3,
+ MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
+ FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
+ PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
}
}
@@ -25,7 +25,12 @@ func NewShardResolversContainerFactory(
args.Marshalizer = marshal.NewSizeCheckUnmarshalizer(args.Marshalizer, args.SizeCheckDelta)
}

- thr, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
+ mainThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
if err != nil {
return nil, err
}

+ trieNodesThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingTrieNodesJobs)
+ if err != nil {
+ return nil, err
+ }
@@ -44,7 +49,8 @@ func NewShardResolversContainerFactory(
triesContainer: args.TriesContainer,
inputAntifloodHandler: args.InputAntifloodHandler,
outputAntifloodHandler: args.OutputAntifloodHandler,
- throttler: thr,
+ throttler: mainThrottler,
+ trieNodesThrottler: trieNodesThrottler,
isFullHistoryNode: args.IsFullHistoryNode,
mainPreferredPeersHolder: args.MainPreferredPeersHolder,
fullArchivePreferredPeersHolder: args.FullArchivePreferredPeersHolder,
@@ -100,8 +100,15 @@ func TestNewShardResolversContainerFactory_NewNumGoRoutinesThrottlerFailsShouldE

args := getArgumentsShard()
args.NumConcurrentResolvingJobs = 0

rcf, err := resolverscontainer.NewShardResolversContainerFactory(args)
assert.Nil(t, rcf)
assert.Equal(t, core.ErrNotPositiveValue, err)

+ args.NumConcurrentResolvingJobs = 10
+ args.NumConcurrentResolvingTrieNodesJobs = 0

+ rcf, err = resolverscontainer.NewShardResolversContainerFactory(args)
+ assert.Nil(t, rcf)
+ assert.Equal(t, core.ErrNotPositiveValue, err)
}
@@ -465,21 +472,22 @@ func TestShardResolversContainerFactory_IsInterfaceNil(t *testing.T) {

func getArgumentsShard() resolverscontainer.FactoryArgs {
return resolverscontainer.FactoryArgs{
- ShardCoordinator: mock.NewOneShardCoordinatorMock(),
- MainMessenger: createMessengerStubForShard("", ""),
- FullArchiveMessenger: createMessengerStubForShard("", ""),
- Store: createStoreForShard(),
- Marshalizer: &mock.MarshalizerMock{},
- DataPools: createDataPoolsForShard(),
- Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
- DataPacker: &mock.DataPackerStub{},
- TriesContainer: createTriesHolderForShard(),
- SizeCheckDelta: 0,
- InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
- OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
- NumConcurrentResolvingJobs: 10,
- MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
- FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
- PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
+ ShardCoordinator: mock.NewOneShardCoordinatorMock(),
+ MainMessenger: createMessengerStubForShard("", ""),
+ FullArchiveMessenger: createMessengerStubForShard("", ""),
+ Store: createStoreForShard(),
+ Marshalizer: &mock.MarshalizerMock{},
+ DataPools: createDataPoolsForShard(),
+ Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
+ DataPacker: &mock.DataPackerStub{},
+ TriesContainer: createTriesHolderForShard(),
+ SizeCheckDelta: 0,
+ InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
+ OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
+ NumConcurrentResolvingJobs: 10,
+ NumConcurrentResolvingTrieNodesJobs: 3,
+ MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
+ FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
+ PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
}
}
12 changes: 12 additions & 0 deletions dataRetriever/resolvers/trieNodeResolver.go
@@ -1,6 +1,8 @@
package resolvers

import (
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/batch"
@@ -20,6 +22,7 @@ type ArgTrieNodeResolver struct {

// TrieNodeResolver is a wrapper over Resolver that is specialized in resolving trie node requests
type TrieNodeResolver struct {
+ mutCriticalSection sync.Mutex
*baseResolver
messageProcessor
trieDataGetter dataRetriever.TrieDataGetter
@@ -104,6 +107,9 @@ func (tnRes *TrieNodeResolver) resolveMultipleHashes(hashesBuff []byte, message
}

func (tnRes *TrieNodeResolver) resolveOnlyRequestedHashes(hashes [][]byte, nodes map[string]struct{}) (int, bool) {
+ tnRes.mutCriticalSection.Lock()
+ defer tnRes.mutCriticalSection.Unlock()

spaceUsed := 0
usedAllSpace := false
remainingSpace := core.MaxBufferSizeToSendTrieNodes
@@ -129,6 +135,9 @@
}

func (tnRes *TrieNodeResolver) resolveSubTries(hashes [][]byte, nodes map[string]struct{}, spaceUsedAlready int) {
+ tnRes.mutCriticalSection.Lock()
+ defer tnRes.mutCriticalSection.Unlock()

var serializedNodes [][]byte
var err error
var serializedNode []byte
@@ -168,7 +177,10 @@ func convertMapToSlice(m map[string]struct{}) [][]byte {
}

func (tnRes *TrieNodeResolver) resolveOneHash(hash []byte, chunkIndex uint32, message p2p.MessageP2P, source p2p.MessageHandler) error {
+ tnRes.mutCriticalSection.Lock()
serializedNode, err := tnRes.trieDataGetter.GetSerializedNode(hash)
+ tnRes.mutCriticalSection.Unlock()

if err != nil {
return err
}
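The second half of the fix: beyond throttling, a mutex serializes access to the trie data getter itself, so the goroutines that do get past the throttler cannot hit trie storage simultaneously. A minimal sketch of the pattern with simplified types (the real getter is dataRetriever.TrieDataGetter):

package resolvers

import "sync"

// trieDataGetter stands in for dataRetriever.TrieDataGetter.
type trieDataGetter interface {
	GetSerializedNode(hash []byte) ([]byte, error)
}

type trieNodeResolver struct {
	mutCriticalSection sync.Mutex
	trieDataGetter     trieDataGetter
}

// resolveOneHash serializes calls into trie storage: with the throttler capping
// admission at three goroutines and this mutex around the getter, the trie is
// queried by at most one resolver goroutine at a time.
func (r *trieNodeResolver) resolveOneHash(hash []byte) ([]byte, error) {
	r.mutCriticalSection.Lock()
	serializedNode, err := r.trieDataGetter.GetSerializedNode(hash)
	r.mutCriticalSection.Unlock()

	return serializedNode, err
}

Combined with the three-goroutine cap, this bounds the trie I/O contention that appears to be the source of the v1.6.15 performance degradation named in the PR title.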
33 changes: 17 additions & 16 deletions epochStart/bootstrap/process.go
@@ -1210,22 +1210,23 @@ func (e *epochStartBootstrap) createResolversContainer() error {
// this one should only be used before determining the correct shard where the node should reside
log.Debug("epochStartBootstrap.createRequestHandler", "shard", e.shardCoordinator.SelfId())
resolversContainerArgs := resolverscontainer.FactoryArgs{
- ShardCoordinator: e.shardCoordinator,
- MainMessenger: e.mainMessenger,
- FullArchiveMessenger: e.fullArchiveMessenger,
- Store: storageService,
- Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
- DataPools: e.dataPool,
- Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
- NumConcurrentResolvingJobs: 10,
- DataPacker: dataPacker,
- TriesContainer: e.trieContainer,
- SizeCheckDelta: 0,
- InputAntifloodHandler: disabled.NewAntiFloodHandler(),
- OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
- MainPreferredPeersHolder: disabled.NewPreferredPeersHolder(),
- FullArchivePreferredPeersHolder: disabled.NewPreferredPeersHolder(),
- PayloadValidator: payloadValidator,
+ ShardCoordinator: e.shardCoordinator,
+ MainMessenger: e.mainMessenger,
+ FullArchiveMessenger: e.fullArchiveMessenger,
+ Store: storageService,
+ Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
+ DataPools: e.dataPool,
+ Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
+ NumConcurrentResolvingJobs: 10,
[Review comment, Contributor]: we should create a task to remove the magic numbers.
[Reply, Contributor Author]: done: MX-15149
+ NumConcurrentResolvingTrieNodesJobs: 3,
+ DataPacker: dataPacker,
+ TriesContainer: e.trieContainer,
+ SizeCheckDelta: 0,
+ InputAntifloodHandler: disabled.NewAntiFloodHandler(),
+ OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
+ MainPreferredPeersHolder: disabled.NewPreferredPeersHolder(),
+ FullArchivePreferredPeersHolder: disabled.NewPreferredPeersHolder(),
+ PayloadValidator: payloadValidator,
}
resolverFactory, err := resolverscontainer.NewMetaResolversContainerFactory(resolversContainerArgs)
if err != nil {