Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/trie sync optimizations #5291

Merged
merged 35 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
3028163
pass iterator channels to syncer
ssd04 May 8, 2023
acc9cc2
add more goroutines and channels control on sync finish
ssd04 May 9, 2023
0bca902
update maintrie usage
ssd04 May 10, 2023
9a29aa7
more logs for debugging
ssd04 May 10, 2023
e506c95
mutex cleanup + nit account check
ssd04 May 10, 2023
4d737ba
mutex protection and cleanup
ssd04 May 10, 2023
38c26ae
remove roothash nil vars
ssd04 May 10, 2023
91448e2
fix mutex for syncer
ssd04 May 10, 2023
50cad3b
get leaves also from storeTrieNode
ssd04 May 11, 2023
f8401bd
unit tests for constructors in syncer
ssd04 May 12, 2023
3f1faa6
add unit tests for sync accounts
ssd04 May 12, 2023
7bba7e9
cleanup unused code
ssd04 May 12, 2023
9a4cb9e
fixes after review: added more unit tests
ssd04 May 15, 2023
5a1414d
Merge pull request #5239 from multiversx/state-syncer-unit-tests
ssd04 May 15, 2023
4e25822
add unit tests for account leaves chan
ssd04 May 15, 2023
f393d2c
Merge branch 'feat/trie-sync-optimizations' into trie-sync-improvements
ssd04 May 15, 2023
6068c06
update unit tests + remove debugging code
ssd04 May 15, 2023
784a0b2
remove duplicated leaf handling
ssd04 May 16, 2023
89b1643
add leaves channel for trie syncer version 1 and 2
ssd04 May 16, 2023
da22403
remove logs and unused variables
ssd04 May 16, 2023
b486628
remove log trace
ssd04 May 16, 2023
0b1b774
added sync logs
ssd04 May 16, 2023
cfe47dd
fixes after review: refactorings
ssd04 May 17, 2023
d58e024
refactored to use only leaves chan
ssd04 May 17, 2023
a714373
fix trie sync tests
ssd04 May 17, 2023
096e952
fixes after review: renamings and small optimizations
ssd04 May 19, 2023
64cb479
remove doubled error log
ssd04 May 22, 2023
90977a3
Merge pull request #5226 from multiversx/trie-sync-improvements
ssd04 May 22, 2023
33cf83d
Merge branch 'rc/v1.6.0' into merge-rc-1.6.0-trie-sync-optimizations
ssd04 May 29, 2023
1798719
fixes after merge
ssd04 May 29, 2023
53f43fc
Merge pull request #5293 from multiversx/merge-rc-1.6.0-trie-sync-opt…
ssd04 May 29, 2023
dc0c7f4
Merge branch 'rc/v1.6.0' into merge-rc-1.6.0-trie-sync-optimizations
ssd04 May 30, 2023
01b0e13
fix merge conflicts
ssd04 May 30, 2023
6b8fb77
conflict fixes after merge - revert print
ssd04 May 30, 2023
5d6daec
Merge pull request #5309 from multiversx/merge-rc-1.6.0-trie-sync-opt…
ssd04 May 30, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions common/channels.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package common

import "github.com/multiversx/mx-chain-core-go/core"

// GetClosedUnbufferedChannel creates a fresh unbuffered 'chan struct{}' and closes it before handing it back,
// so a receive on the returned channel never blocks
func GetClosedUnbufferedChannel() chan struct{} {
	closedChan := make(chan struct{})
	close(closedChan)

	return closedChan
}

// CloseKeyValueHolderChan closes the provided channel; a nil channel is ignored, so the call is a no-op
// in that case
func CloseKeyValueHolderChan(ch chan core.KeyValueHolder) {
	if ch == nil {
		// nothing to close
		return
	}

	close(ch)
}
4 changes: 4 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,10 @@ const (
// TrieLeavesChannelDefaultCapacity represents the default value to be used as capacity for getting all trie leaves on
// a channel
TrieLeavesChannelDefaultCapacity = 100

// TrieLeavesChannelSyncCapacity represents the value to be used as capacity for getting main trie
// leaf nodes for trie sync
TrieLeavesChannelSyncCapacity = 1000
)

// ApiOutputFormat represents the format type returned by api
Expand Down
16 changes: 6 additions & 10 deletions state/syncer/baseAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type baseAccountsSyncer struct {
timeoutHandler trie.TimeoutHandler
shardId uint32
cacher storage.Cacher
rootHash []byte
maxTrieLevelInMemory uint
name string
maxHardCapForMissingNodes int
Expand Down Expand Up @@ -96,15 +95,11 @@ func (b *baseAccountsSyncer) syncMainTrie(
rootHash []byte,
trieTopic string,
ctx context.Context,
) (common.Trie, error) {
b.rootHash = rootHash
leavesChan chan core.KeyValueHolder,
) error {
atomic.AddInt32(&b.numMaxTries, 1)

log.Trace("syncing main trie", "roothash", rootHash)
dataTrie, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.enableEpochsHandler, b.maxTrieLevelInMemory)
if err != nil {
return nil, err
}

b.dataTries[string(rootHash)] = struct{}{}
arg := trie.ArgTrieSyncer{
Expand All @@ -119,22 +114,23 @@ func (b *baseAccountsSyncer) syncMainTrie(
TimeoutHandler: b.timeoutHandler,
MaxHardCapForMissingNodes: b.maxHardCapForMissingNodes,
CheckNodesOnDisk: b.checkNodesOnDisk,
LeavesChan: leavesChan,
}
trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion)
if err != nil {
return nil, err
return err
}

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return nil, err
return err
}

atomic.AddInt32(&b.numTriesSynced, 1)

log.Trace("finished syncing main trie", "roothash", rootHash)

return dataTrie.Recreate(rootHash)
return nil
}

func (b *baseAccountsSyncer) printStatisticsAndUpdateMetrics(ctx context.Context) {
Expand Down
116 changes: 116 additions & 0 deletions state/syncer/baseAccoutnsSyncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package syncer_test

import (
"testing"
"time"

"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/syncer"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/hashingMocks"
"github.com/multiversx/mx-chain-go/testscommon/marshallerMock"
"github.com/multiversx/mx-chain-go/testscommon/statusHandler"
"github.com/multiversx/mx-chain-go/testscommon/storageManager"
"github.com/stretchr/testify/require"
)

// getDefaultBaseAccSyncerArgs builds an ArgsNewBaseAccountsSyncer in which every component is a stub/mock
// and every numeric setting has a fixed value, to be used as a starting point by the tests
func getDefaultBaseAccSyncerArgs() syncer.ArgsNewBaseAccountsSyncer {
	var args syncer.ArgsNewBaseAccountsSyncer

	// stubbed / mocked components
	args.Hasher = &hashingMocks.HasherMock{}
	args.Marshalizer = marshallerMock.MarshalizerMock{}
	args.TrieStorageManager = &storageManager.StorageManagerStub{}
	args.RequestHandler = &testscommon.RequestHandlerStub{}
	args.Cacher = testscommon.NewCacherMock()
	args.UserAccountsSyncStatisticsHandler = &testscommon.SizeSyncStatisticsHandlerStub{}
	args.AppStatusHandler = &statusHandler.AppStatusHandlerStub{}
	args.EnableEpochsHandler = &enableEpochsHandlerMock.EnableEpochsHandlerStub{}

	// plain configuration values
	args.Timeout = time.Second
	args.MaxTrieLevelInMemory = 5
	args.MaxHardCapForMissingNodes = 100
	args.TrieSyncerVersion = 3
	args.CheckNodesOnDisk = false

	return args
}

func TestBaseAccountsSyncer_CheckArgs(t *testing.T) {
t.Parallel()

t.Run("nil hasher", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.Hasher = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilHasher, err)
})

t.Run("nil marshaller", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.Marshalizer = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilMarshalizer, err)
})

t.Run("nil trie storage manager", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.TrieStorageManager = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilStorageManager, err)
})

t.Run("nil requests handler", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.RequestHandler = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilRequestHandler, err)
})

t.Run("nil cacher", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.Cacher = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilCacher, err)
})

t.Run("nil user accounts sync statistics handler", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.UserAccountsSyncStatisticsHandler = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilSyncStatisticsHandler, err)
})

t.Run("nil app status handler", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.AppStatusHandler = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilAppStatusHandler, err)
})

t.Run("invalid max hard capacity for missing nodes", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.MaxHardCapForMissingNodes = 0
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrInvalidMaxHardCapForMissingNodes, err)
})

t.Run("should work", func(t *testing.T) {
t.Parallel()

require.Nil(t, syncer.CheckBaseAccountsSyncerArgs(getDefaultBaseAccSyncerArgs()))
})
}
25 changes: 25 additions & 0 deletions state/syncer/export_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
package syncer

import (
"context"

"github.com/multiversx/mx-chain-go/common"
)

// UserAccountsSyncer is an exported alias of the unexported userAccountsSyncer type
type UserAccountsSyncer = userAccountsSyncer

// ValidatorAccountsSyncer is an exported alias of the unexported validatorAccountsSyncer type
type ValidatorAccountsSyncer = validatorAccountsSyncer

// CheckBaseAccountsSyncerArgs exposes the unexported checkArgs function: it forwards the provided
// arguments and returns the resulting error unchanged
func CheckBaseAccountsSyncerArgs(args ArgsNewBaseAccountsSyncer) error {
	err := checkArgs(args)

	return err
}

// SyncAccountDataTries exposes the unexported syncAccountDataTries method: it forwards the leaves
// channels and the context and returns the resulting error unchanged
func (u *userAccountsSyncer) SyncAccountDataTries(
	leavesChannels *common.TrieIteratorChannels,
	ctx context.Context,
) error {
	err := u.syncAccountDataTries(leavesChannels, ctx)

	return err
}

// GetNumHandlers -
func (mtnn *missingTrieNodesNotifier) GetNumHandlers() int {
return len(mtnn.handlers)
Expand Down
82 changes: 36 additions & 46 deletions state/syncer/userAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/trie"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
logger "github.com/multiversx/mx-chain-logger-go"
)

Expand Down Expand Up @@ -86,7 +84,6 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
timeoutHandler: timeoutHandler,
shardId: args.ShardId,
cacher: args.Cacher,
rootHash: nil,
maxTrieLevelInMemory: args.MaxTrieLevelInMemory,
name: fmt.Sprintf("user accounts for shard %s", core.GetShardIDString(args.ShardId)),
maxHardCapForMissingNodes: args.MaxHardCapForMissingNodes,
Expand Down Expand Up @@ -126,23 +123,40 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte, storageMarker common.

go u.printStatisticsAndUpdateMetrics(ctx)

mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx)
if err != nil {
return err
leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelSyncCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}

defer func() {
_ = mainTrie.Close()
wgSyncMainTrie := &sync.WaitGroup{}
wgSyncMainTrie.Add(1)

go func() {
err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels.LeavesChan)
if err != nil {
leavesChannels.ErrChan.WriteInChanNonBlocking(err)
}

common.CloseKeyValueHolderChan(leavesChannels.LeavesChan)

wgSyncMainTrie.Done()
}()

log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries))
err := u.syncAccountDataTries(leavesChannels, ctx)
if err != nil {
return err
}

wgSyncMainTrie.Wait()

err = u.syncAccountDataTries(mainTrie, ctx)
err = leavesChannels.ErrChan.ReadFromChanNonBlocking()
if err != nil {
return err
}

storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager())
storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager)

log.Debug("main trie and data tries synced", "main trie root hash", rootHash, "num data tries", len(u.dataTries))

return nil
}
Expand Down Expand Up @@ -185,6 +199,7 @@ func (u *userAccountsSyncer) createAndStartSyncer(
TimeoutHandler: u.timeoutHandler,
MaxHardCapForMissingNodes: u.maxHardCapForMissingNodes,
CheckNodesOnDisk: checkNodesOnDisk,
LeavesChan: nil, // not used for data tries
}
trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion)
if err != nil {
Expand Down Expand Up @@ -221,33 +236,15 @@ func (u *userAccountsSyncer) updateDataTrieStatistics(trieSyncer trie.TrieSyncer
}

func (u *userAccountsSyncer) syncAccountDataTries(
mainTrie common.Trie,
leavesChannels *common.TrieIteratorChannels,
ctx context.Context,
) error {
defer u.printDataTrieStatistics()

mainRootHash, err := mainTrie.RootHash()
if err != nil {
return err
if leavesChannels == nil {
return trie.ErrNilTrieIteratorChannels
}

leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}
err = mainTrie.GetAllLeavesOnChannel(
leavesChannels,
context.Background(),
mainRootHash,
keyBuilder.NewDisabledKeyBuilder(),
parsers.NewMainTrieLeafParser(),
)
if err != nil {
return err
}
defer u.printDataTrieStatistics()

var errFound error
errMutex := sync.Mutex{}
wg := sync.WaitGroup{}
argsAccCreation := state.ArgsAccountCreation{
Hasher: u.hasher,
Expand All @@ -259,11 +256,11 @@ func (u *userAccountsSyncer) syncAccountDataTries(

account, err := state.NewUserAccountFromBytes(leaf.Value(), argsAccCreation)
if err != nil {
log.Trace("this must be a leaf with code", "err", err)
log.Trace("this must be a leaf with code", "leaf key", leaf.Key(), "err", err)
continue
}

if len(account.RootHash) == 0 {
if common.IsEmptyTrie(account.RootHash) {
continue
}

Expand All @@ -280,11 +277,9 @@ func (u *userAccountsSyncer) syncAccountDataTries(
defer u.throttler.EndProcessing()

log.Trace("sync data trie", "roothash", trieRootHash)
newErr := u.syncDataTrie(trieRootHash, address, ctx)
if newErr != nil {
errMutex.Lock()
errFound = newErr
errMutex.Unlock()
err := u.syncDataTrie(trieRootHash, address, ctx)
if err != nil {
leavesChannels.ErrChan.WriteInChanNonBlocking(err)
}
atomic.AddInt32(&u.numTriesSynced, 1)
log.Trace("finished sync data trie", "roothash", trieRootHash)
Expand All @@ -294,12 +289,7 @@ func (u *userAccountsSyncer) syncAccountDataTries(

wg.Wait()

err = leavesChannels.ErrChan.ReadFromChanNonBlocking()
if err != nil {
return err
}

return errFound
return nil
}

func (u *userAccountsSyncer) printDataTrieStatistics() {
Expand Down