Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Start snapshot after restart fix #4551

Merged
merged 2 commits into from Oct 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 4 additions & 0 deletions epochStart/bootstrap/disabled/disabledAccountsAdapter.go
Expand Up @@ -17,6 +17,10 @@ func NewAccountsAdapter() *accountsAdapter {
return &accountsAdapter{}
}

// StartSnapshotIfNeeded -
func (a *accountsAdapter) StartSnapshotIfNeeded() {
}

// GetTrie -
func (a *accountsAdapter) GetTrie(_ []byte) (common.Trie, error) {
return nil, nil
Expand Down
8 changes: 8 additions & 0 deletions node/nodeRunner.go
Expand Up @@ -384,6 +384,14 @@ func (nr *nodeRunner) executeOneComponentCreationCycle(
return true, err
}

selfId := managedBootstrapComponents.ShardCoordinator().SelfId()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please re-check this. On rc/v1.4.0 we do not have this call here

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in rc/v1.4.0 these are called in the addSyncersToAccountsDB function after setting the trie syncers.

if selfId == core.MetachainShardId {
managedStateComponents.PeerAccounts().StartSnapshotIfNeeded()
managedStateComponents.AccountsAdapter().StartSnapshotIfNeeded()
} else {
managedStateComponents.AccountsAdapter().StartSnapshotIfNeeded()
}

hardforkTrigger := managedProcessComponents.HardforkTrigger()
err = hardforkTrigger.AddCloser(nodesShufflerOut)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions process/txsimulator/wrappedAccountsDB.go
Expand Up @@ -24,6 +24,10 @@ func NewReadOnlyAccountsDB(accountsDB state.AccountsAdapter) (*readOnlyAccountsD
return &readOnlyAccountsDB{originalAccounts: accountsDB}, nil
}

// StartSnapshotIfNeeded does nothing for this implementation
func (r *readOnlyAccountsDB) StartSnapshotIfNeeded() {
}

// GetCode returns the code for the given account
func (r *readOnlyAccountsDB) GetCode(codeHash []byte) []byte {
return r.originalAccounts.GetCode(codeHash)
Expand Down
29 changes: 19 additions & 10 deletions state/accountsDB.go
Expand Up @@ -128,12 +128,6 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) {
processStatusHandler: args.ProcessStatusHandler,
}

trieStorageManager := adb.mainTrie.GetStorageManager()
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey))
if err != nil || !bytes.Equal(val, []byte(common.ActiveDBVal)) {
startSnapshotAfterRestart(adb, args)
}

return adb, nil
}

Expand All @@ -160,17 +154,16 @@ func checkArgsAccountsDB(args ArgsAccountsDB) error {
return nil
}

func startSnapshotAfterRestart(adb AccountsAdapter, args ArgsAccountsDB) {
tsm := args.Trie.GetStorageManager()
func startSnapshotAfterRestart(adb AccountsAdapter, tsm common.StorageManager, processingMode common.NodeProcessingMode) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked the file with v1.4.0, looks good 👍

epoch, err := tsm.GetLatestStorageEpoch()
if err != nil {
log.Error("could not get latest storage epoch")
}
putActiveDBMarker := epoch == 0 && err == nil
isInImportDBMode := args.ProcessingMode == common.ImportDb
isInImportDBMode := processingMode == common.ImportDb
putActiveDBMarker = putActiveDBMarker || isInImportDBMode
if putActiveDBMarker {
log.Debug("marking activeDB", "epoch", epoch, "error", err, "processing mode", args.ProcessingMode)
log.Debug("marking activeDB", "epoch", epoch, "error", err, "processing mode", processingMode)
err = tsm.Put([]byte(common.ActiveDBKey), []byte(common.ActiveDBVal))
handleLoggingWhenError("error while putting active DB value into main storer", err)
return
Expand Down Expand Up @@ -204,6 +197,22 @@ func handleLoggingWhenError(message string, err error, extraArguments ...interfa
log.Warn(message, append(args, extraArguments...)...)
}

// StartSnapshotIfNeeded starts the snapshot if the previous snapshot process was not fully completed
func (adb *AccountsDB) StartSnapshotIfNeeded() {
startSnapshotIfNeeded(adb, adb.mainTrie.GetStorageManager(), adb.processingMode)
}

func startSnapshotIfNeeded(
adb AccountsAdapter,
trieStorageManager common.StorageManager,
processingMode common.NodeProcessingMode,
) {
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey))
if err != nil || !bytes.Equal(val, []byte(common.ActiveDBVal)) {
startSnapshotAfterRestart(adb, trieStorageManager, processingMode)
}
}

// GetCode returns the code for the given account
func (adb *AccountsDB) GetCode(codeHash []byte) []byte {
if len(codeHash) == 0 {
Expand Down
4 changes: 4 additions & 0 deletions state/accountsDBApi.go
Expand Up @@ -75,6 +75,10 @@ func (accountsDB *accountsDBApi) doRecreateTrieWithBlockInfo(newBlockInfo common
return newBlockInfo, nil
}

// StartSnapshotIfNeeded does nothing for this implementation
func (accountsDB *accountsDBApi) StartSnapshotIfNeeded() {
}

// GetExistingAccount will call the inner accountsAdapter method after trying to recreate the trie
func (accountsDB *accountsDBApi) GetExistingAccount(address []byte) (vmcommon.AccountHandler, error) {
account, _, err := accountsDB.GetAccountWithBlockInfo(address, holders.NewRootHashHolderAsEmpty())
Expand Down
4 changes: 4 additions & 0 deletions state/accountsDBApiWithHistory.go
Expand Up @@ -29,6 +29,10 @@ func NewAccountsDBApiWithHistory(innerAccountsAdapter AccountsAdapter) (*account
}, nil
}

// StartSnapshotIfNeeded is a not permitted operation in this implementation and thus, does nothing
func (accountsDB *accountsDBApiWithHistory) StartSnapshotIfNeeded() {
}

// GetExistingAccount will return an error
func (accountsDB *accountsDBApiWithHistory) GetExistingAccount(_ []byte) (vmcommon.AccountHandler, error) {
return nil, ErrFunctionalityNotImplemented
Expand Down
131 changes: 0 additions & 131 deletions state/accountsDB_test.go
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"github.com/ElrondNetwork/elrond-go-core/core"
atomicFlag "github.com/ElrondNetwork/elrond-go-core/core/atomic"
"github.com/ElrondNetwork/elrond-go-core/core/check"
"github.com/ElrondNetwork/elrond-go-core/marshal"
"github.com/ElrondNetwork/elrond-go/common"
Expand Down Expand Up @@ -2359,136 +2358,6 @@ func TestAccountsDB_GetAccountFromBytesShouldLoadDataTrie(t *testing.T) {
assert.Equal(t, dataTrie, account.DataTrie())
}

func TestAccountsDB_NewAccountsDbShouldSetActiveDB(t *testing.T) {
t.Parallel()

rootHash := []byte("rootHash")
expectedErr := errors.New("expected error")
t.Run("epoch 0", func(t *testing.T) {
putCalled := false
trieStub := &trieMock.TrieStub{
RootCalled: func() ([]byte, error) {
return rootHash, nil
},
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
ShouldTakeSnapshotCalled: func() bool {
return true
},
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, nil
},
PutCalled: func(key []byte, val []byte) error {
assert.Equal(t, []byte(common.ActiveDBKey), key)
assert.Equal(t, []byte(common.ActiveDBVal), val)

putCalled = true

return nil
},
}
},
}

_ = generateAccountDBFromTrie(trieStub)

assert.True(t, putCalled)
})
t.Run("epoch 0, GetLatestStorageEpoch errors should not put", func(t *testing.T) {
trieStub := &trieMock.TrieStub{
RootCalled: func() ([]byte, error) {
return rootHash, nil
},
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
ShouldTakeSnapshotCalled: func() bool {
return true
},
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, expectedErr
},
PutCalled: func(key []byte, val []byte) error {
assert.Fail(t, "should have not called put")

return nil
},
}
},
}

_ = generateAccountDBFromTrie(trieStub)
})
t.Run("in import DB mode", func(t *testing.T) {
putCalled := false
trieStub := &trieMock.TrieStub{
RootCalled: func() ([]byte, error) {
return rootHash, nil
},
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
ShouldTakeSnapshotCalled: func() bool {
return true
},
GetLatestStorageEpochCalled: func() (uint32, error) {
return 1, nil
},
PutCalled: func(key []byte, val []byte) error {
assert.Equal(t, []byte(common.ActiveDBKey), key)
assert.Equal(t, []byte(common.ActiveDBVal), val)

putCalled = true

return nil
},
}
},
}

args := createMockAccountsDBArgs()
args.ProcessingMode = common.ImportDb
args.Trie = trieStub

_, _ = state.NewAccountsDB(args)

assert.True(t, putCalled)
})
}

func TestAccountsDB_NewAccountsDbStartsSnapshotAfterRestart(t *testing.T) {
t.Parallel()

rootHash := []byte("rootHash")
takeSnapshotCalled := atomicFlag.Flag{}
trieStub := &trieMock.TrieStub{
RootCalled: func() ([]byte, error) {
return rootHash, nil
},
GetStorageManagerCalled: func() common.StorageManager {
return &testscommon.StorageManagerStub{
GetCalled: func(key []byte) ([]byte, error) {
if bytes.Equal(key, []byte(common.ActiveDBKey)) {
return nil, fmt.Errorf("key not found")
}
return []byte("rootHash"), nil
},
ShouldTakeSnapshotCalled: func() bool {
return true
},
TakeSnapshotCalled: func(_ []byte, _ []byte, _ chan core.KeyValueHolder, _ chan error, _ common.SnapshotStatisticsHandler, _ uint32) {
takeSnapshotCalled.SetValue(true)
},
GetLatestStorageEpochCalled: func() (uint32, error) {
return 1, nil
},
}
},
}

_ = generateAccountDBFromTrie(trieStub)
time.Sleep(time.Second)
assert.True(t, takeSnapshotCalled.IsSet())
}

func BenchmarkAccountsDb_GetCodeEntry(b *testing.B) {
maxTrieLevelInMemory := uint(5)
marshaller := &testscommon.MarshalizerMock{}
Expand Down
1 change: 1 addition & 0 deletions state/interface.go
Expand Up @@ -125,6 +125,7 @@ type AccountsAdapter interface {
RecreateAllTries(rootHash []byte) (map[string]common.Trie, error)
GetTrie(rootHash []byte) (common.Trie, error)
GetStackDebugFirstEntry() []byte
StartSnapshotIfNeeded()
Close() error
IsInterfaceNil() bool
}
Expand Down
12 changes: 5 additions & 7 deletions state/peerAccountsDB.go
@@ -1,7 +1,6 @@
package state

import (
"bytes"
"fmt"
"sync"

Expand Down Expand Up @@ -39,15 +38,14 @@ func NewPeerAccountsDB(args ArgsAccountsDB) (*PeerAccountsDB, error) {
},
}

trieStorageManager := adb.mainTrie.GetStorageManager()
val, err := trieStorageManager.GetFromCurrentEpoch([]byte(common.ActiveDBKey))
if err != nil || !bytes.Equal(val, []byte(common.ActiveDBVal)) {
startSnapshotAfterRestart(adb, args)
}

return adb, nil
}

// StartSnapshotIfNeeded starts the snapshot if the previous snapshot process was not fully completed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checked with the rc/v1.4.0, looks good

func (adb *PeerAccountsDB) StartSnapshotIfNeeded() {
startSnapshotIfNeeded(adb, adb.mainTrie.GetStorageManager(), adb.processingMode)
}

// MarkSnapshotDone will mark that the snapshot process has been completed
func (adb *PeerAccountsDB) MarkSnapshotDone() {
trieStorageManager, epoch, err := adb.getTrieStorageManagerAndLatestEpoch()
Expand Down