Batch Block Roots Requesting #7027

Merged
merged 14 commits on Aug 20, 2020
2 changes: 1 addition & 1 deletion beacon-chain/p2p/service.go
@@ -57,7 +57,7 @@ const maxBadResponses = 5
const cacheNumCounters, cacheMaxCost, cacheBufferItems = 1000, 1000, 64

// maxDialTimeout is the timeout for a single peer dial.
const maxDialTimeout = 30 * time.Second
var maxDialTimeout = params.BeaconNetworkConfig().RespTimeout

// Service for managing peer to peer (p2p) networking.
type Service struct {
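The change above replaces the hard-coded 30-second dial timeout with the network config's RespTimeout, so the limit is now driven by configuration. As a rough illustration of how such a config-driven timeout bounds a single dial attempt — the connectWithTimeout helper and the hard-coded 10-second value below are assumptions for the sketch, not taken from the diff:

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// dialTimeout stands in for params.BeaconNetworkConfig().RespTimeout: the limit
// is read from configuration rather than compiled in as a constant.
var dialTimeout = 10 * time.Second // assumed value, for illustration only

// connectWithTimeout bounds one peer dial with the configured timeout so a slow
// or unreachable peer cannot stall the dial loop indefinitely.
func connectWithTimeout(ctx context.Context, dial func(context.Context) error) error {
	ctx, cancel := context.WithTimeout(ctx, dialTimeout)
	defer cancel()
	return dial(ctx)
}

func main() {
	err := connectWithTimeout(context.Background(), func(ctx context.Context) error {
		select {
		case <-time.After(time.Minute): // simulate a peer that never answers
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	})
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true after dialTimeout elapses
}
```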
1 change: 1 addition & 0 deletions beacon-chain/sync/BUILD.bazel
@@ -153,6 +153,7 @@ go_test(
"//shared/bytesutil:go_default_library",
"//shared/featureconfig:go_default_library",
"//shared/params:go_default_library",
"//shared/rand:go_default_library",
"//shared/roughtime:go_default_library",
"//shared/testutil:go_default_library",
"//shared/testutil/assert:go_default_library",
36 changes: 3 additions & 33 deletions beacon-chain/sync/pending_attestations_queue.go
@@ -3,11 +3,9 @@ package sync
import (
"context"
"encoding/hex"
"io"
"sync"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bls"
@@ -17,7 +15,6 @@ import (
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)
@@ -47,8 +44,6 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
ctx, span := trace.StartSpan(ctx, "processPendingAtts")
defer span.End()

pids := s.p2p.Peers().Connected()

// Before a node processes pending attestations queue, it verifies
// the attestations in the queue are still valid. Attestations will
// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
@@ -61,6 +56,7 @@ }
}
s.pendingAttsLock.RUnlock()

pendingRoots := [][32]byte{}
randGen := rand.NewGenerator()
for _, bRoot := range roots {
s.pendingAttsLock.RLock()
@@ -123,36 +119,10 @@ func (s *Service) processPendingAtts(ctx context.Context) error {
"attCount": len(attestations),
"blockRoot": hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
}).Debug("Requesting block for pending attestation")

// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer or equal to the pending attestation's target boundary slot.
// If there are no peer id's available, then we should exit from this function. The function will
// be run again periodically, and there may be peers available in future runs.
if len(pids) == 0 {
log.Debug("No peer IDs available to request missing block from for pending attestation")
return nil
}
pid := pids[randGen.Int()%len(pids)]
targetSlot := helpers.SlotToEpoch(attestations[0].Message.Aggregate.Data.Target.Epoch)
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "could not get chain state for peer")
}
if cs != nil && cs.HeadSlot >= targetSlot {
pid = p
break
}
}

req := [][32]byte{bRoot}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil && err == io.EOF {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
pendingRoots = append(pendingRoots, bRoot)
}
}
return nil
return s.sendBatchRootRequest(ctx, pendingRoots, randGen)
}

// This defines how pending attestations is saved in the map. The key is the
2 changes: 1 addition & 1 deletion beacon-chain/sync/pending_attestations_queue_test.go
@@ -44,7 +44,7 @@ func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
r := &Service{
p2p: p1,
db: db,
chain: &mock.ChainService{Genesis: roughtime.Now()},
chain: &mock.ChainService{Genesis: roughtime.Now(), FinalizedCheckPoint: &ethpb.Checkpoint{}},
blkRootToPendingAtts: make(map[[32]byte][]*ethpb.SignedAggregateAttestationAndProof),
stateSummaryCache: cache.NewStateSummaryCache(),
}
69 changes: 50 additions & 19 deletions beacon-chain/sync/pending_blocks_queue.go
@@ -12,6 +12,7 @@ import (
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/slotutil"
@@ -22,6 +23,9 @@ import (

var processPendingBlocksPeriod = slotutil.DivideSlotBy(3 /* times per slot */)

const maxPeerRequest = 50
const numOfTries = 5

// processes pending blocks queue on every processPendingBlocksPeriod
func (s *Service) processPendingBlocksQueue() {
ctx := context.Background()
@@ -46,6 +50,7 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
return errors.Wrap(err, "could not validate pending slots")
}
slots := s.sortedPendingSlots()
parentRoots := [][32]byte{}

span.AddAttributes(
trace.Int64Attribute("numSlots", int64(len(slots))),
@@ -110,26 +115,8 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
"currentSlot": b.Block.Slot,
"parentRoot": hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
}).Info("Requesting parent block")
req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}

// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
// have a head slot newer than the block slot we are requesting.
pid := pids[randGen.Int()%len(pids)]
for _, p := range pids {
cs, err := s.p2p.Peers().ChainState(p)
if err != nil {
return errors.Wrap(err, "failed to read chain state for peer")
}
if cs != nil && cs.HeadSlot >= slot {
pid = p
break
}
}
parentRoots = append(parentRoots, bytesutil.ToBytes32(b.Block.ParentRoot))

if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
span.End()
continue
}
@@ -162,6 +149,50 @@ func (s *Service) processPendingBlocks(ctx context.Context) error {
}
}

return s.sendBatchRootRequest(ctx, parentRoots, randGen)
}

func (s *Service) sendBatchRootRequest(ctx context.Context, roots [][32]byte, randGen *rand.Rand) error {
ctx, span := trace.StartSpan(ctx, "sendBatchRootRequest")
defer span.End()

if len(roots) == 0 {
return nil
}

_, bestPeers := s.p2p.Peers().BestFinalized(maxPeerRequest, s.chain.FinalizedCheckpt().Epoch)
if len(bestPeers) == 0 {
return nil
}
roots = s.dedupRoots(roots)
// Randomly choose a peer to query from our best peers. If that peer cannot return
// all the requested blocks, we randomly select another peer.
pid := bestPeers[randGen.Int()%len(bestPeers)]
for i := 0; i < numOfTries; i++ {
req := roots
if len(roots) > int(params.BeaconNetworkConfig().MaxRequestBlocks) {
req = roots[:params.BeaconNetworkConfig().MaxRequestBlocks]
}
if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
traceutil.AnnotateError(span, err)
log.Debugf("Could not send recent block request: %v", err)
}
newRoots := make([][32]byte, 0, len(roots))
s.pendingQueueLock.RLock()
for _, rt := range roots {
if !s.seenPendingBlocks[rt] {
newRoots = append(newRoots, rt)
}
}
s.pendingQueueLock.RUnlock()
if len(newRoots) == 0 {
break
}
// Choosing a new peer with the leftover set of
// roots to request.
roots = newRoots
pid = bestPeers[randGen.Int()%len(bestPeers)]
}
return nil
}

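For orientation, the retry behaviour of the new sendBatchRootRequest in isolation: each attempt sends at most MaxRequestBlocks roots to one randomly chosen peer from the best-finalized set, drops any root that has since landed in the pending-block cache, and retries the leftovers with a freshly chosen peer, up to numOfTries attempts. Under these semantics a single call can fetch at most numOfTries × MaxRequestBlocks roots; anything beyond that waits for the next queue-processing cycle. Below is a self-contained sketch of that loop under simplified assumptions — a plain function, a callback standing in for the p2p request, and no locking — not the PR's actual implementation:

```go
package main

import (
	"fmt"
	"math/rand"
)

const (
	maxRequestBlocks = 1024 // stands in for params.BeaconNetworkConfig().MaxRequestBlocks
	numOfTries       = 5
)

// requestBatch retries a capped batch request against randomly chosen peers until
// every root has been observed or the retry budget is exhausted. `request` stands in
// for sendRecentBeaconBlocksRequest and `seen` for the seenPendingBlocks cache.
func requestBatch(roots [][32]byte, peers []string, seen map[[32]byte]bool, request func(peer string, roots [][32]byte)) {
	if len(roots) == 0 || len(peers) == 0 {
		return
	}
	pid := peers[rand.Intn(len(peers))]
	for i := 0; i < numOfTries; i++ {
		// Cap each request at the protocol's maximum batch size.
		req := roots
		if len(roots) > maxRequestBlocks {
			req = roots[:maxRequestBlocks]
		}
		request(pid, req)

		// Keep only the roots whose blocks still have not arrived.
		leftover := make([][32]byte, 0, len(roots))
		for _, rt := range roots {
			if !seen[rt] {
				leftover = append(leftover, rt)
			}
		}
		if len(leftover) == 0 {
			break
		}
		// Retry the leftovers with a freshly chosen peer.
		roots = leftover
		pid = peers[rand.Intn(len(peers))]
	}
}

func main() {
	seen := map[[32]byte]bool{}
	roots := [][32]byte{{1}, {2}, {3}}
	requestBatch(roots, []string{"peerA", "peerB"}, seen, func(peer string, req [][32]byte) {
		fmt.Printf("requesting %d roots from %s\n", len(req), peer)
		for _, rt := range req {
			seen[rt] = true // pretend every requested block arrives
		}
	})
}
```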
82 changes: 82 additions & 0 deletions beacon-chain/sync/pending_blocks_queue_test.go
@@ -5,6 +5,7 @@ import (
"math"
"sync"
"testing"
"time"

"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p-core/network"
@@ -17,6 +18,8 @@ import (
p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
"github.com/prysmaticlabs/prysm/beacon-chain/state/stateutil"
pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
"github.com/prysmaticlabs/prysm/shared/rand"
"github.com/prysmaticlabs/prysm/shared/testutil"
"github.com/prysmaticlabs/prysm/shared/testutil/assert"
"github.com/prysmaticlabs/prysm/shared/testutil/require"
)
@@ -278,3 +281,82 @@ func TestService_sortedPendingSlots(t *testing.T) {
want := []uint64{lastSlot - 5, lastSlot - 3, lastSlot - 2, lastSlot}
assert.DeepEqual(t, want, r.sortedPendingSlots(), "Unexpected pending slots list")
}

func TestService_BatchRootRequest(t *testing.T) {
db, _ := dbtest.SetupDB(t)
p1 := p2ptest.NewTestP2P(t)
p2 := p2ptest.NewTestP2P(t)
p1.Connect(p2)
assert.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")

r := &Service{
p2p: p1,
db: db,
chain: &mock.ChainService{
FinalizedCheckPoint: &ethpb.Checkpoint{
Epoch: 1,
},
},
slotToPendingBlocks: make(map[uint64][]*ethpb.SignedBeaconBlock),
seenPendingBlocks: make(map[[32]byte]bool),
}

err := r.initCaches()
require.NoError(t, err)
p1.Peers().Add(new(enr.Record), p2.PeerID(), nil, network.DirOutbound)
p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
p1.Peers().SetChainState(p2.PeerID(), &pb.Status{FinalizedEpoch: 2})

b0 := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
require.NoError(t, r.db.SaveBlock(context.Background(), b0))
b0Root, err := stateutil.BlockRoot(b0.Block)
require.NoError(t, err)
b1 := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{Slot: 1, ParentRoot: b0Root[:]}}
require.NoError(t, r.db.SaveBlock(context.Background(), b1))
b1Root, err := stateutil.BlockRoot(b1.Block)
require.NoError(t, err)

b2 := &ethpb.BeaconBlock{Slot: 2, ParentRoot: b1Root[:]}
b2Root, err := ssz.HashTreeRoot(b2)
require.NoError(t, err)
b5 := &ethpb.BeaconBlock{Slot: 5, ParentRoot: b2Root[:]}
b5Root, err := ssz.HashTreeRoot(b5)
require.NoError(t, err)
b3 := &ethpb.BeaconBlock{Slot: 3, ParentRoot: b0Root[:]}
b3Root, err := ssz.HashTreeRoot(b3)
require.NoError(t, err)
b4 := &ethpb.BeaconBlock{Slot: 4, ParentRoot: b3Root[:]}
b4Root, err := ssz.HashTreeRoot(b4)
require.NoError(t, err)

// Send in duplicated roots to also test deduplication.
sentRoots := [][32]byte{b2Root, b2Root, b3Root, b3Root, b4Root, b5Root}
expectedRoots := [][32]byte{b2Root, b3Root, b4Root, b5Root}

pcl := protocol.ID("/eth2/beacon_chain/req/beacon_blocks_by_root/1/ssz_snappy")
var wg sync.WaitGroup
wg.Add(1)
p2.BHost.SetStreamHandler(pcl, func(stream network.Stream) {
defer wg.Done()
out := [][32]byte{}
assert.NoError(t, p2.Encoding().DecodeWithMaxLength(stream, &out))
assert.DeepEqual(t, expectedRoots, out, "Did not receive expected message")
response := []*ethpb.SignedBeaconBlock{{Block: b2},
{Block: b3}, {Block: b4}, {Block: b5}}
for _, blk := range response {
_, err := stream.Write([]byte{responseCodeSuccess})
assert.NoError(t, err, "Failed to write to stream")
_, err = p2.Encoding().EncodeWithMaxLength(stream, blk)
assert.NoError(t, err, "Could not send response back")
}
assert.NoError(t, stream.Close())
})

require.NoError(t, r.sendBatchRootRequest(context.Background(), sentRoots, rand.NewGenerator()))

if testutil.WaitTimeout(&wg, 1*time.Second) {
t.Fatal("Did not receive stream within 1 sec")
}
assert.Equal(t, 4, len(r.slotToPendingBlocks), "Incorrect size for slot to pending blocks cache")
assert.Equal(t, 4, len(r.seenPendingBlocks), "Incorrect size for seen pending block")
}
13 changes: 13 additions & 0 deletions beacon-chain/sync/utils.go
@@ -49,6 +49,19 @@ func (s *Service) dedupBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][
return newBlks, newRoots, nil
}

func (s *Service) dedupRoots(roots [][32]byte) [][32]byte {
Review comment (Member): Is this tested? I think we can add a duplicated root in TestService_BatchRootRequest to cover this function?

Reply (Member Author): great idea, will add that in

newRoots := make([][32]byte, 0, len(roots))
rootMap := make(map[[32]byte]bool, len(roots))
for i, r := range roots {
if rootMap[r] {
continue
}
rootMap[r] = true
newRoots = append(newRoots, roots[i])
}
return newRoots
}

// sort the provided blocks and roots in ascending order. This method assumes that the size of
// block slice and root slice is equal.
func (s *Service) sortBlocksAndRoots(blks []*ethpb.SignedBeaconBlock, roots [][32]byte) ([]*ethpb.SignedBeaconBlock, [][32]byte) {
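Finally, a quick standalone illustration of the new dedupRoots helper: it keeps the first occurrence of each root and preserves order, which is what lets TestService_BatchRootRequest assert on an exact expected slice after sending duplicates. This sketch rewrites the same logic without the Service receiver so it runs on its own; it is illustrative, not the code from the diff:

```go
package main

import "fmt"

// dedupRoots removes duplicate roots while keeping each root's first occurrence in
// its original position, mirroring the helper added in beacon-chain/sync/utils.go.
func dedupRoots(roots [][32]byte) [][32]byte {
	newRoots := make([][32]byte, 0, len(roots))
	seen := make(map[[32]byte]bool, len(roots))
	for _, r := range roots {
		if seen[r] {
			continue
		}
		seen[r] = true
		newRoots = append(newRoots, r)
	}
	return newRoots
}

func main() {
	a, b, c := [32]byte{1}, [32]byte{2}, [32]byte{3}
	deduped := dedupRoots([][32]byte{a, a, b, b, c})
	fmt.Println(len(deduped)) // 3 — order of first occurrence (a, b, c) is preserved
}
```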