Skip to content

Commit

Permalink
Sort Blocks Before Returning them To Peer (#5894)
Browse files Browse the repository at this point in the history
* add tests and check
* lint
* Merge refs/heads/master into sortBlocks
  • Loading branch information
nisdas committed May 18, 2020
1 parent e16f384 commit eb77f56
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 0 deletions.
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Expand Up @@ -26,6 +26,7 @@ go_library(
"subscriber_beacon_blocks.go",
"subscriber_committee_index_beacon_attestation.go",
"subscriber_handlers.go",
"utils.go",
"validate_aggregate_proof.go",
"validate_attester_slashing.go",
"validate_beacon_blocks.go",
Expand Down Expand Up @@ -112,6 +113,7 @@ go_test(
"subscriber_committee_index_beacon_attestation_test.go",
"subscriber_test.go",
"sync_test.go",
"utils_test.go",
"validate_aggregate_proof_test.go",
"validate_attester_slashing_test.go",
"validate_beacon_blocks_test.go",
Expand Down
1 change: 1 addition & 0 deletions beacon-chain/sync/rpc_beacon_blocks_by_range.go
Expand Up @@ -144,6 +144,7 @@ func (r *Service) writeBlockRangeToStream(ctx context.Context, startSlot, endSlo
blks = append([]*ethpb.SignedBeaconBlock{genBlock}, blks...)
roots = append([][32]byte{genRoot}, roots...)
}
blks, roots = r.sortBlocksAndRoots(blks, roots)
checkpoint, err := r.db.FinalizedCheckpoint(ctx)
if err != nil {
log.WithError(err).Error("Failed to retrieve finalized checkpoint")
Expand Down
60 changes: 60 additions & 0 deletions beacon-chain/sync/rpc_beacon_blocks_by_range_test.go
Expand Up @@ -83,6 +83,66 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) {
}
}

func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
if len(p1.Host.Network().Peers()) != 1 {
t.Error("Expected peers to be connected")
}
d := db.SetupDB(t)

req := &pb.BeaconBlocksByRangeRequest{
StartSlot: 200,
Step: 21,
Count: 33,
}

endSlot := req.StartSlot + (req.Step * (req.Count - 1))
// Populate the database with blocks that would match the request.
for i := endSlot; i >= req.StartSlot; i -= req.Step {
if err := d.SaveBlock(context.Background(), &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: i}}); err != nil {
t.Fatal(err)
}
}

// Start service with 160 as allowed blocks capacity (and almost zero capacity recovery).
r := &Service{p2p: p1, db: d, blocksRateLimiter: leakybucket.NewCollector(0.000001, int64(req.Count*10), false)}
pcl := protocol.ID("/testing")

var wg sync.WaitGroup
wg.Add(1)
p2.Host.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
prevSlot := uint64(0)
for i := req.StartSlot; i < req.StartSlot+req.Count*req.Step; i += req.Step {
expectSuccess(t, r, stream)
res := &ethpb.SignedBeaconBlock{}
if err := r.p2p.Encoding().DecodeWithLength(stream, res); err != nil {
t.Error(err)
}
if res.Block.Slot < prevSlot {
t.Errorf("Received block is unsorted with slot %d lower than previous slot %d", res.Block.Slot, prevSlot)
}
prevSlot = res.Block.Slot
}
})

stream1, err := p1.Host.NewStream(context.Background(), p2.Host.ID(), pcl)
if err != nil {
t.Fatal(err)
}

err = r.beaconBlocksByRangeRPCHandler(context.Background(), req, stream1)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
}

func TestRPCBeaconBlocksByRange_ReturnsGenesisBlock(t *testing.T) {
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
Expand Down
39 changes: 39 additions & 0 deletions beacon-chain/sync/utils.go
@@ -0,0 +1,39 @@
package sync

import (
"sort"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
)

// A type to represent beacon blocks and roots which have methods
// which satisfy the Interface in `Sort` so that this type can
// be sorted in ascending order.
type sortedObj struct {
blks []*ethpb.SignedBeaconBlock
roots [][32]byte
}

func (s sortedObj) Less(i, j int) bool {
return s.blks[i].Block.Slot < s.blks[j].Block.Slot
}

func (s sortedObj) Swap(i, j int) {
s.blks[i], s.blks[j] = s.blks[j], s.blks[i]
s.roots[i], s.roots[j] = s.roots[j], s.roots[i]
}

func (s sortedObj) Len() int {
return len(s.blks)
}

// sort the provided blocks and roots in ascending order. This method assumes that the size of
// block slice and root slice is equal.
func (r *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) {
obj := sortedObj{
blks: blks,
roots: roots,
}
sort.Sort(obj)
return obj.blks, obj.roots
}
42 changes: 42 additions & 0 deletions beacon-chain/sync/utils_test.go
@@ -0,0 +1,42 @@
package sync

import (
"math/rand"
"testing"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
)

func TestSortedObj_SortBlocksRoots(t *testing.T) {
source := rand.NewSource(33)
randGen := rand.New(source)
blks := []*ethpb.SignedBeaconBlock{}
roots := [][32]byte{}
randFunc := func() int64 {
return randGen.Int63n(50)
}

for i := 0; i < 10; i++ {
slot := uint64(randFunc())
newBlk := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: slot}}
blks = append(blks, newBlk)
root := bytesutil.ToBytes32(bytesutil.Bytes32(slot))
roots = append(roots, root)
}

r := &Service{}

newBlks, newRoots := r.sortBlocksAndRoots(blks, roots)

previousSlot := uint64(0)
for i, b := range newBlks {
if b.Block.Slot < previousSlot {
t.Errorf("Block list is not sorted as %d is smaller than previousSlot %d", b.Block.Slot, previousSlot)
}
if bytesutil.FromBytes8(newRoots[i][:]) != b.Block.Slot {
t.Errorf("root doesn't match stored slot in block: wanted %d but got %d", b.Block.Slot, bytesutil.FromBytes8(newRoots[i][:]))
}
previousSlot = b.Block.Slot
}
}

0 comments on commit eb77f56

Please sign in to comment.