Skip to content

Commit

Permalink
Merge refs/heads/master into v0.11
Browse files Browse the repository at this point in the history
  • Loading branch information
prylabs-bulldozer[bot] committed Apr 4, 2020
2 parents 7f0911f + f440c81 commit a0c62b1
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 17 deletions.
7 changes: 7 additions & 0 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,3 +503,10 @@ func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (u

return slot, nil
}

// bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
func (f *blocksFetcher) bestFinalizedSlot() uint64 {
headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
_, finalizedEpoch, _ := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
return helpers.StartSlot(finalizedEpoch)
}
35 changes: 24 additions & 11 deletions beacon-chain/sync/initial-sync/blocks_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/blockchain"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/shared/mathutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/sirupsen/logrus"
)

const (
Expand All @@ -35,6 +35,7 @@ type blocksProvider interface {
requestResponses() <-chan *fetchRequestResponse
scheduleRequest(ctx context.Context, start, count uint64) error
nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error)
bestFinalizedSlot() uint64
start() error
stop()
}
Expand Down Expand Up @@ -72,11 +73,15 @@ func newBlocksQueue(ctx context.Context, cfg *blocksQueueConfig) *blocksQueue {
p2p: cfg.p2p,
})
}
highestExpectedSlot := cfg.highestExpectedSlot
if highestExpectedSlot <= cfg.startSlot {
highestExpectedSlot = blocksFetcher.bestFinalizedSlot()
}

queue := &blocksQueue{
ctx: ctx,
cancel: cancel,
highestExpectedSlot: cfg.highestExpectedSlot,
highestExpectedSlot: highestExpectedSlot,
blocksFetcher: blocksFetcher,
headFetcher: cfg.headFetcher,
fetchedBlocks: make(chan *eth.SignedBeaconBlock, allowedBlocksPerSecond),
Expand Down Expand Up @@ -140,7 +145,13 @@ func (q *blocksQueue) loop() {
ticker := time.NewTicker(pollingInterval)
tickerEvents := []eventID{eventSchedule, eventReadyToSend, eventCheckStale, eventExtendWindow}
for {

if q.headFetcher.HeadSlot() >= q.highestExpectedSlot {
// By the time initial sync is complete, highest slot may increase, re-check.
if q.highestExpectedSlot < q.blocksFetcher.bestFinalizedSlot() {
q.highestExpectedSlot = q.blocksFetcher.bestFinalizedSlot()
continue
}
log.Debug("Highest expected slot reached")
q.cancel()
}
Expand All @@ -156,7 +167,11 @@ func (q *blocksQueue) loop() {
// Trigger events on each epoch's state machine.
for _, event := range tickerEvents {
if err := q.state.trigger(event, state.epoch, data); err != nil {
log.WithError(err).Debug("Can not trigger event")
log.WithFields(logrus.Fields{
"event": event,
"epoch": state.epoch,
"error": err.Error(),
}).Debug("Can not trigger event")
}
}

Expand Down Expand Up @@ -186,7 +201,11 @@ func (q *blocksQueue) loop() {
if ind, ok := q.state.findEpochState(epoch); ok {
state := q.state.epochs[ind]
if err := q.state.trigger(eventDataReceived, state.epoch, response); err != nil {
log.WithError(err).Debug("Can not trigger event")
log.WithFields(logrus.Fields{
"event": eventDataReceived,
"epoch": state.epoch,
"error": err.Error(),
}).Debug("Can not trigger event")
state.setState(stateNew)
continue
}
Expand All @@ -203,13 +222,7 @@ func (q *blocksQueue) loop() {
func (q *blocksQueue) onScheduleEvent(ctx context.Context) eventHandlerFn {
return func(es *epochState, in interface{}) (stateID, error) {
data := in.(*fetchRequestParams)
start := data.start
count := mathutil.Min(data.count, q.highestExpectedSlot-start+1)
if count <= 0 {
return es.state, errSlotIsTooHigh
}

if err := q.blocksFetcher.scheduleRequest(ctx, start, count); err != nil {
if err := q.blocksFetcher.scheduleRequest(ctx, data.start, data.count); err != nil {
return es.state, err
}
return stateScheduled, nil
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/sync/initial-sync/blocks_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,8 +157,8 @@ func TestBlocksQueueLoop(t *testing.T) {
}{
{
name: "Single peer with all blocks",
highestExpectedSlot: 251,
expectedBlockSlots: makeSequence(1, 251),
highestExpectedSlot: 251, // will be auto-fixed to 256 (to 8th epoch), by queue
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
Expand All @@ -169,8 +169,8 @@ func TestBlocksQueueLoop(t *testing.T) {
},
{
name: "Multiple peers with all blocks",
highestExpectedSlot: 251,
expectedBlockSlots: makeSequence(1, 251),
highestExpectedSlot: 256,
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestBlocksQueueLoop(t *testing.T) {
{
name: "Multiple peers with failures",
highestExpectedSlot: 128,
expectedBlockSlots: makeSequence(1, 128),
expectedBlockSlots: makeSequence(1, 256),
peers: []*peerData{
{
blocks: makeSequence(1, 320),
Expand Down Expand Up @@ -301,7 +301,7 @@ func TestBlocksQueueLoop(t *testing.T) {
t.Error(err)
}

if queue.headFetcher.HeadSlot() < uint64(len(tt.expectedBlockSlots)) {
if queue.headFetcher.HeadSlot() < tt.highestExpectedSlot {
t.Errorf("Not enough slots synced, want: %v, got: %v",
len(tt.expectedBlockSlots), queue.headFetcher.HeadSlot())
}
Expand Down

0 comments on commit a0c62b1

Please sign in to comment.