Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lastSnapshot marker fix #5642

Merged
merged 20 commits into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
76f7576
change gas cost for migrate data trie built in function
BeniaminDrasovean Sep 13, 2023
cb56dd8
Merge branch 'rc/v1.6.0' into change-gas-cost-for-data-trie-migration
BeniaminDrasovean Sep 13, 2023
2fde00e
add print for num migrated leaves
BeniaminDrasovean Sep 18, 2023
8ab37f0
Merge branch 'rc/v1.6.0' into change-gas-cost-for-data-trie-migration
raduchis Sep 18, 2023
ccbf8a7
update trieLoadPerNode gas cost
BeniaminDrasovean Sep 20, 2023
353c1dc
create a new component which handles the lastSnapshot marker
BeniaminDrasovean Oct 6, 2023
72b7643
fix test race conditions
BeniaminDrasovean Oct 6, 2023
ded414a
Merge branch 'rc/v1.6.0' into change-gas-cost-for-data-trie-migration
BeniaminDrasovean Oct 11, 2023
60191ea
- fixed create release workflow
iulianpascalau Oct 12, 2023
7caabe9
Merge branch 'rc/v1.6.0' into change-gas-cost-for-data-trie-migration
BeniaminDrasovean Oct 12, 2023
19a7a43
- fixing workflow file
iulianpascalau Oct 12, 2023
c47d9c0
Merge pull request #5579 from multiversx/change-gas-cost-for-data-tri…
BeniaminDrasovean Oct 12, 2023
42209c2
Merge branch 'rc/v1.6.0' into fix-create-release-workflow
iulianpascalau Oct 12, 2023
3afc232
Merge pull request #5655 from multiversx/fix-create-release-workflow
iulianpascalau Oct 12, 2023
e514d3b
update guardians tx construction example
bogdan-rosianu Oct 13, 2023
f5f3094
Merge pull request #5659 from multiversx/update-guardian-tx-construct…
bogdan-rosianu Oct 13, 2023
13cbac7
fixes after review
BeniaminDrasovean Oct 25, 2023
8e90b59
Merge branch 'rc/v1.6.0' into last-snapshot-marker-fix
BeniaminDrasovean Oct 25, 2023
92bab82
update logger name
BeniaminDrasovean Oct 26, 2023
5726d2d
Merge branch 'rc/v1.7.0' into last-snapshot-marker-fix
BeniaminDrasovean Oct 27, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
67 changes: 67 additions & 0 deletions common/storage/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package storage

import (
"context"
"fmt"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-go/common"
logger "github.com/multiversx/mx-chain-logger-go"
)

var log = logger.GetOrCreate("state")
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved

const (
// WaitTimeForSnapshotEpochCheck is the time to wait before checking the storage epoch
WaitTimeForSnapshotEpochCheck = time.Millisecond * 100

// SnapshotWaitTimeout is the timeout for waiting for the storage epoch to change
SnapshotWaitTimeout = time.Minute * 3
)

// StorageEpochChangeWaitArgs are the args needed for calling the WaitForStorageEpochChange function
type StorageEpochChangeWaitArgs struct {
TrieStorageManager common.StorageManager
Epoch uint32
WaitTimeForSnapshotEpochCheck time.Duration
SnapshotWaitTimeout time.Duration
}

// WaitForStorageEpochChange waits for the storage epoch to change to the given epoch
func WaitForStorageEpochChange(args StorageEpochChangeWaitArgs) error {
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
log.Debug("waiting for storage epoch change", "epoch", args.Epoch, "wait timeout", args.SnapshotWaitTimeout)

if args.SnapshotWaitTimeout < args.WaitTimeForSnapshotEpochCheck {
return fmt.Errorf("timeout (%s) must be greater than wait time between snapshot epoch check (%s)", args.SnapshotWaitTimeout, args.WaitTimeForSnapshotEpochCheck)
}

ctx, cancel := context.WithTimeout(context.Background(), args.SnapshotWaitTimeout)
defer cancel()
AdoAdoAdo marked this conversation as resolved.
Show resolved Hide resolved

timer := time.NewTimer(args.WaitTimeForSnapshotEpochCheck)
defer timer.Stop()

for {
timer.Reset(args.WaitTimeForSnapshotEpochCheck)

if args.TrieStorageManager.IsClosed() {
return core.ErrContextClosing
}

latestStorageEpoch, err := args.TrieStorageManager.GetLatestStorageEpoch()
if err != nil {
return err
}

if latestStorageEpoch == args.Epoch {
return nil
}

select {
case <-timer.C:
case <-ctx.Done():
return fmt.Errorf("timeout waiting for storage epoch change, snapshot epoch %d", args.Epoch)
}
}
}
93 changes: 93 additions & 0 deletions common/storage/storage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package storage

import (
"errors"
"testing"
"time"

"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-go/testscommon/storageManager"
"github.com/stretchr/testify/assert"
)

func getDefaultArgs() StorageEpochChangeWaitArgs {
return StorageEpochChangeWaitArgs{
Epoch: 1,
WaitTimeForSnapshotEpochCheck: time.Millisecond * 100,
SnapshotWaitTimeout: time.Second,
TrieStorageManager: &storageManager.StorageManagerStub{},
}
}

func TestSnapshotsManager_WaitForStorageEpochChange(t *testing.T) {
t.Parallel()

t.Run("invalid args", func(t *testing.T) {
t.Parallel()

args := getDefaultArgs()
args.SnapshotWaitTimeout = time.Millisecond

err := WaitForStorageEpochChange(args)
assert.Error(t, err)
})
t.Run("getLatestStorageEpoch error", func(t *testing.T) {
t.Parallel()

expectedError := errors.New("getLatestStorageEpoch error")

args := getDefaultArgs()
args.TrieStorageManager = &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, expectedError
},
}

err := WaitForStorageEpochChange(args)
assert.Equal(t, expectedError, err)
})
t.Run("storage manager closed error", func(t *testing.T) {
t.Parallel()

args := getDefaultArgs()
args.TrieStorageManager = &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, nil
},
IsClosedCalled: func() bool {
return true
},
}

err := WaitForStorageEpochChange(args)
assert.Equal(t, core.ErrContextClosing, err)
})
t.Run("storage epoch change timeout", func(t *testing.T) {
t.Parallel()

args := getDefaultArgs()
args.WaitTimeForSnapshotEpochCheck = time.Millisecond
args.SnapshotWaitTimeout = time.Millisecond * 5
args.TrieStorageManager = &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, nil
},
}

err := WaitForStorageEpochChange(args)
assert.Error(t, err)
})
t.Run("returns when latestStorageEpoch == snapshotEpoch", func(t *testing.T) {
t.Parallel()

args := getDefaultArgs()
args.TrieStorageManager = &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 1, nil
},
}

err := WaitForStorageEpochChange(args)
assert.Nil(t, err)
})
}
9 changes: 4 additions & 5 deletions state/accountsDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/multiversx/mx-chain-go/common/errChan"
"github.com/multiversx/mx-chain-go/common/holders"
"github.com/multiversx/mx-chain-go/state/iteratorChannelsProvider"
"github.com/multiversx/mx-chain-go/state/lastSnapshotMarker"
"github.com/multiversx/mx-chain-go/state/parsers"
"github.com/multiversx/mx-chain-go/state/stateMetrics"
"github.com/multiversx/mx-chain-go/trie/keyBuilder"
Expand All @@ -28,11 +29,8 @@ import (
)

const (
leavesChannelSize = 100
missingNodesChannelSize = 100
lastSnapshot = "lastSnapshot"
waitTimeForSnapshotEpochCheck = time.Millisecond * 100
snapshotWaitTimeout = time.Minute
leavesChannelSize = 100
missingNodesChannelSize = 100
)

type loadingMeasurements struct {
Expand Down Expand Up @@ -138,6 +136,7 @@ func NewAccountsDB(args ArgsAccountsDB) (*AccountsDB, error) {
StateMetrics: sm,
ChannelsProvider: iteratorChannelsProvider.NewUserStateIteratorChannelsProvider(),
AccountFactory: args.AccountFactory,
LastSnapshotMarker: lastSnapshotMarker.NewLastSnapshotMarker(),
}
snapshotManager, err := NewSnapshotsManager(argsSnapshotsManager)
if err != nil {
Expand Down
35 changes: 21 additions & 14 deletions state/accountsDB_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1026,7 +1026,7 @@ func TestAccountsDB_SnapshotStateOnAClosedStorageManagerShouldNotMarkActiveDB(t

mut.RLock()
defer mut.RUnlock()
assert.True(t, lastSnapshotStartedWasPut)
assert.False(t, lastSnapshotStartedWasPut)
assert.False(t, activeDBWasPut)
}

Expand Down Expand Up @@ -1065,11 +1065,14 @@ func TestAccountsDB_SnapshotStateWithErrorsShouldNotMarkActiveDB(t *testing.T) {

return nil
},
GetLatestStorageEpochCalled: func() (uint32, error) {
return 1, nil
},
}
},
}
adb := generateAccountDBFromTrie(trieStub)
adb.SnapshotState([]byte("roothash"), 0)
adb.SnapshotState([]byte("roothash"), 1)
time.Sleep(time.Second)

mut.RLock()
Expand Down Expand Up @@ -1106,14 +1109,14 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {

rootHash1 := []byte("rootHash1")
rootHash2 := []byte("rootHash2")
latestEpoch := uint32(0)
latestEpoch := atomic.Uint32{}
snapshotMutex := sync.RWMutex{}
takeSnapshotCalled := 0
trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return latestEpoch, nil
return latestEpoch.Get(), nil
},
TakeSnapshotCalled: func(_ string, _ []byte, _ []byte, iteratorChannels *common.TrieIteratorChannels, _ chan []byte, stats common.SnapshotStatisticsHandler, _ uint32) {
snapshotMutex.Lock()
Expand Down Expand Up @@ -1141,7 +1144,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
snapshotMutex.Unlock()

// snapshot rootHash1 and epoch 1
latestEpoch = 1
latestEpoch.Set(1)
adb.SnapshotState(rootHash1, 1)
for adb.IsSnapshotInProgress() {
time.Sleep(waitForOpToFinish)
Expand All @@ -1151,7 +1154,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
snapshotMutex.Unlock()

// snapshot rootHash1 and epoch 0 again
latestEpoch = 0
latestEpoch.Set(0)
adb.SnapshotState(rootHash1, 0)
for adb.IsSnapshotInProgress() {
time.Sleep(waitForOpToFinish)
Expand Down Expand Up @@ -1179,7 +1182,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
snapshotMutex.Unlock()

// snapshot rootHash2 and epoch 1
latestEpoch = 1
latestEpoch.Set(1)
adb.SnapshotState(rootHash2, 1)
for adb.IsSnapshotInProgress() {
time.Sleep(waitForOpToFinish)
Expand All @@ -1189,7 +1192,7 @@ func TestAccountsDB_SnapshotStateSnapshotSameRootHash(t *testing.T) {
snapshotMutex.Unlock()

// snapshot rootHash2 and epoch 1 again
latestEpoch = 1
latestEpoch.Set(1)
adb.SnapshotState(rootHash2, 1)
for adb.IsSnapshotInProgress() {
time.Sleep(waitForOpToFinish)
Expand All @@ -1205,34 +1208,38 @@ func TestAccountsDB_SnapshotStateSkipSnapshotIfSnapshotInProgress(t *testing.T)
rootHashes := [][]byte{[]byte("rootHash1"), []byte("rootHash2"), []byte("rootHash3"), []byte("rootHash4")}
snapshotMutex := sync.RWMutex{}
takeSnapshotCalled := 0
numPutInEpochCalled := 0
numPutInEpochCalled := atomic.Uint32{}

trieStub := &trieMock.TrieStub{
GetStorageManagerCalled: func() common.StorageManager {
return &storageManager.StorageManagerStub{
GetLatestStorageEpochCalled: func() (uint32, error) {
return 0, nil
return uint32(mathRand.Intn(5)), nil
},
TakeSnapshotCalled: func(_ string, _ []byte, _ []byte, iteratorChannels *common.TrieIteratorChannels, _ chan []byte, stats common.SnapshotStatisticsHandler, _ uint32) {
snapshotMutex.Lock()
takeSnapshotCalled++
close(iteratorChannels.LeavesChan)
stats.SnapshotFinished()
for numPutInEpochCalled.Get() != 4 {
time.Sleep(time.Millisecond * 10)
}
snapshotMutex.Unlock()
},
PutInEpochCalled: func(key []byte, val []byte, epoch uint32) error {
assert.Equal(t, []byte(state.LastSnapshotStarted), key)
assert.Equal(t, rootHashes[epoch], val)
assert.Equal(t, rootHashes[epoch-1], val)

numPutInEpochCalled++
numPutInEpochCalled.Set(numPutInEpochCalled.Get() + 1)
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
return nil
},
}
},
}
adb := generateAccountDBFromTrie(trieStub)

for epoch, rootHash := range rootHashes {
for i, rootHash := range rootHashes {
epoch := i + 1
adb.SnapshotState(rootHash, uint32(epoch))
}
for adb.IsSnapshotInProgress() {
Expand All @@ -1242,7 +1249,7 @@ func TestAccountsDB_SnapshotStateSkipSnapshotIfSnapshotInProgress(t *testing.T)
snapshotMutex.Lock()
assert.Equal(t, 1, takeSnapshotCalled)
snapshotMutex.Unlock()
assert.Equal(t, len(rootHashes), numPutInEpochCalled)
assert.Equal(t, len(rootHashes), int(numPutInEpochCalled.Get()))
}

func TestAccountsDB_SnapshotStateCallsRemoveFromAllActiveEpochs(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions state/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,3 +144,6 @@ var ErrNilStateMetrics = errors.New("nil sstate metrics")

// ErrNilChannelsProvider signals that a nil channels provider has been given
var ErrNilChannelsProvider = errors.New("nil channels provider")

// ErrNilLastSnapshotMarker signals that a nil last snapshot marker has been given
var ErrNilLastSnapshotMarker = errors.New("nil last snapshot marker")
20 changes: 1 addition & 19 deletions state/export_test.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,13 @@
package state

import (
"time"

"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/testscommon/storageManager"
vmcommon "github.com/multiversx/mx-chain-vm-common-go"
)

// LastSnapshotStarted -
const LastSnapshotStarted = lastSnapshot
const LastSnapshotStarted = "lastSnapshot"
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved

// LoadCode -
func (adb *AccountsDB) LoadCode(accountHandler baseAccountHandler) error {
Expand Down Expand Up @@ -92,21 +89,6 @@ func (sm *snapshotsManager) GetLastSnapshotInfo() ([]byte, uint32) {
return sm.lastSnapshot.rootHash, sm.lastSnapshot.epoch
}

// GetStorageEpochChangeWaitArgs -
func GetStorageEpochChangeWaitArgs() storageEpochChangeWaitArgs {
return storageEpochChangeWaitArgs{
Epoch: 1,
WaitTimeForSnapshotEpochCheck: time.Millisecond * 100,
SnapshotWaitTimeout: time.Second,
TrieStorageManager: &storageManager.StorageManagerStub{},
}
}

// WaitForStorageEpochChange
func (sm *snapshotsManager) WaitForStorageEpochChange(args storageEpochChangeWaitArgs) error {
return sm.waitForStorageEpochChange(args)
}

// NewNilSnapshotsManager -
func NewNilSnapshotsManager() *snapshotsManager {
return nil
Expand Down
11 changes: 10 additions & 1 deletion state/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ type DataTrie interface {
}

// PeerAccountHandler models a peer state account, which can journalize a normal account's data
// with some extra features like signing statistics or rating information
//
iulianpascalau marked this conversation as resolved.
Show resolved Hide resolved
// with some extra features like signing statistics or rating information
type PeerAccountHandler interface {
SetBLSPublicKey([]byte) error
GetRewardAddress() []byte
Expand Down Expand Up @@ -265,3 +266,11 @@ type SignRate interface {
GetNumSuccess() uint32
GetNumFailure() uint32
}

// LastSnapshotMarker manages the lastSnapshot marker operations
type LastSnapshotMarker interface {
AddMarker(trieStorageManager common.StorageManager, epoch uint32, rootHash []byte)
RemoveMarker(trieStorageManager common.StorageManager, epoch uint32, rootHash []byte)
GetMarkerInfo(trieStorageManager common.StorageManager) ([]byte, error)
IsInterfaceNil() bool
}