Skip to content

Commit

Permalink
Merge 5d6daec into e58a866
Browse files Browse the repository at this point in the history
  • Loading branch information
ssd04 committed May 30, 2023
2 parents e58a866 + 5d6daec commit 2cfa72a
Show file tree
Hide file tree
Showing 18 changed files with 890 additions and 163 deletions.
9 changes: 9 additions & 0 deletions common/channels.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
package common

import "github.com/multiversx/mx-chain-core-go/core"

// GetClosedUnbufferedChannel returns an instance of a 'chan struct{}' that is already closed
func GetClosedUnbufferedChannel() chan struct{} {
ch := make(chan struct{})
close(ch)

return ch
}

// CloseKeyValueHolderChan will close the channel if not nil
func CloseKeyValueHolderChan(ch chan core.KeyValueHolder) {
if ch != nil {
close(ch)

Check warning on line 16 in common/channels.go

View check run for this annotation

Codecov / codecov/patch

common/channels.go#L14-L16

Added lines #L14 - L16 were not covered by tests
}
}
4 changes: 4 additions & 0 deletions common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,10 @@ const (
// TrieLeavesChannelDefaultCapacity represents the default value to be used as capacity for getting all trie leaves on
// a channel
TrieLeavesChannelDefaultCapacity = 100

// TrieLeavesChannelSyncCapacity represents the value to be used as capacity for getting main trie
// leaf nodes for trie sync
TrieLeavesChannelSyncCapacity = 1000
)

// ApiOutputFormat represents the format type returned by api
Expand Down
16 changes: 6 additions & 10 deletions state/syncer/baseAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ type baseAccountsSyncer struct {
timeoutHandler trie.TimeoutHandler
shardId uint32
cacher storage.Cacher
rootHash []byte
maxTrieLevelInMemory uint
name string
maxHardCapForMissingNodes int
Expand Down Expand Up @@ -96,15 +95,11 @@ func (b *baseAccountsSyncer) syncMainTrie(
rootHash []byte,
trieTopic string,
ctx context.Context,
) (common.Trie, error) {
b.rootHash = rootHash
leavesChan chan core.KeyValueHolder,
) error {
atomic.AddInt32(&b.numMaxTries, 1)

log.Trace("syncing main trie", "roothash", rootHash)
dataTrie, err := trie.NewTrie(b.trieStorageManager, b.marshalizer, b.hasher, b.enableEpochsHandler, b.maxTrieLevelInMemory)
if err != nil {
return nil, err
}

b.dataTries[string(rootHash)] = struct{}{}
arg := trie.ArgTrieSyncer{
Expand All @@ -119,22 +114,23 @@ func (b *baseAccountsSyncer) syncMainTrie(
TimeoutHandler: b.timeoutHandler,
MaxHardCapForMissingNodes: b.maxHardCapForMissingNodes,
CheckNodesOnDisk: b.checkNodesOnDisk,
LeavesChan: leavesChan,
}
trieSyncer, err := trie.CreateTrieSyncer(arg, b.trieSyncerVersion)
if err != nil {
return nil, err
return err

Check warning on line 121 in state/syncer/baseAccountsSyncer.go

View check run for this annotation

Codecov / codecov/patch

state/syncer/baseAccountsSyncer.go#L121

Added line #L121 was not covered by tests
}

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return nil, err
return err

Check warning on line 126 in state/syncer/baseAccountsSyncer.go

View check run for this annotation

Codecov / codecov/patch

state/syncer/baseAccountsSyncer.go#L126

Added line #L126 was not covered by tests
}

atomic.AddInt32(&b.numTriesSynced, 1)

log.Trace("finished syncing main trie", "roothash", rootHash)

return dataTrie.Recreate(rootHash)
return nil
}

func (b *baseAccountsSyncer) printStatisticsAndUpdateMetrics(ctx context.Context) {
Expand Down
116 changes: 116 additions & 0 deletions state/syncer/baseAccoutnsSyncer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package syncer_test

import (
"testing"
"time"

"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/syncer"
"github.com/multiversx/mx-chain-go/testscommon"
"github.com/multiversx/mx-chain-go/testscommon/enableEpochsHandlerMock"
"github.com/multiversx/mx-chain-go/testscommon/hashingMocks"
"github.com/multiversx/mx-chain-go/testscommon/marshallerMock"
"github.com/multiversx/mx-chain-go/testscommon/statusHandler"
"github.com/multiversx/mx-chain-go/testscommon/storageManager"
"github.com/stretchr/testify/require"
)

func getDefaultBaseAccSyncerArgs() syncer.ArgsNewBaseAccountsSyncer {
return syncer.ArgsNewBaseAccountsSyncer{
Hasher: &hashingMocks.HasherMock{},
Marshalizer: marshallerMock.MarshalizerMock{},
TrieStorageManager: &storageManager.StorageManagerStub{},
RequestHandler: &testscommon.RequestHandlerStub{},
Timeout: time.Second,
Cacher: testscommon.NewCacherMock(),
UserAccountsSyncStatisticsHandler: &testscommon.SizeSyncStatisticsHandlerStub{},
AppStatusHandler: &statusHandler.AppStatusHandlerStub{},
EnableEpochsHandler: &enableEpochsHandlerMock.EnableEpochsHandlerStub{},
MaxTrieLevelInMemory: 5,
MaxHardCapForMissingNodes: 100,
TrieSyncerVersion: 3,
CheckNodesOnDisk: false,
}
}

func TestBaseAccountsSyncer_CheckArgs(t *testing.T) {
t.Parallel()

t.Run("nil hasher", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.Hasher = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilHasher, err)
})

t.Run("nil marshaller", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.Marshalizer = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilMarshalizer, err)
})

t.Run("nil trie storage manager", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.TrieStorageManager = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilStorageManager, err)
})

t.Run("nil requests handler", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.RequestHandler = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilRequestHandler, err)
})

t.Run("nil cacher", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.Cacher = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilCacher, err)
})

t.Run("nil user accounts sync statistics handler", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.UserAccountsSyncStatisticsHandler = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilSyncStatisticsHandler, err)
})

t.Run("nil app status handler", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.AppStatusHandler = nil
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrNilAppStatusHandler, err)
})

t.Run("invalid max hard capacity for missing nodes", func(t *testing.T) {
t.Parallel()

args := getDefaultBaseAccSyncerArgs()
args.MaxHardCapForMissingNodes = 0
err := syncer.CheckBaseAccountsSyncerArgs(args)
require.Equal(t, state.ErrInvalidMaxHardCapForMissingNodes, err)
})

t.Run("should work", func(t *testing.T) {
t.Parallel()

require.Nil(t, syncer.CheckBaseAccountsSyncerArgs(getDefaultBaseAccSyncerArgs()))
})
}
25 changes: 25 additions & 0 deletions state/syncer/export_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,30 @@
package syncer

import (
"context"

"github.com/multiversx/mx-chain-go/common"
)

// UserAccountsSyncer -
type UserAccountsSyncer = userAccountsSyncer

// ValidatorAccountsSyncer -
type ValidatorAccountsSyncer = validatorAccountsSyncer

// CheckBaseAccountsSyncerArgs -
func CheckBaseAccountsSyncerArgs(args ArgsNewBaseAccountsSyncer) error {
return checkArgs(args)
}

// SyncAccountDataTries -
func (u *userAccountsSyncer) SyncAccountDataTries(
leavesChannels *common.TrieIteratorChannels,
ctx context.Context,
) error {
return u.syncAccountDataTries(leavesChannels, ctx)
}

// GetNumHandlers -
func (mtnn *missingTrieNodesNotifier) GetNumHandlers() int {
return len(mtnn.handlers)
Expand Down
82 changes: 36 additions & 46 deletions state/syncer/userAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@ import (
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/state"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/trie"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
logger "github.com/multiversx/mx-chain-logger-go"
)

Expand Down Expand Up @@ -86,7 +84,6 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
timeoutHandler: timeoutHandler,
shardId: args.ShardId,
cacher: args.Cacher,
rootHash: nil,
maxTrieLevelInMemory: args.MaxTrieLevelInMemory,
name: fmt.Sprintf("user accounts for shard %s", core.GetShardIDString(args.ShardId)),
maxHardCapForMissingNodes: args.MaxHardCapForMissingNodes,
Expand Down Expand Up @@ -126,23 +123,40 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte, storageMarker common.

go u.printStatisticsAndUpdateMetrics(ctx)

mainTrie, err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx)
if err != nil {
return err
leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelSyncCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}

defer func() {
_ = mainTrie.Close()
wgSyncMainTrie := &sync.WaitGroup{}
wgSyncMainTrie.Add(1)

go func() {
err := u.syncMainTrie(rootHash, factory.AccountTrieNodesTopic, ctx, leavesChannels.LeavesChan)
if err != nil {
leavesChannels.ErrChan.WriteInChanNonBlocking(err)

Check warning on line 137 in state/syncer/userAccountsSyncer.go

View check run for this annotation

Codecov / codecov/patch

state/syncer/userAccountsSyncer.go#L137

Added line #L137 was not covered by tests
}

common.CloseKeyValueHolderChan(leavesChannels.LeavesChan)

wgSyncMainTrie.Done()
}()

log.Debug("main trie synced, starting to sync data tries", "num data tries", len(u.dataTries))
err := u.syncAccountDataTries(leavesChannels, ctx)
if err != nil {
return err

Check warning on line 147 in state/syncer/userAccountsSyncer.go

View check run for this annotation

Codecov / codecov/patch

state/syncer/userAccountsSyncer.go#L147

Added line #L147 was not covered by tests
}

wgSyncMainTrie.Wait()

err = u.syncAccountDataTries(mainTrie, ctx)
err = leavesChannels.ErrChan.ReadFromChanNonBlocking()
if err != nil {
return err
}

storageMarker.MarkStorerAsSyncedAndActive(mainTrie.GetStorageManager())
storageMarker.MarkStorerAsSyncedAndActive(u.trieStorageManager)

log.Debug("main trie and data tries synced", "main trie root hash", rootHash, "num data tries", len(u.dataTries))

return nil
}
Expand Down Expand Up @@ -185,6 +199,7 @@ func (u *userAccountsSyncer) createAndStartSyncer(
TimeoutHandler: u.timeoutHandler,
MaxHardCapForMissingNodes: u.maxHardCapForMissingNodes,
CheckNodesOnDisk: checkNodesOnDisk,
LeavesChan: nil, // not used for data tries
}
trieSyncer, err := trie.CreateTrieSyncer(arg, u.trieSyncerVersion)
if err != nil {
Expand Down Expand Up @@ -221,33 +236,15 @@ func (u *userAccountsSyncer) updateDataTrieStatistics(trieSyncer trie.TrieSyncer
}

func (u *userAccountsSyncer) syncAccountDataTries(
mainTrie common.Trie,
leavesChannels *common.TrieIteratorChannels,
ctx context.Context,
) error {
defer u.printDataTrieStatistics()

mainRootHash, err := mainTrie.RootHash()
if err != nil {
return err
if leavesChannels == nil {
return trie.ErrNilTrieIteratorChannels
}

leavesChannels := &common.TrieIteratorChannels{
LeavesChan: make(chan core.KeyValueHolder, common.TrieLeavesChannelDefaultCapacity),
ErrChan: errChan.NewErrChanWrapper(),
}
err = mainTrie.GetAllLeavesOnChannel(
leavesChannels,
context.Background(),
mainRootHash,
keyBuilder.NewDisabledKeyBuilder(),
parsers.NewMainTrieLeafParser(),
)
if err != nil {
return err
}
defer u.printDataTrieStatistics()

var errFound error
errMutex := sync.Mutex{}
wg := sync.WaitGroup{}
argsAccCreation := state.ArgsAccountCreation{
Hasher: u.hasher,
Expand All @@ -259,11 +256,11 @@ func (u *userAccountsSyncer) syncAccountDataTries(

account, err := state.NewUserAccountFromBytes(leaf.Value(), argsAccCreation)
if err != nil {
log.Trace("this must be a leaf with code", "err", err)
log.Trace("this must be a leaf with code", "leaf key", leaf.Key(), "err", err)
continue
}

if len(account.RootHash) == 0 {
if common.IsEmptyTrie(account.RootHash) {
continue
}

Expand All @@ -280,11 +277,9 @@ func (u *userAccountsSyncer) syncAccountDataTries(
defer u.throttler.EndProcessing()

log.Trace("sync data trie", "roothash", trieRootHash)
newErr := u.syncDataTrie(trieRootHash, address, ctx)
if newErr != nil {
errMutex.Lock()
errFound = newErr
errMutex.Unlock()
err := u.syncDataTrie(trieRootHash, address, ctx)
if err != nil {
leavesChannels.ErrChan.WriteInChanNonBlocking(err)

Check warning on line 282 in state/syncer/userAccountsSyncer.go

View check run for this annotation

Codecov / codecov/patch

state/syncer/userAccountsSyncer.go#L282

Added line #L282 was not covered by tests
}
atomic.AddInt32(&u.numTriesSynced, 1)
log.Trace("finished sync data trie", "roothash", trieRootHash)
Expand All @@ -294,12 +289,7 @@ func (u *userAccountsSyncer) syncAccountDataTries(

wg.Wait()

err = leavesChannels.ErrChan.ReadFromChanNonBlocking()
if err != nil {
return err
}

return errFound
return nil
}

func (u *userAccountsSyncer) printDataTrieStatistics() {
Expand Down

0 comments on commit 2cfa72a

Please sign in to comment.