Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bug fix/sync multiple tries #1864

Merged
merged 13 commits into from
Jun 5, 2020
30 changes: 18 additions & 12 deletions api/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package mock

// CacherStub -
type CacherStub struct {
ClearCalled func()
PutCalled func(key []byte, value interface{}) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
ClearCalled func()
PutCalled func(key []byte, value interface{}) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
UnRegisterHandlerCalled func(func(key []byte, value interface{}))
}

// UnRegisterHandler -
func (cs *CacherStub) UnRegisterHandler(handler func(key []byte, value interface{})) {
cs.UnRegisterHandlerCalled(handler)
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
}

// Clear -
Expand Down
4 changes: 2 additions & 2 deletions cmd/node/config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,9 @@
Shards = 16

[TrieNodesDataPool]
Capacity = 50000
Capacity = 900000
Type = "SizeLRU"
SizeInBytes = 104857600 #100MB
SizeInBytes = 314572800 #300MB

[WhiteListPool]
Capacity = 100000
Expand Down
7 changes: 5 additions & 2 deletions consensus/mock/cacherMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,11 @@ func (cm *CacherMock) MaxSize() int {
}

// RegisterHandler -
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{})) {
panic("implement me")
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{}), string) {
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
}

// UnRegisterHandler -
func (cm *CacherMock) UnRegisterHandler(_ string) {
}

// IsInterfaceNil returns true if there is no value under the interface
Expand Down
11 changes: 11 additions & 0 deletions core/common.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package core

import (
"crypto/rand"
)

// EmptyChannel empties the given channel
func EmptyChannel(ch chan bool) int {
readsCnt := 0
Expand All @@ -12,3 +16,10 @@ func EmptyChannel(ch chan bool) int {
}
}
}

// UniqueIdentifier returns a unique string identifier of 32 bytes
func UniqueIdentifier() string {
buff := make([]byte, 32)
_, _ = rand.Read(buff)
return string(buff)
}
6 changes: 6 additions & 0 deletions data/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,9 @@ var ErrInvalidCacheSize = errors.New("cache size is invalid")

// ErrInvalidValue signals that an invalid value has been provided such as NaN to an integer field
var ErrInvalidValue = errors.New("invalid value")

// ErrNilThrottler signals that nil throttler has been provided
var ErrNilThrottler = errors.New("nil throttler")

// ErrTimeIsOut signals that time is out
var ErrTimeIsOut = errors.New("time is out")
8 changes: 8 additions & 0 deletions data/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,11 @@ type ShardValidatorInfoHandler interface {
String() string
IsInterfaceNil() bool
}

// GoRoutineThrottler can monitor the number of the currently running go routines
type GoRoutineThrottler interface {
CanProcess() bool
StartProcessing()
EndProcessing()
IsInterfaceNil() bool
}
7 changes: 6 additions & 1 deletion data/mock/cacherMock.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,12 @@ func (cm *CacherMock) MaxSize() int {
}

// RegisterHandler -
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{})) {
func (cm *CacherMock) RegisterHandler(func(key []byte, value interface{}), string) {
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
return
}

// UnRegisterHandler -
func (cm *CacherMock) UnRegisterHandler(string) {
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
return
}

Expand Down
28 changes: 17 additions & 11 deletions data/mock/uint64CacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,23 @@ package mock

// Uint64CacherStub -
type Uint64CacherStub struct {
ClearCalled func()
PutCalled func(uint64, interface{}) bool
GetCalled func(uint64) (interface{}, bool)
HasCalled func(uint64) bool
PeekCalled func(uint64) (interface{}, bool)
HasOrAddCalled func(uint64, interface{}) (bool, bool)
RemoveCalled func(uint64)
RemoveOldestCalled func()
KeysCalled func() []uint64
LenCalled func() int
RegisterHandlerCalled func(handler func(nonce uint64))
ClearCalled func()
PutCalled func(uint64, interface{}) bool
GetCalled func(uint64) (interface{}, bool)
HasCalled func(uint64) bool
PeekCalled func(uint64) (interface{}, bool)
HasOrAddCalled func(uint64, interface{}) (bool, bool)
RemoveCalled func(uint64)
RemoveOldestCalled func()
KeysCalled func() []uint64
LenCalled func() int
RegisterHandlerCalled func(handler func(nonce uint64))
UnRegisterHandlerCalled func(func(key []byte, value interface{}))
}

// UnRegisterHandler -
func (ucs *Uint64CacherStub) UnRegisterHandler(handler func(key []byte, value interface{})) {
ucs.UnRegisterHandlerCalled(handler)
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
}

// Clear -
Expand Down
87 changes: 73 additions & 14 deletions data/syncer/userAccountsSyncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package syncer

import (
"context"
"sync"
"time"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/trie"
Expand All @@ -15,14 +18,19 @@ var _ epochStart.AccountsDBSyncer = (*userAccountsSyncer)(nil)

var log = logger.GetOrCreate("syncer")

const timeBetweenRetries = 100 * time.Millisecond

type userAccountsSyncer struct {
*baseAccountsSyncer
throttler data.GoRoutineThrottler
syncerMutex sync.Mutex
}

// ArgsNewUserAccountsSyncer defines the arguments needed for the new account syncer
type ArgsNewUserAccountsSyncer struct {
ArgsNewBaseAccountsSyncer
ShardId uint32
ShardId uint32
Throttler data.GoRoutineThrottler
}

// NewUserAccountsSyncer creates a user account syncer
Expand All @@ -32,6 +40,10 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
return nil, err
}

if check.IfNil(args.Throttler) {
return nil, data.ErrNilThrottler
}

b := &baseAccountsSyncer{
hasher: args.Hasher,
marshalizer: args.Marshalizer,
Expand All @@ -48,6 +60,7 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,

u := &userAccountsSyncer{
baseAccountsSyncer: b,
throttler: args.Throttler,
}

return u, nil
Expand Down Expand Up @@ -81,25 +94,71 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error {
}

func (u *userAccountsSyncer) syncAccountDataTries(rootHashes [][]byte, ctx context.Context) error {
var errFound error
errMutex := sync.Mutex{}

wg := sync.WaitGroup{}
wg.Add(len(rootHashes))

for _, rootHash := range rootHashes {
dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
return err
for {
if u.throttler.CanProcess() {
break
}

select {
case <-time.After(timeBetweenRetries):
continue
case <-ctx.Done():
return data.ErrTimeIsOut
}
}

u.dataTries[string(rootHash)] = dataTrie
trieSyncer, err := trie.NewTrieSyncer(u.requestHandler, u.cacher, dataTrie, u.shardId, factory.AccountTrieNodesTopic)
if err != nil {
return err
}
u.trieSyncers[string(rootHash)] = trieSyncer
go func(trieRootHash []byte) {
newErr := u.syncDataTrie(trieRootHash, ctx)
if newErr != nil {
errMutex.Lock()
errFound = newErr
errMutex.Unlock()
}
wg.Done()
}(rootHash)
}

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}
wg.Wait()

errMutex.Lock()
defer errMutex.Unlock()

return errFound
}

func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ctx context.Context) error {
u.throttler.StartProcessing()

u.syncerMutex.Lock()
dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
u.syncerMutex.Unlock()
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

u.syncerMutex.Unlock() before return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}

u.dataTries[string(rootHash)] = dataTrie
trieSyncer, err := trie.NewTrieSyncer(u.requestHandler, u.cacher, dataTrie, u.shardId, factory.AccountTrieNodesTopic)
if err != nil {
u.syncerMutex.Unlock()
return err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

u.syncerMutex.Unlock() before return

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

}
u.trieSyncers[string(rootHash)] = trieSyncer
u.syncerMutex.Unlock()

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}

u.throttler.EndProcessing()

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion data/trie/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/storage"
Expand All @@ -29,6 +30,7 @@ type trieSyncer struct {
requestHandler RequestHandler
interceptedNodes storage.Cacher
mutOperation sync.RWMutex
handlerID string
}

const maxNewMissingAddedPerTurn = 10
Expand Down Expand Up @@ -67,8 +69,8 @@ func NewTrieSyncer(
topic: topic,
shardId: shardId,
waitTimeBetweenRequests: time.Second,
handlerID: core.UniqueIdentifier(),
}
ts.interceptedNodes.RegisterHandler(ts.trieNodeIntercepted)

return ts, nil
}
Expand Down
32 changes: 19 additions & 13 deletions dataRetriever/mock/cacherStub.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package mock

// CacherStub -
type CacherStub struct {
ClearCalled func()
PutCalled func(key []byte, value interface{}, sizeInBytes int) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}, sizeInBytes int) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
ClearCalled func()
PutCalled func(key []byte, value interface{}, sizeInBytes int) (evicted bool)
GetCalled func(key []byte) (value interface{}, ok bool)
HasCalled func(key []byte) bool
PeekCalled func(key []byte) (value interface{}, ok bool)
HasOrAddCalled func(key []byte, value interface{}, sizeInBytes int) (ok, evicted bool)
RemoveCalled func(key []byte)
RemoveOldestCalled func()
KeysCalled func() [][]byte
LenCalled func() int
MaxSizeCalled func() int
RegisterHandlerCalled func(func(key []byte, value interface{}))
UnRegisterHandlerCalled func(id string)
}

// UnRegisterHandler -
func (cs *CacherStub) UnRegisterHandler(id string) {
cs.UnRegisterHandlerCalled(id)
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
}

// Clear -
Expand Down Expand Up @@ -67,7 +73,7 @@ func (cs *CacherStub) MaxSize() int {
}

// RegisterHandler -
func (cs *CacherStub) RegisterHandler(handler func(key []byte, value interface{})) {
func (cs *CacherStub) RegisterHandler(handler func(key []byte, value interface{}), _ string) {
cs.RegisterHandlerCalled(handler)
SebastianMarian marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down