Allow to jump over 50K+ skipped slots in init-sync (#5975)
* allow to jump over 50K skipped slots
* more tests
* removes debug logs
* minor fixes
* re-arrange pid updates
* Merge refs/heads/master into init-sync-50K-skipped-slots
farazdagi committed May 25, 2020
1 parent 277fa33 commit f81c8c6
Showing 2 changed files with 136 additions and 52 deletions.
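
The heart of the change is in nonSkippedSlotAfter: instead of linearly scanning forward one small batch of slots per request, the fetcher now checks the first ten epochs in full and then probes one random slot per epoch in large strides. For a sense of scale, here is a standalone back-of-the-envelope sketch comparing request counts when crossing a 50,000-slot gap; SlotsPerEpoch = 32 and the 64-block batch size are illustrative assumptions, not values taken from this diff.

package main

import "fmt"

func main() {
	const (
		slotsPerEpoch = 32    // assumed mainnet value, for illustration only
		batchSize     = 64    // assumed blocks-per-request of the old linear scan
		gap           = 50000 // the skipped-slot gap from the commit title
	)
	// Old behavior: linear scan, advancing one batch per request.
	linearRequests := gap / batchSize
	// New behavior: 10 fully checked epochs, then sampling probes that
	// advance (slotsPerEpoch*slotsPerEpoch)/2 = 512 slots per request.
	stride := (slotsPerEpoch * slotsPerEpoch) / 2
	sampledRequests := 10 + (gap-10*slotsPerEpoch)/stride
	fmt.Printf("linear scan: ~%d requests, strided sampling: ~%d requests\n",
		linearRequests, sampledRequests)
	// Output: linear scan: ~781 requests, strided sampling: ~107 requests
}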
93 changes: 70 additions & 23 deletions beacon-chain/sync/initial-sync/blocks_fetcher.go
@@ -40,6 +40,9 @@ const (
 	peerLocksPollingInterval = 5 * time.Minute
 	// peerLockMaxAge is maximum time before stale lock is purged.
 	peerLockMaxAge = 60 * time.Minute
+	// nonSkippedSlotsFullSearchEpochs how many epochs to check in full, before resorting to random
+	// sampling of slots once per epoch
+	nonSkippedSlotsFullSearchEpochs = 10
 )
 
 var (
@@ -586,45 +589,89 @@ func (f *blocksFetcher) filterPeers(peers []peer.ID, peersPercentage float64) []
 	return peers
 }
 
-// nonSkippedSlotAfter checks slots after the given one in an attempt to find non-empty future slot.
+// nonSkippedSlotAfter checks slots after the given one in an attempt to find a non-empty future slot.
 // For efficiency only one random slot is checked per epoch, so returned slot might not be the first
 // non-skipped slot. This shouldn't be a problem, as in case of adversary peer, we might get incorrect
 // data anyway, so code that relies on this function must be robust enough to re-request, if no progress
 // is possible with a returned value.
 func (f *blocksFetcher) nonSkippedSlotAfter(ctx context.Context, slot uint64) (uint64, error) {
 	ctx, span := trace.StartSpan(ctx, "initialsync.nonSkippedSlotAfter")
 	defer span.End()
 
 	headEpoch := helpers.SlotToEpoch(f.headFetcher.HeadSlot())
 	_, epoch, peers := f.p2p.Peers().BestFinalized(params.BeaconConfig().MaxPeersToSync, headEpoch)
 	peers = f.filterPeers(peers, peersPercentagePerRequest)
 	if len(peers) == 0 {
 		return 0, errNoPeersAvailable
 	}
+	randGenerator := rand.New(rand.NewSource(roughtime.Now().UnixNano()))
+	slotsPerEpoch := params.BeaconConfig().SlotsPerEpoch
+	pidInd := 0
 
-	randGenerator := rand.New(rand.NewSource(roughtime.Now().Unix()))
-	nextPID := func() peer.ID {
-		randGenerator.Shuffle(len(peers), func(i, j int) {
-			peers[i], peers[j] = peers[j], peers[i]
-		})
-		return peers[0]
-	}
-
-	for slot <= helpers.StartSlot(epoch+1) {
+	fetch := func(pid peer.ID, start, count, step uint64) (uint64, error) {
 		req := &p2ppb.BeaconBlocksByRangeRequest{
-			StartSlot: slot + 1,
-			Count:     f.blocksPerSecond,
-			Step:      1,
+			StartSlot: start,
+			Count:     count,
+			Step:      step,
 		}
 
-		blocks, err := f.requestBlocks(ctx, req, nextPID())
+		blocks, err := f.requestBlocks(ctx, req, pid)
 		if err != nil {
-			return slot, err
+			return 0, err
 		}
 
 		if len(blocks) > 0 {
-			slots := make([]uint64, len(blocks))
-			for i, block := range blocks {
-				slots[i] = block.Block.Slot
+			for _, block := range blocks {
+				if block.Block.Slot > slot {
+					return block.Block.Slot, nil
+				}
 			}
-			return blocks[0].Block.Slot, nil
 		}
-		slot += f.blocksPerSecond
+		return 0, nil
 	}
 
-	return slot, nil
+	// Start by checking several epochs fully, w/o resorting to random sampling.
+	start := slot + 1
+	end := start + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
+	for ind := start; ind < end; ind += slotsPerEpoch {
+		nextSlot, err := fetch(peers[pidInd%len(peers)], ind, slotsPerEpoch, 1)
+		if err != nil {
+			return 0, err
+		}
+		if nextSlot > slot {
+			return nextSlot, nil
+		}
+		pidInd++
+	}
+
+	// Quickly find the close enough epoch where a non-empty slot definitely exists.
+	// Only single random slot per epoch is checked - allowing to move forward relatively quickly.
+	slot = slot + nonSkippedSlotsFullSearchEpochs*slotsPerEpoch
+	upperBoundSlot := helpers.StartSlot(epoch + 1)
+	for ind := slot + 1; ind < upperBoundSlot; ind += (slotsPerEpoch * slotsPerEpoch) / 2 {
+		start := ind + uint64(randGenerator.Intn(int(slotsPerEpoch)))
+		nextSlot, err := fetch(peers[pidInd%len(peers)], start, slotsPerEpoch/2, slotsPerEpoch)
+		if err != nil {
+			return 0, err
+		}
+		pidInd++
+		if nextSlot > slot && upperBoundSlot >= nextSlot {
+			upperBoundSlot = nextSlot
+			break
+		}
+	}
+
+	// Epoch with non-empty slot is located. Check all slots within two nearby epochs.
+	if upperBoundSlot > slotsPerEpoch {
+		upperBoundSlot -= slotsPerEpoch
+	}
+	upperBoundSlot = helpers.StartSlot(helpers.SlotToEpoch(upperBoundSlot))
+	nextSlot, err := fetch(peers[pidInd%len(peers)], upperBoundSlot, slotsPerEpoch*2, 1)
+	if err != nil {
+		return 0, err
+	}
+	if nextSlot < slot || helpers.StartSlot(epoch+1) < nextSlot {
+		return 0, errors.New("invalid range for non-skipped slot")
+	}
+	return nextSlot, nil
 }
 
 // bestFinalizedSlot returns the highest finalized slot of the majority of connected peers.
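The subtle part of the new search is its second phase. Each probe requests Count = slotsPerEpoch/2 blocks with Step = slotsPerEpoch, i.e., one slot per epoch at a shared random in-epoch offset, which is why the outer loop can advance (slotsPerEpoch*slotsPerEpoch)/2 slots per iteration without leaving unprobed epochs between iterations. A minimal sketch of the slots a single probe touches, assuming SlotsPerEpoch = 32 (the start value is an arbitrary example):

package main

import "fmt"

func main() {
	const slotsPerEpoch = 32 // assumed mainnet value
	// One second-phase probe: Count = slotsPerEpoch/2 = 16 slots,
	// Step = slotsPerEpoch = 32, i.e. one slot in each of the next
	// 16 epochs, spanning exactly the (32*32)/2 = 512-slot stride.
	start := uint64(51592) // e.g. window start 51585 plus random offset 7
	count := uint64(slotsPerEpoch / 2)
	step := uint64(slotsPerEpoch)
	for i := uint64(0); i < count; i++ {
		fmt.Println(start + i*step) // 51592, 51624, ..., 52072
	}
}

Because only one offset per epoch is probed, an epoch whose lone non-empty slot sits at a different offset can be missed on a given call. That is exactly the caveat in the function's doc comment: callers must be robust enough to re-request when no progress is made.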
95 changes: 66 additions & 29 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -2,6 +2,7 @@ package initialsync
 
 import (
 	"context"
+	"fmt"
 	"reflect"
 	"sort"
 	"sync"
@@ -626,26 +627,28 @@ func TestBlocksFetcherSelectFailOverPeer(t *testing.T) {
 }
 
 func TestBlocksFetcherNonSkippedSlotAfter(t *testing.T) {
+	peersGen := func(size int) []*peerData {
+		blocks := append(makeSequence(1, 64), makeSequence(500, 640)...)
+		blocks = append(blocks, makeSequence(51200, 51264)...)
+		blocks = append(blocks, 55000)
+		blocks = append(blocks, makeSequence(57000, 57256)...)
+		var peers []*peerData
+		for i := 0; i < size; i++ {
+			peers = append(peers, &peerData{
+				blocks:         blocks,
+				finalizedEpoch: 1800,
+				headSlot:       57000,
+			})
+		}
+		return peers
+	}
 	chainConfig := struct {
-		expectedBlockSlots []uint64
-		peers              []*peerData
+		peers []*peerData
 	}{
-		expectedBlockSlots: makeSequence(1, 320),
-		peers: []*peerData{
-			{
-				blocks:         append(makeSequence(1, 64), makeSequence(500, 640)...),
-				finalizedEpoch: 18,
-				headSlot:       320,
-			},
-			{
-				blocks:         append(makeSequence(1, 64), makeSequence(500, 640)...),
-				finalizedEpoch: 18,
-				headSlot:       320,
-			},
-		},
+		peers: peersGen(5),
 	}
 
-	mc, p2p, _ := initializeTestServices(t, chainConfig.expectedBlockSlots, chainConfig.peers)
+	mc, p2p, _ := initializeTestServices(t, []uint64{}, chainConfig.peers)
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -654,24 +657,58 @@ func TestBlocksFetcherNonSkippedSlotAfter(t *testing.T) {
 		&blocksFetcherConfig{
 			headFetcher: mc,
 			p2p:         p2p,
-		})
-
+		},
+	)
+	fetcher.rateLimiter = leakybucket.NewCollector(6400, 6400, false)
 	seekSlots := map[uint64]uint64{
-		0: 1,
-		10: 11,
-		32: 33,
-		63: 64,
-		64: 500,
+		0:     1,
+		10:    11,
+		31:    32,
+		32:    33,
+		63:    64,
+		64:    500,
+		160:   500,
+		352:   500,
+		480:   500,
+		512:   513,
+		639:   640,
+		640:   51200,
+		6640:  51200,
+		51200: 51201,
 	}
 	for seekSlot, expectedSlot := range seekSlots {
-		slot, err := fetcher.nonSkippedSlotAfter(ctx, seekSlot)
-		if err != nil {
-			t.Error(err)
+		t.Run(fmt.Sprintf("range: %d (%d-%d)", expectedSlot-seekSlot, seekSlot, expectedSlot), func(t *testing.T) {
+			slot, err := fetcher.nonSkippedSlotAfter(ctx, seekSlot)
+			if err != nil {
+				t.Error(err)
+			}
+			if slot != expectedSlot {
+				t.Errorf("unexpected slot, want: %v, got: %v", expectedSlot, slot)
+			}
+		})
+	}
+
+	t.Run("test isolated non-skipped slot", func(t *testing.T) {
+		seekSlot := uint64(51264)
+		expectedSlot := uint64(55000)
+		found := false
+		var i int
+		for i = 0; i < 100; i++ {
+			slot, err := fetcher.nonSkippedSlotAfter(ctx, seekSlot)
+			if err != nil {
+				t.Error(err)
+			}
+			if slot == expectedSlot {
+				found = true
+				break
+			}
 		}
-		if slot != expectedSlot {
-			t.Errorf("unexpected slot, want: %v, got: %v", expectedSlot, slot)
+		if !found {
+			t.Errorf("Isolated non-skipped slot not found in %d iterations: %v", i, expectedSlot)
+		} else {
+			t.Logf("Isolated non-skipped slot found in %d iterations", i)
 		}
-	}
+	})
 }
 
 func TestBlocksFetcherFilterPeers(t *testing.T) {
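A note on the isolated-slot subtest: the block at slot 55000 sits alone in an otherwise empty region, so the random-sampling phase returns it only when the chosen in-epoch offset happens to line up, roughly a 1-in-32 chance per call if SlotsPerEpoch = 32 is assumed. That is why the test retries up to 100 times. A rough model of the odds (an estimate, not part of the commit):

package main

import (
	"fmt"
	"math"
)

func main() {
	// Rough model of the "test isolated non-skipped slot" case above,
	// assuming SlotsPerEpoch = 32: each call samples one random in-epoch
	// offset, so it hits the lone block at slot 55000 with p ~ 1/32.
	const attempts = 100 // the subtest's loop bound
	pHit := 1.0 / 32.0
	pMissAll := math.Pow(1-pHit, attempts)
	fmt.Printf("P(all %d attempts miss) ~ %.3f\n", attempts, pMissAll) // ~0.042
}

Under this model a full run still misses about 4% of the time, so the subtest trades a small flake risk for coverage of the probabilistic sampling path.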
