Skip to content

Commit

Permalink
Merge pull request #5899 from multiversx/trie-resolvers-critical-section
Browse files Browse the repository at this point in the history
Performance degradation for v1.6.15 fix
  • Loading branch information
gabi-vuls committed Feb 14, 2024
2 parents 34e2f28 + 1dfc18f commit b5be263
Show file tree
Hide file tree
Showing 14 changed files with 191 additions and 138 deletions.
1 change: 1 addition & 0 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -519,6 +519,7 @@
[Antiflood]
Enabled = true
NumConcurrentResolverJobs = 50
NumConcurrentResolvingTrieNodesJobs = 3
[Antiflood.FastReacting]
IntervalInSeconds = 1
ReservedPercent = 20.0
Expand Down
19 changes: 10 additions & 9 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,15 +362,16 @@ type TxAccumulatorConfig struct {

// AntifloodConfig will hold all p2p antiflood parameters
type AntifloodConfig struct {
Enabled bool
NumConcurrentResolverJobs int32
OutOfSpecs FloodPreventerConfig
FastReacting FloodPreventerConfig
SlowReacting FloodPreventerConfig
PeerMaxOutput AntifloodLimitsConfig
Cache CacheConfig
Topic TopicAntifloodConfig
TxAccumulator TxAccumulatorConfig
Enabled bool
NumConcurrentResolverJobs int32
NumConcurrentResolvingTrieNodesJobs int32
OutOfSpecs FloodPreventerConfig
FastReacting FloodPreventerConfig
SlowReacting FloodPreventerConfig
PeerMaxOutput AntifloodLimitsConfig
Cache CacheConfig
Topic TopicAntifloodConfig
TxAccumulator TxAccumulatorConfig
}

// FloodPreventerConfig will hold all flood preventer parameters
Expand Down
35 changes: 18 additions & 17 deletions dataRetriever/factory/resolverscontainer/args.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,22 @@ import (

// FactoryArgs will hold the arguments for ResolversContainerFactory for both shard and meta
type FactoryArgs struct {
NumConcurrentResolvingJobs int32
ShardCoordinator sharding.Coordinator
MainMessenger p2p.Messenger
FullArchiveMessenger p2p.Messenger
Store dataRetriever.StorageService
Marshalizer marshal.Marshalizer
DataPools dataRetriever.PoolsHolder
Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
DataPacker dataRetriever.DataPacker
TriesContainer common.TriesHolder
InputAntifloodHandler dataRetriever.P2PAntifloodHandler
OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
MainPreferredPeersHolder p2p.PreferredPeersHolderHandler
FullArchivePreferredPeersHolder p2p.PreferredPeersHolderHandler
SizeCheckDelta uint32
IsFullHistoryNode bool
PayloadValidator dataRetriever.PeerAuthenticationPayloadValidator
NumConcurrentResolvingJobs int32
NumConcurrentResolvingTrieNodesJobs int32
ShardCoordinator sharding.Coordinator
MainMessenger p2p.Messenger
FullArchiveMessenger p2p.Messenger
Store dataRetriever.StorageService
Marshalizer marshal.Marshalizer
DataPools dataRetriever.PoolsHolder
Uint64ByteSliceConverter typeConverters.Uint64ByteSliceConverter
DataPacker dataRetriever.DataPacker
TriesContainer common.TriesHolder
InputAntifloodHandler dataRetriever.P2PAntifloodHandler
OutputAntifloodHandler dataRetriever.P2PAntifloodHandler
MainPreferredPeersHolder p2p.PreferredPeersHolderHandler
FullArchivePreferredPeersHolder p2p.PreferredPeersHolderHandler
SizeCheckDelta uint32
IsFullHistoryNode bool
PayloadValidator dataRetriever.PeerAuthenticationPayloadValidator
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type baseResolversContainerFactory struct {
inputAntifloodHandler dataRetriever.P2PAntifloodHandler
outputAntifloodHandler dataRetriever.P2PAntifloodHandler
throttler dataRetriever.ResolverThrottler
trieNodesThrottler dataRetriever.ResolverThrottler
intraShardTopic string
isFullHistoryNode bool
mainPreferredPeersHolder dataRetriever.PreferredPeersHolderHandler
Expand Down Expand Up @@ -78,7 +79,10 @@ func (brcf *baseResolversContainerFactory) checkParams() error {
return fmt.Errorf("%w for output", dataRetriever.ErrNilAntifloodHandler)
}
if check.IfNil(brcf.throttler) {
return dataRetriever.ErrNilThrottler
return fmt.Errorf("%w for the main throttler", dataRetriever.ErrNilThrottler)

Check warning on line 82 in dataRetriever/factory/resolverscontainer/baseResolversContainerFactory.go

View check run for this annotation

Codecov / codecov/patch

dataRetriever/factory/resolverscontainer/baseResolversContainerFactory.go#L82

Added line #L82 was not covered by tests
}
if check.IfNil(brcf.trieNodesThrottler) {
return fmt.Errorf("%w for the trie nodes throttler", dataRetriever.ErrNilThrottler)

Check warning on line 85 in dataRetriever/factory/resolverscontainer/baseResolversContainerFactory.go

View check run for this annotation

Codecov / codecov/patch

dataRetriever/factory/resolverscontainer/baseResolversContainerFactory.go#L85

Added line #L85 was not covered by tests
}
if check.IfNil(brcf.mainPreferredPeersHolder) {
return fmt.Errorf("%w for main network", dataRetriever.ErrNilPreferredPeersHolder)
Expand Down Expand Up @@ -351,7 +355,7 @@ func (brcf *baseResolversContainerFactory) createTrieNodesResolver(
SenderResolver: resolverSender,
Marshaller: brcf.marshalizer,
AntifloodHandler: brcf.inputAntifloodHandler,
Throttler: brcf.throttler,
Throttler: brcf.trieNodesThrottler,
},
TrieDataGetter: trie,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,12 @@ func NewMetaResolversContainerFactory(
args.Marshalizer = marshal.NewSizeCheckUnmarshalizer(args.Marshalizer, args.SizeCheckDelta)
}

thr, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
mainThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
if err != nil {
return nil, err
}

trieNodesThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingTrieNodesJobs)
if err != nil {
return nil, err
}
Expand All @@ -46,7 +51,8 @@ func NewMetaResolversContainerFactory(
triesContainer: args.TriesContainer,
inputAntifloodHandler: args.InputAntifloodHandler,
outputAntifloodHandler: args.OutputAntifloodHandler,
throttler: thr,
throttler: mainThrottler,
trieNodesThrottler: trieNodesThrottler,
isFullHistoryNode: args.IsFullHistoryNode,
mainPreferredPeersHolder: args.MainPreferredPeersHolder,
fullArchivePreferredPeersHolder: args.FullArchivePreferredPeersHolder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,15 @@ func TestNewMetaResolversContainerFactory_NewNumGoRoutinesThrottlerFailsShouldEr

args := getArgumentsMeta()
args.NumConcurrentResolvingJobs = 0

rcf, err := resolverscontainer.NewMetaResolversContainerFactory(args)
assert.Nil(t, rcf)
assert.Equal(t, core.ErrNotPositiveValue, err)

args.NumConcurrentResolvingJobs = 10
args.NumConcurrentResolvingTrieNodesJobs = 0

rcf, err = resolverscontainer.NewMetaResolversContainerFactory(args)
assert.Nil(t, rcf)
assert.Equal(t, core.ErrNotPositiveValue, err)
}
Expand Down Expand Up @@ -357,21 +364,22 @@ func TestMetaResolversContainerFactory_IsInterfaceNil(t *testing.T) {

func getArgumentsMeta() resolverscontainer.FactoryArgs {
return resolverscontainer.FactoryArgs{
ShardCoordinator: mock.NewOneShardCoordinatorMock(),
MainMessenger: createStubMessengerForMeta("", ""),
FullArchiveMessenger: createStubMessengerForMeta("", ""),
Store: createStoreForMeta(),
Marshalizer: &mock.MarshalizerMock{},
DataPools: createDataPoolsForMeta(),
Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
DataPacker: &mock.DataPackerStub{},
TriesContainer: createTriesHolderForMeta(),
SizeCheckDelta: 0,
InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
NumConcurrentResolvingJobs: 10,
MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
ShardCoordinator: mock.NewOneShardCoordinatorMock(),
MainMessenger: createStubMessengerForMeta("", ""),
FullArchiveMessenger: createStubMessengerForMeta("", ""),
Store: createStoreForMeta(),
Marshalizer: &mock.MarshalizerMock{},
DataPools: createDataPoolsForMeta(),
Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
DataPacker: &mock.DataPackerStub{},
TriesContainer: createTriesHolderForMeta(),
SizeCheckDelta: 0,
InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
NumConcurrentResolvingJobs: 10,
NumConcurrentResolvingTrieNodesJobs: 3,
MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ func NewShardResolversContainerFactory(
args.Marshalizer = marshal.NewSizeCheckUnmarshalizer(args.Marshalizer, args.SizeCheckDelta)
}

thr, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
mainThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingJobs)
if err != nil {
return nil, err
}

trieNodesThrottler, err := throttler.NewNumGoRoutinesThrottler(args.NumConcurrentResolvingTrieNodesJobs)
if err != nil {
return nil, err
}
Expand All @@ -44,7 +49,8 @@ func NewShardResolversContainerFactory(
triesContainer: args.TriesContainer,
inputAntifloodHandler: args.InputAntifloodHandler,
outputAntifloodHandler: args.OutputAntifloodHandler,
throttler: thr,
throttler: mainThrottler,
trieNodesThrottler: trieNodesThrottler,
isFullHistoryNode: args.IsFullHistoryNode,
mainPreferredPeersHolder: args.MainPreferredPeersHolder,
fullArchivePreferredPeersHolder: args.FullArchivePreferredPeersHolder,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,15 @@ func TestNewShardResolversContainerFactory_NewNumGoRoutinesThrottlerFailsShouldE

args := getArgumentsShard()
args.NumConcurrentResolvingJobs = 0

rcf, err := resolverscontainer.NewShardResolversContainerFactory(args)
assert.Nil(t, rcf)
assert.Equal(t, core.ErrNotPositiveValue, err)

args.NumConcurrentResolvingJobs = 10
args.NumConcurrentResolvingTrieNodesJobs = 0

rcf, err = resolverscontainer.NewShardResolversContainerFactory(args)
assert.Nil(t, rcf)
assert.Equal(t, core.ErrNotPositiveValue, err)
}
Expand Down Expand Up @@ -465,21 +472,22 @@ func TestShardResolversContainerFactory_IsInterfaceNil(t *testing.T) {

func getArgumentsShard() resolverscontainer.FactoryArgs {
return resolverscontainer.FactoryArgs{
ShardCoordinator: mock.NewOneShardCoordinatorMock(),
MainMessenger: createMessengerStubForShard("", ""),
FullArchiveMessenger: createMessengerStubForShard("", ""),
Store: createStoreForShard(),
Marshalizer: &mock.MarshalizerMock{},
DataPools: createDataPoolsForShard(),
Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
DataPacker: &mock.DataPackerStub{},
TriesContainer: createTriesHolderForShard(),
SizeCheckDelta: 0,
InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
NumConcurrentResolvingJobs: 10,
MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
ShardCoordinator: mock.NewOneShardCoordinatorMock(),
MainMessenger: createMessengerStubForShard("", ""),
FullArchiveMessenger: createMessengerStubForShard("", ""),
Store: createStoreForShard(),
Marshalizer: &mock.MarshalizerMock{},
DataPools: createDataPoolsForShard(),
Uint64ByteSliceConverter: &mock.Uint64ByteSliceConverterMock{},
DataPacker: &mock.DataPackerStub{},
TriesContainer: createTriesHolderForShard(),
SizeCheckDelta: 0,
InputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
OutputAntifloodHandler: &mock.P2PAntifloodHandlerStub{},
NumConcurrentResolvingJobs: 10,
NumConcurrentResolvingTrieNodesJobs: 3,
MainPreferredPeersHolder: &p2pmocks.PeersHolderStub{},
FullArchivePreferredPeersHolder: &p2pmocks.PeersHolderStub{},
PayloadValidator: &testscommon.PeerAuthenticationPayloadValidatorStub{},
}
}
12 changes: 12 additions & 0 deletions dataRetriever/resolvers/trieNodeResolver.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package resolvers

import (
"sync"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/batch"
Expand All @@ -20,6 +22,7 @@ type ArgTrieNodeResolver struct {

// TrieNodeResolver is a wrapper over Resolver that is specialized in resolving trie node requests
type TrieNodeResolver struct {
mutCriticalSection sync.Mutex
*baseResolver
messageProcessor
trieDataGetter dataRetriever.TrieDataGetter
Expand Down Expand Up @@ -104,6 +107,9 @@ func (tnRes *TrieNodeResolver) resolveMultipleHashes(hashesBuff []byte, message
}

func (tnRes *TrieNodeResolver) resolveOnlyRequestedHashes(hashes [][]byte, nodes map[string]struct{}) (int, bool) {
tnRes.mutCriticalSection.Lock()
defer tnRes.mutCriticalSection.Unlock()

spaceUsed := 0
usedAllSpace := false
remainingSpace := core.MaxBufferSizeToSendTrieNodes
Expand All @@ -129,6 +135,9 @@ func (tnRes *TrieNodeResolver) resolveOnlyRequestedHashes(hashes [][]byte, nodes
}

func (tnRes *TrieNodeResolver) resolveSubTries(hashes [][]byte, nodes map[string]struct{}, spaceUsedAlready int) {
tnRes.mutCriticalSection.Lock()
defer tnRes.mutCriticalSection.Unlock()

var serializedNodes [][]byte
var err error
var serializedNode []byte
Expand Down Expand Up @@ -168,7 +177,10 @@ func convertMapToSlice(m map[string]struct{}) [][]byte {
}

func (tnRes *TrieNodeResolver) resolveOneHash(hash []byte, chunkIndex uint32, message p2p.MessageP2P, source p2p.MessageHandler) error {
tnRes.mutCriticalSection.Lock()
serializedNode, err := tnRes.trieDataGetter.GetSerializedNode(hash)
tnRes.mutCriticalSection.Unlock()

if err != nil {
return err
}
Expand Down
33 changes: 17 additions & 16 deletions epochStart/bootstrap/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -1210,22 +1210,23 @@ func (e *epochStartBootstrap) createResolversContainer() error {
// this one should only be used before determining the correct shard where the node should reside
log.Debug("epochStartBootstrap.createRequestHandler", "shard", e.shardCoordinator.SelfId())
resolversContainerArgs := resolverscontainer.FactoryArgs{
ShardCoordinator: e.shardCoordinator,
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
Store: storageService,
Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
DataPools: e.dataPool,
Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
NumConcurrentResolvingJobs: 10,
DataPacker: dataPacker,
TriesContainer: e.trieContainer,
SizeCheckDelta: 0,
InputAntifloodHandler: disabled.NewAntiFloodHandler(),
OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
MainPreferredPeersHolder: disabled.NewPreferredPeersHolder(),
FullArchivePreferredPeersHolder: disabled.NewPreferredPeersHolder(),
PayloadValidator: payloadValidator,
ShardCoordinator: e.shardCoordinator,
MainMessenger: e.mainMessenger,
FullArchiveMessenger: e.fullArchiveMessenger,
Store: storageService,
Marshalizer: e.coreComponentsHolder.InternalMarshalizer(),
DataPools: e.dataPool,
Uint64ByteSliceConverter: uint64ByteSlice.NewBigEndianConverter(),
NumConcurrentResolvingJobs: 10,
NumConcurrentResolvingTrieNodesJobs: 3,
DataPacker: dataPacker,
TriesContainer: e.trieContainer,
SizeCheckDelta: 0,
InputAntifloodHandler: disabled.NewAntiFloodHandler(),
OutputAntifloodHandler: disabled.NewAntiFloodHandler(),
MainPreferredPeersHolder: disabled.NewPreferredPeersHolder(),
FullArchivePreferredPeersHolder: disabled.NewPreferredPeersHolder(),
PayloadValidator: payloadValidator,
}
resolverFactory, err := resolverscontainer.NewMetaResolversContainerFactory(resolversContainerArgs)
if err != nil {
Expand Down

0 comments on commit b5be263

Please sign in to comment.