Bug fix/sync multiple tries #1864

Merged
merged 13 commits on Jun 5, 2020
6 changes: 6 additions & 0 deletions data/errors.go
@@ -30,3 +30,9 @@ var ErrInvalidCacheSize = errors.New("cache size is invalid")

// ErrInvalidValue signals that an invalid value has been provided, such as NaN for an integer field
var ErrInvalidValue = errors.New("invalid value")

// ErrNilThrottler signals that a nil throttler has been provided
var ErrNilThrottler = errors.New("nil throttler")

// ErrTimeIsOut signals that the allotted time has run out
var ErrTimeIsOut = errors.New("time is out")
8 changes: 8 additions & 0 deletions data/interface.go
@@ -232,3 +232,11 @@ type ShardValidatorInfoHandler interface {
String() string
IsInterfaceNil() bool
}

// GoRoutineThrottler monitors the number of currently running go routines
type GoRoutineThrottler interface {
CanProcess() bool
StartProcessing()
EndProcessing()
IsInterfaceNil() bool
}
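
For reference, a minimal sketch of a throttler satisfying this interface, assuming a plain atomic counter; the PR itself wires in core/throttler.NewNumGoRoutinesThrottler, whose actual implementation may differ:

import "sync/atomic"

type numGoRoutinesThrottler struct {
	max     int32
	counter int32
}

// CanProcess reports whether another go routine may start.
func (t *numGoRoutinesThrottler) CanProcess() bool {
	return atomic.LoadInt32(&t.counter) < t.max
}

// StartProcessing records one more running go routine.
func (t *numGoRoutinesThrottler) StartProcessing() {
	atomic.AddInt32(&t.counter, 1)
}

// EndProcessing records that one go routine has finished.
func (t *numGoRoutinesThrottler) EndProcessing() {
	atomic.AddInt32(&t.counter, -1)
}

// IsInterfaceNil returns true if there is no value under the interface.
func (t *numGoRoutinesThrottler) IsInterfaceNil() bool {
	return t == nil
}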
81 changes: 68 additions & 13 deletions data/syncer/userAccountsSyncer.go
@@ -2,8 +2,11 @@ package syncer

import (
"context"
"sync"
"time"

logger "github.com/ElrondNetwork/elrond-go-logger"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/state"
"github.com/ElrondNetwork/elrond-go/data/trie"
@@ -15,14 +18,19 @@ var _ epochStart.AccountsDBSyncer = (*userAccountsSyncer)(nil)

var log = logger.GetOrCreate("syncer")

const timeBetweenRetries = 100 * time.Millisecond

type userAccountsSyncer struct {
*baseAccountsSyncer
throttler data.GoRoutineThrottler
syncerMutex sync.Mutex
}

// ArgsNewUserAccountsSyncer defines the arguments needed for the new account syncer
type ArgsNewUserAccountsSyncer struct {
ArgsNewBaseAccountsSyncer
ShardId uint32
ShardId uint32
Throttler data.GoRoutineThrottler
}

// NewUserAccountsSyncer creates a user account syncer
@@ -32,6 +40,10 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,
return nil, err
}

if check.IfNil(args.Throttler) {
return nil, data.ErrNilThrottler
}

b := &baseAccountsSyncer{
hasher: args.Hasher,
marshalizer: args.Marshalizer,
@@ -48,6 +60,7 @@ func NewUserAccountsSyncer(args ArgsNewUserAccountsSyncer) (*userAccountsSyncer,

u := &userAccountsSyncer{
baseAccountsSyncer: b,
throttler: args.Throttler,
}

return u, nil
@@ -81,25 +94,67 @@ func (u *userAccountsSyncer) SyncAccounts(rootHash []byte) error {
}

func (u *userAccountsSyncer) syncAccountDataTries(rootHashes [][]byte, ctx context.Context) error {
var errFound error
errMutex := sync.Mutex{}

for _, rootHash := range rootHashes {
dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
return err
for {
if u.throttler.CanProcess() {
break
}

select {
case <-time.After(timeBetweenRetries):
continue
case <-ctx.Done():
return data.ErrTimeIsOut
}
}

u.dataTries[string(rootHash)] = dataTrie
trieSyncer, err := trie.NewTrieSyncer(u.requestHandler, u.cacher, dataTrie, u.shardId, factory.AccountTrieNodesTopic)
if err != nil {
return err
go func(trieRootHash []byte) {
newErr := u.syncDataTrie(trieRootHash, ctx)
if newErr != nil {
errMutex.Lock()
errFound = newErr
errMutex.Unlock()
}
}(rootHash)

errMutex.Lock()
Contributor:

Missing errMutex.Unlock() after line 128. I suggest replacing the code between lines 123-128 with:

errMutex.RLock()
returnErr := errFound
errMutex.RUnlock()

if returnErr != nil {
	return returnErr
}

Contributor Author:

done

if errFound != nil {
returnErr := errFound
errMutex.Unlock()
return returnErr
}
u.trieSyncers[string(rootHash)] = trieSyncer
}

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}
return nil
}

func (u *userAccountsSyncer) syncDataTrie(rootHash []byte, ctx context.Context) error {
u.throttler.StartProcessing()

u.syncerMutex.Lock()
dataTrie, err := trie.NewTrie(u.trieStorageManager, u.marshalizer, u.hasher, u.maxTrieLevelInMemory)
if err != nil {
return err
Contributor:

u.syncerMutex.Unlock() before return

Contributor Author:

done
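
The shape of the fix marked done above, as I read it (a reconstruction; the merged code may differ slightly):

if err != nil {
	u.syncerMutex.Unlock() // release the mutex before the early return
	return err
}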

}

u.dataTries[string(rootHash)] = dataTrie
trieSyncer, err := trie.NewTrieSyncer(u.requestHandler, u.cacher, dataTrie, u.shardId, factory.AccountTrieNodesTopic)
if err != nil {
return err
Contributor:

u.syncerMutex.Unlock() before return

Contributor Author:

done

}
u.trieSyncers[string(rootHash)] = trieSyncer
u.syncerMutex.Unlock()

err = trieSyncer.StartSyncing(rootHash, ctx)
if err != nil {
return err
}

u.throttler.EndProcessing()

return nil
}
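
Note that StartProcessing and EndProcessing bracket the sync work, but the early error returns above skip EndProcessing. An illustrative alternative (my sketch, not what the PR does) would release the throttler slot with a defer so every return path is covered:

u.throttler.StartProcessing()
defer u.throttler.EndProcessing() // runs on every return path, including errors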

32 changes: 26 additions & 6 deletions dataRetriever/requestHandlers/requestHandler.go
@@ -18,6 +18,9 @@ var _ epochStart.RequestHandler = (*resolverRequestHandler)(nil)

var log = logger.GetOrCreate("dataretriever/requesthandlers")

const minHashesToRequest = 10
Contributor:

Please replace line 456 in the addRequestedItems method with continue instead of return. This fixes the situation where the call to rrh.requestedItemsHandler.Add fails for a key for whatever reason and, because of the return, all the other keys would not be added to the requested items list.

Contributor Author:

done
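
A sketch of the suggested change, assuming addRequestedItems iterates over the keys roughly like this (a reconstruction, not the exact repo code):

func (rrh *resolverRequestHandler) addRequestedItems(keys [][]byte) {
	for _, key := range keys {
		err := rrh.requestedItemsHandler.Add(string(key))
		if err != nil {
			log.Trace("addRequestedItems", "error", err.Error(), "key", key)
			continue // was `return`: one failing key no longer drops the rest
		}
	}
}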

const timeToAccumulateTrieHashes = 100 * time.Millisecond

type resolverRequestHandler struct {
epoch uint32
shardID uint32
@@ -28,6 +31,10 @@ type resolverRequestHandler struct {
sweepTime time.Time
requestInterval time.Duration
mutSweepTime sync.Mutex

trieHashAccumulator [][]byte
Contributor:

trieHashesAccumulator?

Contributor Author:

done

lastTrieRequestTime time.Time
mutexTrieHashes sync.Mutex
}

// NewResolverRequestHandler creates a requestHandler interface implementation with request functions
@@ -64,6 +71,7 @@ func NewResolverRequestHandler(
maxTxsToRequest: maxTxsToRequest,
whiteList: whiteList,
requestInterval: requestInterval,
trieHashAccumulator: make([][]byte, 0),
}

rrh.sweepTime = time.Now()
@@ -346,10 +354,22 @@ func (rrh *resolverRequestHandler) RequestTrieNodes(destShardID uint32, hashes [
if len(unrequestedHashes) == 0 {
return
}

rrh.mutexTrieHashes.Lock()
defer rrh.mutexTrieHashes.Unlock()

rrh.trieHashAccumulator = append(rrh.trieHashAccumulator, unrequestedHashes...)
elapsedTime := time.Since(rrh.lastTrieRequestTime)
if len(rrh.trieHashAccumulator) < minHashesToRequest && elapsedTime < timeToAccumulateTrieHashes {
Contributor:

What if the last bulk of trieHashes is smaller than 10? Let's say 8. This will make the method return at this point. Would it be called again for the same 8 hashes after a while?

  1. If yes, they would be added again to rrh.trieHashAccumulator, and now we would have 16, but they would be duplicates.
  2. If not, they would never be requested again.

Contributor:

I think you should either call rrh.addRequestedItems(rrh.trieHashAccumulator) before the return or, better, do rrh.trieHashAccumulator = rrh.trieHashAccumulator[:0] and add the hashes again on the next call.

Contributor:

There are also other exit points below, so I think you should move this line, rrh.trieHashAccumulator = rrh.trieHashAccumulator[:0], into a deferred call above and remove it from line 400.

Contributor Author:

The request trie hashes method is called continuously, once every second, so the accumulated hashes will surely be requested.

Contributor:

Then you only need to manage the duplicate hashes added to rrh.trieHashAccumulator after each call.

Contributor Author:

done with MAP.
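
A minimal sketch of the map-based deduplication the author refers to (assumed form; the merged code may differ): keeping the accumulator as a set keyed by hash makes re-submitted hashes collapse into a single entry.

// accumulator declared as a set instead of a slice
trieHashesAccumulator := make(map[string]struct{})

addHashes := func(hashes [][]byte) {
	for _, hash := range hashes {
		trieHashesAccumulator[string(hash)] = struct{}{} // duplicates overwrite in place
	}
}

addHashes(unrequestedHashes) // first call accumulates the hashes
addHashes(unrequestedHashes) // same hashes again: the set size is unchanged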

return
}

rrh.lastTrieRequestTime = time.Now()
Contributor:

Move this to line 391, where you are sure that rrh.requestHashesWithDataSplit(rrh.trieHashAccumulator, trieResolver) will actually be executed.

Contributor:

I would make requestHashesWithDataSplit return an error, at least when the SplitDataInChunks call fails, and set rrh.lastTrieRequestTime = time.Now() only if the request has completed successfully.

Contributor Author:

done.
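
The discussed shape of the call site once requestHashesWithDataSplit returns an error (an illustrative reconstruction, not the verbatim merged code):

err := rrh.requestHashesWithDataSplit(rrh.trieHashAccumulator, trieResolver)
if err != nil {
	log.Debug("requestHashesWithDataSplit", "error", err.Error())
	return
}
rrh.lastTrieRequestTime = time.Now() // stamp the time only after a successful request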


log.Debug("requesting trie nodes from network",
"topic", topic,
"shard", destShardID,
"num nodes", len(unrequestedHashes),
"num nodes", len(rrh.trieHashAccumulator),
"firstHash", unrequestedHashes[0],
)

Expand All @@ -369,15 +389,15 @@ func (rrh *resolverRequestHandler) RequestTrieNodes(destShardID uint32, hashes [
return
}

for _, txHash := range hashes {
for _, txHash := range rrh.trieHashAccumulator {
log.Trace("requestByHashes", "hash", txHash)
}

rrh.whiteList.Add(unrequestedHashes)

go rrh.requestHashesWithDataSplit(unrequestedHashes, trieResolver)
rrh.whiteList.Add(rrh.trieHashAccumulator)
rrh.requestHashesWithDataSplit(rrh.trieHashAccumulator, trieResolver)
rrh.addRequestedItems(rrh.trieHashAccumulator)

rrh.addRequestedItems(unrequestedHashes)
rrh.trieHashAccumulator = rrh.trieHashAccumulator[:0]
}

// RequestMetaHeaderByNonce method asks for meta header from the connected peers by nonce
10 changes: 9 additions & 1 deletion epochStart/bootstrap/process.go
@@ -9,6 +9,7 @@ import (
"github.com/ElrondNetwork/elrond-go/config"
"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/partitioning"
"github.com/ElrondNetwork/elrond-go/core/throttler"
"github.com/ElrondNetwork/elrond-go/crypto"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/block"
@@ -46,6 +47,7 @@ const timeBetweenRequests = 100 * time.Millisecond
const maxToRequest = 100
const gracePeriodInPercentage = float64(0.25)
const roundGracePeriod = 25
const numConcurrentTrieSyncers = 50

// Parameters defines the DTO for the result produced by the bootstrap component
type Parameters struct {
@@ -734,6 +736,11 @@ func (e *epochStartBootstrap) requestAndProcessForShard() error {
}

func (e *epochStartBootstrap) syncUserAccountsState(rootHash []byte) error {
thr, err := throttler.NewNumGoRoutinesThrottler(numConcurrentTrieSyncers)
if err != nil {
return err
}

argsUserAccountsSyncer := syncer.ArgsNewUserAccountsSyncer{
ArgsNewBaseAccountsSyncer: syncer.ArgsNewBaseAccountsSyncer{
Hasher: e.hasher,
@@ -744,7 +751,8 @@ func (e *epochStartBootstrap) syncUserAccountsState(rootHash []byte) error {
Cacher: e.dataPool.TrieNodes(),
MaxTrieLevelInMemory: e.generalConfig.StateTriesConfig.MaxStateTrieLevelInMemory,
},
ShardId: e.shardCoordinator.SelfId(),
ShardId: e.shardCoordinator.SelfId(),
Throttler: thr,
}
accountsDBSyncer, err := syncer.NewUserAccountsSyncer(argsUserAccountsSyncer)
if err != nil {
11 changes: 10 additions & 1 deletion update/factory/accountDBSyncerContainerFactory.go
@@ -6,6 +6,7 @@ import (

"github.com/ElrondNetwork/elrond-go/core"
"github.com/ElrondNetwork/elrond-go/core/check"
"github.com/ElrondNetwork/elrond-go/core/throttler"
"github.com/ElrondNetwork/elrond-go/data"
"github.com/ElrondNetwork/elrond-go/data/syncer"
"github.com/ElrondNetwork/elrond-go/hashing"
@@ -17,6 +18,8 @@ import (
"github.com/ElrondNetwork/elrond-go/update/genesis"
)

const numConcurrentTrieSyncers = 50

// ArgsNewAccountsDBSyncersContainerFactory defines the arguments needed to create accounts DB syncers container
type ArgsNewAccountsDBSyncersContainerFactory struct {
TrieCacher storage.Cacher
@@ -106,6 +109,11 @@ func (a *accountDBSyncersContainerFactory) Create() (update.AccountsDBSyncContai
}

func (a *accountDBSyncersContainerFactory) createUserAccountsSyncer(shardId uint32) error {
thr, err := throttler.NewNumGoRoutinesThrottler(numConcurrentTrieSyncers)
if err != nil {
return err
}

args := syncer.ArgsNewUserAccountsSyncer{
ArgsNewBaseAccountsSyncer: syncer.ArgsNewBaseAccountsSyncer{
Hasher: a.hasher,
@@ -116,7 +124,8 @@ func (a *accountDBSyncersContainerFactory) createUserAccountsSyncer(shardId uint
Cacher: a.trieCacher,
MaxTrieLevelInMemory: a.maxTrieLevelinMemory,
},
ShardId: shardId,
ShardId: shardId,
Throttler: thr,
}
accountSyncer, err := syncer.NewUserAccountsSyncer(args)
if err != nil {