Skip to content

Commit

Permalink
Merge pull request #1864 from ElrondNetwork/bugFix/sync-multiple-tries
Browse files Browse the repository at this point in the history
Bug fix/sync multiple tries
  • Loading branch information
LucianMincu committed Jun 5, 2020
2 parents 0422e85 + 21eeac9 commit 5f2bb92
Show file tree
Hide file tree
Showing 44 changed files with 472 additions and 249 deletions.
30 changes: 18 additions & 12 deletions api/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package mock

// CacherStub -
type CacherStub struct {
ClearCalled func()
PutCalled func(key []byte, value interface{}) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
ClearCalled func()
PutCalled func(key []byte, value interface{}) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
UnRegisterHandlerCalled func(func(key []byte, value interface{}))
}

// UnRegisterHandler -
func (cs *CacherStub) UnRegisterHandler(handler func(key []byte, value interface{})) {
cs.UnRegisterHandlerCalled(handler)
}

// Clear -
Expand Down
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@
Shards = 16

[TrieNodesDataPool]
Capacity = 50000
Capacity = 900000
Type = "SizeLRU"
SizeInBytes = 104857600 #100MB
SizeInBytes = 314572800 #300MB

[WhiteListPool]
Capacity = 100000
Expand Down
7 changes: 5 additions & 2 deletions consensus/mock/cacherMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ func (cm *CacherMock) MaxSize() int {
}

// RegisterHandler -
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{})) {
panic("implement me")
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{}), string) {
}

// UnRegisterHandler -
func (cm *CacherMock) UnRegisterHandler(_ string) {
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
11 changes: 11 additions & 0 deletions core/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package core

import (
"crypto/rand"
)

// EmptyChannel empties the given channel
func EmptyChannel(ch chan bool) int {
readsCnt := 0
Expand All @@ -12,3 +16,10 @@ func EmptyChannel(ch chan bool) int {
}
}
}

// UniqueIdentifier returns a unique string identifier of 32 bytes
func UniqueIdentifier() string {
buff := make([]byte, 32)
_, _ = rand.Read(buff)
return string(buff)
}
6 changes: 6 additions & 0 deletions data/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ var ErrInvalidCacheSize = errors.New("cache size is invalid")

// ErrInvalidValue signals that an invalid value has been provided such as NaN to an integer field
var ErrInvalidValue = errors.New("invalid value")

// ErrNilThrottler signals that nil throttler has been provided
var ErrNilThrottler = errors.New("nil throttler")

// ErrTimeIsOut signals that time is out
var ErrTimeIsOut = errors.New("time is out")
8 changes: 8 additions & 0 deletions data/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,11 @@ type ShardValidatorInfoHandler interface {
String() string
IsInterfaceNil() bool
}

// GoRoutineThrottler can monitor the number of the currently running go routines
type GoRoutineThrottler interface {
CanProcess() bool
StartProcessing()
EndProcessing()
IsInterfaceNil() bool
}
7 changes: 6 additions & 1 deletion data/mock/cacherMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ func (cm *CacherMock) MaxSize() int {
}

// RegisterHandler -
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{})) {
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{}), string) {
return
}

// UnRegisterHandler -
func (cm *CacherMock) UnRegisterHandler(string) {
return
}

Expand Down
28 changes: 17 additions & 11 deletions data/mock/uint64CacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ package mock

// Uint64CacherStub -
type Uint64CacherStub struct {
ClearCalled func()
PutCalled func(uint64, interface{}) bool
GetCalled func(uint64) (interface{}, bool)
HasCalled func(uint64) bool
PeekCalled func(uint64) (interface{}, bool)
HasOrAddCalled func(uint64, interface{}) (bool, bool)
RemoveCalled func(uint64)
RemoveOldestCalled func()
KeysCalled func() []uint64
LenCalled func() int
RegisterHandlerCalled func(handler func(nonce uint64))
ClearCalled func()
PutCalled func(uint64, interface{}) bool
GetCalled func(uint64) (interface{}, bool)
HasCalled func(uint64) bool
PeekCalled func(uint64) (interface{}, bool)
HasOrAddCalled func(uint64, interface{}) (bool, bool)
RemoveCalled func(uint64)
RemoveOldestCalled func()
KeysCalled func() []uint64
LenCalled func() int
RegisterHandlerCalled func(handler func(nonce uint64))
UnRegisterHandlerCalled func(func(key []byte, value interface{}))
}

// UnRegisterHandler -
func (ucs *Uint64CacherStub) UnRegisterHandler(handler func(key []byte, value interface{})) {
ucs.UnRegisterHandlerCalled(handler)
}

// Clear -
Expand Down
87 changes: 73 additions & 14 deletions data/syncer/userAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package syncer

import (
"context"
"sync"
"time"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/trie"
Expand All @@ -15,14 +18,19 @@ var _ epochStart.AccountsDBSyncer = (*userAccountsSyncer)(nil)

var log = logger.GetOrCreate("syncer")

const timeBetweenRetries = 100 * time.Millisecond

type userAccountsSyncer struct {
*baseAccountsSyncer
throttler data.GoRoutineThrottler
syncerMutex sync.Mutex
}

// ArgsNewUserAccountsSyncer defines the arguments needed for the new account syncer
type ArgsNewUserAccountsSyncer struct {
ArgsNewBaseAccountsSyncer
ShardId uint32
ShardId uint32
Throttler data.GoRoutineThrottler
}

// NewUserAccountsSyncer creates a user account syncer
Expand All @@ -32,6 +40,10 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
return nil, err
}

if check.IfNil(args.Throttler) {
return nil, data.ErrNilThrottler
}

b := &baseAccountsSyncer{
hasher: args.Hasher,
marshalizer: args.Marshalizer,
Expand All @@ -48,6 +60,7 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,

u := &userAccountsSyncer{
baseAccountsSyncer: b,
throttler: args.Throttler,
}

return u, nil
Expand Down Expand Up @@ -81,25 +94,71 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error {
}

func (u *userAccountsSyncer) syncAccountDataTries(rootHashes [][]byte, ctx context.Context) error {
var errFound error
errMutex := sync.Mutex{}

wg := sync.WaitGroup{}
wg.Add(len(rootHashes))

for _, rootHash := range rootHashes {
dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
return err
for {
if u.throttler.CanProcess() {
break
}

select {
case <-time.After(timeBetweenRetries):
continue
case <-ctx.Done():
return data.ErrTimeIsOut
}
}

u.dataTries[string(rootHash)] = dataTrie
trieSyncer, err := trie.NewTrieSyncer(u.requestHandler, u.cacher, dataTrie, u.shardId, factory.AccountTrieNodesTopic)
if err != nil {
return err
}
u.trieSyncers[string(rootHash)] = trieSyncer
go func(trieRootHash []byte) {
newErr := u.syncDataTrie(trieRootHash, ctx)
if newErr != nil {
errMutex.Lock()
errFound = newErr
errMutex.Unlock()
}
wg.Done()
}(rootHash)
}

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}
wg.Wait()

errMutex.Lock()
defer errMutex.Unlock()

return errFound
}

func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ctx context.Context) error {
u.throttler.StartProcessing()

u.syncerMutex.Lock()
dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
u.syncerMutex.Unlock()
return err
}

u.dataTries[string(rootHash)] = dataTrie
trieSyncer, err := trie.NewTrieSyncer(u.requestHandler, u.cacher, dataTrie, u.shardId, factory.AccountTrieNodesTopic)
if err != nil {
u.syncerMutex.Unlock()
return err
}
u.trieSyncers[string(rootHash)] = trieSyncer
u.syncerMutex.Unlock()

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}

u.throttler.EndProcessing()

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion data/trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/storage"
Expand All @@ -29,6 +30,7 @@ type trieSyncer struct {
requestHandler RequestHandler
interceptedNodes storage.Cacher
mutOperation sync.RWMutex
handlerID string
}

const maxNewMissingAddedPerTurn = 10
Expand Down Expand Up @@ -67,8 +69,8 @@ func NewTrieSyncer(
topic: topic,
shardId: shardId,
waitTimeBetweenRequests: time.Second,
handlerID: core.UniqueIdentifier(),
}
ts.interceptedNodes.RegisterHandler(ts.trieNodeIntercepted)

return ts, nil
}
Expand Down
32 changes: 19 additions & 13 deletions dataRetriever/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package mock

// CacherStub -
type CacherStub struct {
ClearCalled func()
PutCalled func(key []byte, value interface{}, sizeInBytes int) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}, sizeInBytes int) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
ClearCalled func()
PutCalled func(key []byte, value interface{}, sizeInBytes int) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}, sizeInBytes int) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
UnRegisterHandlerCalled func(id string)
}

// UnRegisterHandler -
func (cs *CacherStub) UnRegisterHandler(id string) {
cs.UnRegisterHandlerCalled(id)
}

// Clear -
Expand Down Expand Up @@ -67,7 +73,7 @@ func (cs *CacherStub) MaxSize() int {
}

// RegisterHandler -
func (cs *CacherStub) RegisterHandler(handler func(key []byte, value interface{})) {
func (cs *CacherStub) RegisterHandler(handler func(key []byte, value interface{}), _ string) {
cs.RegisterHandlerCalled(handler)
}

Expand Down

0 comments on commit 5f2bb92

Please sign in to comment.