diff --git a/beacon-chain/db/kv/blocks.go b/beacon-chain/db/kv/blocks.go index d5a76a50857..0596548a6a2 100644 --- a/beacon-chain/db/kv/blocks.go +++ b/beacon-chain/db/kv/blocks.go @@ -16,6 +16,9 @@ import ( "go.opencensus.io/trace" ) +// used to represent errors for inconsistent slot ranges. +var errInvalidSlotRange = errors.New("invalid end slot and start slot provided") + // Block retrieval by root. func (s *Store) Block(ctx context.Context, blockRoot [32]byte) (*ethpb.SignedBeaconBlock, error) { ctx, span := trace.StartSpan(ctx, "BeaconDB.Block") @@ -418,20 +421,18 @@ func fetchBlockRootsBySlotRange( } min := bytesutil.Uint64ToBytesBigEndian(startSlot) max := bytesutil.Uint64ToBytesBigEndian(endSlot) - var conditional func(key, max []byte) bool - if endSlot == 0 { - conditional = func(key, max []byte) bool { - return key != nil - } - } else { - conditional = func(key, max []byte) bool { - return key != nil && bytes.Compare(key, max) <= 0 - } + + conditional := func(key, max []byte) bool { + return key != nil && bytes.Compare(key, max) <= 0 } - rootsRange := (endSlot - startSlot) / step if endSlot < startSlot { - rootsRange = 0 + return nil, errInvalidSlotRange + } + // Return nothing with an end slot of 0. + if endSlot == 0 { + return [][]byte{}, nil } + rootsRange := (endSlot - startSlot) / step roots := make([][]byte, 0, rootsRange) c := bkt.Cursor() for k, v := c.Seek(min); conditional(k, max); k, v = c.Next() { diff --git a/beacon-chain/db/kv/blocks_test.go b/beacon-chain/db/kv/blocks_test.go index 1bedccb4c40..cf7c0b743cb 100644 --- a/beacon-chain/db/kv/blocks_test.go +++ b/beacon-chain/db/kv/blocks_test.go @@ -101,6 +101,55 @@ func TestStore_BlocksBatchDelete(t *testing.T) { } } +func TestStore_BlocksHandleZeroCase(t *testing.T) { + db := setupDB(t) + ctx := context.Background() + numBlocks := 10 + totalBlocks := make([]*ethpb.SignedBeaconBlock, numBlocks) + blockRoots := make([][32]byte, 0) + for i := 0; i < len(totalBlocks); i++ { + b := testutil.NewBeaconBlock() + b.Block.Slot = uint64(i) + b.Block.ParentRoot = bytesutil.PadTo([]byte("parent"), 32) + totalBlocks[i] = b + r, err := totalBlocks[i].Block.HashTreeRoot() + require.NoError(t, err) + blockRoots = append(blockRoots, r) + } + require.NoError(t, db.SaveBlocks(ctx, totalBlocks)) + zeroFilter := filters.NewFilter().SetStartSlot(0).SetEndSlot(0) + retrieved, _, err := db.Blocks(ctx, zeroFilter) + require.NoError(t, err) + assert.Equal(t, 0, len(retrieved), "Unexpected number of blocks received, expected none") +} + +func TestStore_BlocksHandleInvalidEndSlot(t *testing.T) { + db := setupDB(t) + ctx := context.Background() + numBlocks := 10 + totalBlocks := make([]*ethpb.SignedBeaconBlock, numBlocks) + blockRoots := make([][32]byte, 0) + // Save blocks from slot 1 onwards. + for i := 0; i < len(totalBlocks); i++ { + b := testutil.NewBeaconBlock() + b.Block.Slot = uint64(i) + 1 + b.Block.ParentRoot = bytesutil.PadTo([]byte("parent"), 32) + totalBlocks[i] = b + r, err := totalBlocks[i].Block.HashTreeRoot() + require.NoError(t, err) + blockRoots = append(blockRoots, r) + } + require.NoError(t, db.SaveBlocks(ctx, totalBlocks)) + badFilter := filters.NewFilter().SetStartSlot(5).SetEndSlot(1) + _, _, err := db.Blocks(ctx, badFilter) + require.ErrorContains(t, errInvalidSlotRange.Error(), err) + + goodFilter := filters.NewFilter().SetStartSlot(0).SetEndSlot(1) + requested, _, err := db.Blocks(ctx, goodFilter) + require.NoError(t, err) + assert.Equal(t, 1, len(requested), "Unexpected number of blocks received, only expected two") +} + func TestStore_GenesisBlock(t *testing.T) { db := setupDB(t) ctx := context.Background() @@ -188,7 +237,7 @@ func TestStore_Blocks_FiltersCorrectly(t *testing.T) { expectedNumBlocks: 2, }, { - filter: filters.NewFilter().SetStartSlot(5), + filter: filters.NewFilter().SetStartSlot(5).SetEndSlot(9), expectedNumBlocks: 4, }, { diff --git a/beacon-chain/state/stategen/replay.go b/beacon-chain/state/stategen/replay.go index faf7d912366..2b0673d8188 100644 --- a/beacon-chain/state/stategen/replay.go +++ b/beacon-chain/state/stategen/replay.go @@ -53,6 +53,11 @@ func (s *State) ReplayBlocks(ctx context.Context, state *stateTrie.BeaconState, // LoadBlocks loads the blocks between start slot and end slot by recursively fetching from end block root. // The Blocks are returned in slot-descending order. func (s *State) LoadBlocks(ctx context.Context, startSlot, endSlot uint64, endBlockRoot [32]byte) ([]*ethpb.SignedBeaconBlock, error) { + // Nothing to load for invalid range. + // TODO(#7620): Return error for invalid range. + if endSlot < startSlot { + return nil, nil + } filter := filters.NewFilter().SetStartSlot(startSlot).SetEndSlot(endSlot) blocks, blockRoots, err := s.beaconDB.Blocks(ctx, filter) if err != nil { diff --git a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go index 3762bfe2f41..99467a5de6f 100644 --- a/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go +++ b/beacon-chain/sync/rpc_beacon_blocks_by_range_test.go @@ -82,6 +82,71 @@ func TestRPCBeaconBlocksByRange_RPCHandlerReturnsBlocks(t *testing.T) { } } +func TestRPCBeaconBlocksByRange_ReturnCorrectNumberBack(t *testing.T) { + p1 := p2ptest.NewTestP2P(t) + p2 := p2ptest.NewTestP2P(t) + p1.Connect(p2) + assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected") + d, _ := db.SetupDB(t) + + req := &pb.BeaconBlocksByRangeRequest{ + StartSlot: 0, + Step: 1, + Count: 200, + } + + genRoot := [32]byte{} + // Populate the database with blocks that would match the request. + for i := req.StartSlot; i < req.StartSlot+(req.Step*req.Count); i += req.Step { + blk := testutil.NewBeaconBlock() + blk.Block.Slot = i + if i == 0 { + rt, err := blk.Block.HashTreeRoot() + require.NoError(t, err) + genRoot = rt + } + require.NoError(t, d.SaveBlock(context.Background(), blk)) + } + require.NoError(t, d.SaveGenesisBlockRoot(context.Background(), genRoot)) + + // Start service with 160 as allowed blocks capacity (and almost zero capacity recovery). + r := &Service{p2p: p1, db: d, chain: &chainMock.ChainService{}, rateLimiter: newRateLimiter(p1)} + pcl := protocol.ID("/testing") + topic := string(pcl) + r.rateLimiter.limiterMap[topic] = leakybucket.NewCollector(0.000001, int64(req.Count*10), false) + var wg sync.WaitGroup + wg.Add(1) + + // Use a new request to test this out + newReq := &pb.BeaconBlocksByRangeRequest{StartSlot: 0, Step: 1, Count: 1} + + p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) { + defer wg.Done() + for i := newReq.StartSlot; i < newReq.StartSlot+newReq.Count*newReq.Step; i += newReq.Step { + expectSuccess(t, stream) + res := testutil.NewBeaconBlock() + assert.NoError(t, r.p2p.Encoding().DecodeWithMaxLength(stream, res)) + if (res.Block.Slot-newReq.StartSlot)%newReq.Step != 0 { + t.Errorf("Received unexpected block slot %d", res.Block.Slot) + } + // Expect EOF + b := make([]byte, 1) + _, err := stream.Read(b) + require.ErrorContains(t, io.EOF.Error(), err) + } + }) + + stream1, err := p1.BHost.NewStream(context.Background(), p2.BHost.ID(), pcl) + require.NoError(t, err) + + err = r.beaconBlocksByRangeRPCHandler(context.Background(), newReq, stream1) + require.NoError(t, err) + + if testutil.WaitTimeout(&wg, 1*time.Second) { + t.Fatal("Did not receive stream within 1 sec") + } +} + func TestRPCBeaconBlocksByRange_RPCHandlerReturnsSortedBlocks(t *testing.T) { p1 := p2ptest.NewTestP2P(t) p2 := p2ptest.NewTestP2P(t)