init-sync revamp (#5148)
* fix issue with rate limiting
* force fetcher to wait for minimum peers
* adds backoff interval
* cap the max blocks requested from a peer
* queue rewritten
* adds docs to fsm
* fix visibility
* updates fsm
* fsm tests added
* optimizes queue resource allocations
* removes debug log
* replace auto-fixed comment
* fixes typo
* better handling of evil peers
* fixes test
* minor fixes to fsm
* better interface for findEpochState func
farazdagi committed Mar 27, 2020
1 parent 33f6c22 commit 7ebb3c1
Showing 9 changed files with 812 additions and 892 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/sync/initial-sync/BUILD.bazel
@@ -5,6 +5,7 @@ go_library(
srcs = [
"blocks_fetcher.go",
"blocks_queue.go",
"fsm.go",
"log.go",
"round_robin.go",
"service.go",
@@ -43,6 +44,7 @@ go_test(
srcs = [
"blocks_fetcher_test.go",
"blocks_queue_test.go",
"fsm_test.go",
"round_robin_test.go",
],
embed = [":go_default_library"],
80 changes: 69 additions & 11 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"math"
"math/rand"
"sort"
"sync"
@@ -16,14 +17,23 @@ import (
eth "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/flags"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
prysmsync "github.com/prysmaticlabs/prysm/beacon-chain/sync"
p2ppb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

const (
// maxPendingRequests limits how many concurrent fetch requests one can initiate.
maxPendingRequests = 8
// peersPercentagePerRequest caps percentage of peers to be used in a request.
peersPercentagePerRequest = 0.75
)

var (
errNoPeersAvailable = errors.New("no peers available, waiting for reconnect")
errFetcherCtxIsDone = errors.New("fetcher's context is done, reinitialize")
@@ -40,6 +50,7 @@ type blocksFetcherConfig struct {
// On an incoming request, the requested block range is evenly divided
// among available peers (for fair network load distribution).
type blocksFetcher struct {
sync.Mutex
ctx context.Context
cancel context.CancelFunc
headFetcher blockchain.HeadFetcher
@@ -72,16 +83,16 @@ func newBlocksFetcher(ctx context.Context, cfg *blocksFetcherConfig) *blocksFetc
rateLimiter := leakybucket.NewCollector(
allowedBlocksPerSecond, /* rate */
allowedBlocksPerSecond, /* capacity */
false /* deleteEmptyBuckets */)
false /* deleteEmptyBuckets */)

return &blocksFetcher{
ctx: ctx,
cancel: cancel,
headFetcher: cfg.headFetcher,
p2p: cfg.p2p,
rateLimiter: rateLimiter,
fetchRequests: make(chan *fetchRequestParams, queueMaxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, queueMaxPendingRequests),
fetchRequests: make(chan *fetchRequestParams, maxPendingRequests),
fetchResponses: make(chan *fetchRequestResponse, maxPendingRequests),
quit: make(chan struct{}),
}
}
@@ -120,6 +131,11 @@ func (f *blocksFetcher) loop() {
}()

for {
// Make sure there are available peers before processing requests.
if _, err := f.waitForMinimumPeers(f.ctx); err != nil {
log.Error(err)
}

select {
case <-f.ctx.Done():
log.Debug("Context closed, exiting goroutine (blocks fetcher)")
@@ -221,17 +237,11 @@ func (f *blocksFetcher) collectPeerResponses(
return nil, ctx.Err()
}

peers = f.selectPeers(peers)
if len(peers) == 0 {
return nil, errNoPeersAvailable
}

// Shuffle peers to prevent a bad peer from
// stalling sync with invalid blocks.
randGenerator := rand.New(rand.NewSource(time.Now().Unix()))
randGenerator.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})

p2pRequests := new(sync.WaitGroup)
errChan := make(chan error)
blocksChan := make(chan []*eth.SignedBeaconBlock)
@@ -249,7 +259,7 @@ func (f *blocksFetcher) collectPeerResponses(
}

// Spread load evenly among available peers.
perPeerCount := count / uint64(len(peers))
perPeerCount := mathutil.Min(count/uint64(len(peers)), allowedBlocksPerSecond)
remainder := int(count % uint64(len(peers)))
for i, pid := range peers {
start, step := start+uint64(i)*step, step*uint64(len(peers))
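Worth spelling out the arithmetic in the hunk above: the requested range is split evenly across peers, each share is capped at allowedBlocksPerSecond, and the remainder is spread out one block at a time. A minimal, hypothetical sketch of that split — splitAcrossPeers is not a function in this commit, and the real fetcher additionally staggers start/step per peer:

package main

import "fmt"

// splitAcrossPeers divides `count` blocks across `numPeers`, capping each share at
// `maxPerPeer` and handing out the remainder one block at a time — a simplified,
// hypothetical mirror of the perPeerCount/remainder logic in the hunk above.
func splitAcrossPeers(count, numPeers, maxPerPeer uint64) []uint64 {
	perPeer := count / numPeers
	if perPeer > maxPerPeer {
		perPeer = maxPerPeer
	}
	remainder := count % numPeers
	shares := make([]uint64, numPeers)
	for i := range shares {
		shares[i] = perPeer
		if uint64(i) < remainder {
			shares[i]++ // the first `remainder` peers absorb one extra block each
		}
	}
	return shares
}

func main() {
	// 70 blocks over 3 peers with a 64-block cap: [24 23 23].
	fmt.Println(splitAcrossPeers(70, 3, 64))
}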
@@ -354,6 +364,7 @@ func (f *blocksFetcher) requestBlocks(
req *p2ppb.BeaconBlocksByRangeRequest,
pid peer.ID,
) ([]*eth.SignedBeaconBlock, error) {
f.Lock()
if f.rateLimiter.Remaining(pid.String()) < int64(req.Count) {
log.WithField("peer", pid).Debug("Slowing down for rate limit")
time.Sleep(f.rateLimiter.TillEmpty(pid.String()))
@@ -366,6 +377,7 @@ func (f *blocksFetcher) requestBlocks(
"step": req.Step,
"head": fmt.Sprintf("%#x", req.HeadBlockRoot),
}).Debug("Requesting blocks")
f.Unlock()
stream, err := f.p2p.Send(ctx, req, pid)
if err != nil {
return nil, err
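This hunk takes the fetcher's newly embedded sync.Mutex around the rate-limiter check, so concurrent per-peer requests can't both read the same Remaining value before deciding whether to sleep. A rough sketch of the pattern, with a hypothetical throttle type standing in for the leaky-bucket collector (not code from this commit):

package main

import (
	"fmt"
	"sync"
	"time"
)

// throttle is a hypothetical stand-in for the fetcher's per-peer rate limiter.
type throttle struct {
	mu        sync.Mutex
	remaining map[string]int64
	capacity  int64
}

// reserve waits when a peer's bucket cannot cover `count` items, then deducts them.
// Holding the mutex across the whole check-and-deduct keeps concurrent callers from
// both observing the same "remaining" value — the race the commit message refers to.
func (t *throttle) reserve(peer string, count int64) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.remaining[peer] < count {
		time.Sleep(20 * time.Millisecond) // stand-in for rateLimiter.TillEmpty(pid)
		t.remaining[peer] = t.capacity    // pretend the bucket has drained and refilled
	}
	t.remaining[peer] -= count
}

func main() {
	t := &throttle{remaining: map[string]int64{}, capacity: 64}
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			t.reserve("peer-1", 32)
		}()
	}
	wg.Wait()
	fmt.Println("remaining after 3 requests of 32:", t.remaining["peer-1"])
}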
@@ -407,3 +419,49 @@ func selectFailOverPeer(excludedPID peer.ID, peers []peer.ID) (peer.ID, error) {

return peers[0], nil
}

// waitForMinimumPeers spins and waits until enough peers are available.
func (f *blocksFetcher) waitForMinimumPeers(ctx context.Context) ([]peer.ID, error) {
required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}
for {
if ctx.Err() != nil {
return nil, ctx.Err()
}
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
_, _, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
if len(peers) >= required {
return peers, nil
}
log.WithFields(logrus.Fields{
"suitable": len(peers),
"required": required}).Info("Waiting for enough suitable peers before syncing")
time.Sleep(handshakePollingInterval)
}
}

// selectPeers returns a transformed list of peers (randomized, constrained if necessary).
func (f *blocksFetcher) selectPeers(peers []peer.ID) []peer.ID {
if len(peers) == 0 {
return peers
}

// Shuffle peers to prevent a bad peer from
// stalling sync with invalid blocks.
randGenerator := rand.New(rand.NewSource(time.Now().Unix()))
randGenerator.Shuffle(len(peers), func(i, j int) {
peers[i], peers[j] = peers[j], peers[i]
})

required := params.BeaconConfig().MaxPeersToSync
if flags.Get().MinimumSyncPeers < required {
required = flags.Get().MinimumSyncPeers
}

limit := uint64(math.Round(float64(len(peers)) * peersPercentagePerRequest))
limit = mathutil.Max(limit, uint64(required))
limit = mathutil.Min(limit, uint64(len(peers)))
return peers[:limit]
}
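selectPeers above constrains fan-out to roughly peersPercentagePerRequest (75%) of the suitable peers, clamped between the configured minimum and the number actually available. A small hypothetical illustration of just that clamp — peerLimit is not a function in this commit:

package main

import (
	"fmt"
	"math"
)

// peerLimit rounds the percentage-based cap and clamps it to [required, available],
// mirroring the limit computation at the end of selectPeers.
func peerLimit(available, required int, percentage float64) int {
	limit := int(math.Round(float64(available) * percentage))
	if limit < required {
		limit = required
	}
	if limit > available {
		limit = available
	}
	return limit
}

func main() {
	// With 10 suitable peers, a 0.75 cap, and a required minimum of 3, 8 peers are used.
	fmt.Println(peerLimit(10, 3, 0.75))
}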
20 changes: 15 additions & 5 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -97,7 +97,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
}{
{
name: "Single peer with all blocks",
expectedBlockSlots: makeSequence(1, 128), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 3*blockBatchSize),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
@@ -122,7 +122,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
},
{
name: "Single peer with all blocks (many small requests)",
expectedBlockSlots: makeSequence(1, 128), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 80),
peers: []*peerData{
{
blocks: makeSequence(1, 131),
@@ -155,7 +155,7 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
},
{
name: "Multiple peers with all blocks",
expectedBlockSlots: makeSequence(1, 128), // up to 4th epoch
expectedBlockSlots: makeSequence(1, 96), // up to 3rd epoch
peers: []*peerData{
{
blocks: makeSequence(1, 131),
@@ -218,6 +218,16 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
finalizedEpoch: 18,
headSlot: 640,
},
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
{
blocks: append(makeSequence(1, 64), makeSequence(500, 640)...),
finalizedEpoch: 18,
headSlot: 640,
},
},
requests: []*fetchRequestParams{
{
@@ -233,8 +243,8 @@ func TestBlocksFetcherRoundRobin(t *testing.T) {
count: blockBatchSize,
},
{
start: 400,
count: 150,
start: 500,
count: 53,
},
{
start: 553,

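The test fixtures above build expected slot ranges with makeSequence, whose definition is not part of this diff. Judging by calls like makeSequence(1, 131), it presumably yields the inclusive run of slot numbers; a plausible sketch under that assumption:

package main

import "fmt"

// makeSequence returns the inclusive slot range [start, end] — an assumption based
// on how the test table uses it; the helper's real definition is not shown in this diff.
func makeSequence(start, end uint64) []uint64 {
	seq := make([]uint64, 0, end-start+1)
	for i := start; i <= end; i++ {
		seq = append(seq, i)
	}
	return seq
}

func main() {
	// E.g. the fixture used above: slots 1..64 followed by 500..640.
	blocks := append(makeSequence(1, 64), makeSequence(500, 640)...)
	fmt.Println(len(blocks)) // 64 + 141 = 205
}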