diff --git a/beacon-chain/blockchain/chain_info.go b/beacon-chain/blockchain/chain_info.go
index aff67ec9582..176696d5d49 100644
--- a/beacon-chain/blockchain/chain_info.go
+++ b/beacon-chain/blockchain/chain_info.go
@@ -22,9 +22,10 @@ type ChainInfoFetcher interface {
 	FinalizationFetcher
 }
 
-// GenesisTimeFetcher retrieves the Eth2 genesis timestamp.
-type GenesisTimeFetcher interface {
+// TimeFetcher retrieves the Eth2 data that's related to time.
+type TimeFetcher interface {
 	GenesisTime() time.Time
+	CurrentSlot() uint64
 }
 
 // HeadFetcher defines a common interface for methods in blockchain service which
diff --git a/beacon-chain/blockchain/chain_info_test.go b/beacon-chain/blockchain/chain_info_test.go
index 23da22bc022..fe7ad21acea 100644
--- a/beacon-chain/blockchain/chain_info_test.go
+++ b/beacon-chain/blockchain/chain_info_test.go
@@ -18,7 +18,7 @@ import (
 
 // Ensure Service implements chain info interface.
 var _ = ChainInfoFetcher(&Service{})
-var _ = GenesisTimeFetcher(&Service{})
+var _ = TimeFetcher(&Service{})
 var _ = ForkFetcher(&Service{})
 
 func TestFinalizedCheckpt_Nil(t *testing.T) {
diff --git a/beacon-chain/blockchain/process_block_helpers.go b/beacon-chain/blockchain/process_block_helpers.go
index f53a1716dc5..b96592e6422 100644
--- a/beacon-chain/blockchain/process_block_helpers.go
+++ b/beacon-chain/blockchain/process_block_helpers.go
@@ -20,6 +20,11 @@ import (
 	"go.opencensus.io/trace"
 )
 
+// CurrentSlot returns the current slot based on time.
+func (s *Service) CurrentSlot() uint64 {
+	return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot
+}
+
 // getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
 // to retrieve the state in DB. It verifies the pre state's validity and the incoming block
 // is in the correct time window.
@@ -199,7 +204,7 @@ func (s *Service) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot
 // Otherwise, delay incorporation of new justified checkpoint until next epoch boundary.
 // See https://ethresear.ch/t/prevention-of-bouncing-attack-on-ffg/6114 for more detailed analysis and discussion.
 func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustifiedCheckpt *ethpb.Checkpoint) (bool, error) {
-	if helpers.SlotsSinceEpochStarts(s.currentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
+	if helpers.SlotsSinceEpochStarts(s.CurrentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
 		return true, nil
 	}
 	newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root))
@@ -261,11 +266,6 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
 	return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
 }
 
-// currentSlot returns the current slot based on time.
-func (s *Service) currentSlot() uint64 {
-	return uint64(time.Now().Unix()-s.genesisTime.Unix()) / params.BeaconConfig().SecondsPerSlot
-}
-
 // This saves every finalized state in DB during initial sync, needed as part of optimization to
 // use cache state during initial sync in case of restart.
 func (s *Service) saveInitState(ctx context.Context, state *stateTrie.BeaconState) error {
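For reference, here is a minimal, self-contained sketch (illustrative only, not part of this change) of the slot arithmetic the newly exported CurrentSlot relies on. The 12-second slot duration below is an assumption made purely for the example; the real method reads SecondsPerSlot from params.BeaconConfig().

package main

import (
	"fmt"
	"time"
)

// currentSlot mirrors (*Service).CurrentSlot: whole seconds elapsed since
// genesis, divided by the slot duration (integer division floors the result).
func currentSlot(genesis time.Time, secondsPerSlot uint64) uint64 {
	return uint64(time.Now().Unix()-genesis.Unix()) / secondsPerSlot
}

func main() {
	genesis := time.Now().Add(-90 * time.Second)
	// With assumed 12-second slots, 90 elapsed seconds puts us in slot 7.
	fmt.Println(currentSlot(genesis, 12))
}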
diff --git a/beacon-chain/blockchain/testing/mock.go b/beacon-chain/blockchain/testing/mock.go
index fd32d9375f7..289486db6cf 100644
--- a/beacon-chain/blockchain/testing/mock.go
+++ b/beacon-chain/blockchain/testing/mock.go
@@ -193,6 +193,11 @@ func (ms *ChainService) GenesisTime() time.Time {
 	return ms.Genesis
 }
 
+// CurrentSlot mocks the same method in the chain service.
+func (ms *ChainService) CurrentSlot() uint64 {
+	return 0
+}
+
 // Participation mocks the same method in the chain service.
 func (ms *ChainService) Participation(epoch uint64) *precompute.Balance {
 	return ms.Balance
diff --git a/beacon-chain/rpc/node/server.go b/beacon-chain/rpc/node/server.go
index 1fe12ea18b1..fae108191da 100644
--- a/beacon-chain/rpc/node/server.go
+++ b/beacon-chain/rpc/node/server.go
@@ -26,7 +26,7 @@ type Server struct {
 	Server             *grpc.Server
 	BeaconDB           db.ReadOnlyDatabase
 	PeersFetcher       p2p.PeersProvider
-	GenesisTimeFetcher blockchain.GenesisTimeFetcher
+	GenesisTimeFetcher blockchain.TimeFetcher
 }
 
 // GetSyncStatus checks the current network sync status of the node.
diff --git a/beacon-chain/rpc/service.go b/beacon-chain/rpc/service.go
index 0872cf68c8e..0150ebf13bc 100644
--- a/beacon-chain/rpc/service.go
+++ b/beacon-chain/rpc/service.go
@@ -58,7 +58,7 @@ type Service struct {
 	forkFetcher          blockchain.ForkFetcher
 	finalizationFetcher  blockchain.FinalizationFetcher
 	participationFetcher blockchain.ParticipationFetcher
-	genesisTimeFetcher   blockchain.GenesisTimeFetcher
+	genesisTimeFetcher   blockchain.TimeFetcher
 	attestationReceiver  blockchain.AttestationReceiver
 	blockReceiver        blockchain.BlockReceiver
 	powChainService      powchain.Chain
@@ -104,7 +104,7 @@ type Config struct {
 	BlockReceiver      blockchain.BlockReceiver
 	POWChainService    powchain.Chain
 	ChainStartFetcher  powchain.ChainStartFetcher
-	GenesisTimeFetcher blockchain.GenesisTimeFetcher
+	GenesisTimeFetcher blockchain.TimeFetcher
 	MockEth1Votes      bool
 	AttestationsPool   attestations.Pool
 	ExitPool           *voluntaryexits.Pool
diff --git a/beacon-chain/sync/BUILD.bazel b/beacon-chain/sync/BUILD.bazel
index 6ca6b036101..5345987d199 100644
--- a/beacon-chain/sync/BUILD.bazel
+++ b/beacon-chain/sync/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
         "error.go",
         "log.go",
         "metrics.go",
+        "pending_attestations_queue.go",
        "pending_blocks_queue.go",
         "rpc.go",
         "rpc_beacon_blocks_by_range.go",
@@ -79,6 +80,7 @@ go_test(
     size = "small",
     srcs = [
         "error_test.go",
+        "pending_attestations_queue_test.go",
         "pending_blocks_queue_test.go",
         "rpc_beacon_blocks_by_range_test.go",
         "rpc_beacon_blocks_by_root_test.go",
diff --git a/beacon-chain/sync/metrics.go b/beacon-chain/sync/metrics.go
index c1a09290025..95a2e1e6557 100644
--- a/beacon-chain/sync/metrics.go
+++ b/beacon-chain/sync/metrics.go
@@ -33,4 +33,28 @@ var (
 			Help: "Count the number of times a node resyncs.",
 		},
 	)
+	numberOfBlocksRecoveredFromAtt = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "beacon_blocks_recovered_from_attestation_total",
+			Help: "Count the number of times a missing block was recovered from an attestation vote.",
+		},
+	)
+	numberOfBlocksNotRecoveredFromAtt = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "beacon_blocks_not_recovered_from_attestation_total",
+			Help: "Count the number of times a missing block was not recovered and its attestation votes were pruned.",
+		},
+	)
+	numberOfAttsRecovered = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "beacon_attestations_recovered_total",
+			Help: "Count the number of times an attestation was recovered once its missing block arrived.",
+		},
+	)
+	numberOfAttsNotRecovered = promauto.NewCounter(
+		prometheus.CounterOpts{
+			Name: "beacon_attestations_not_recovered_total",
+			Help: "Count the number of times an attestation was not recovered and was pruned because its block never arrived.",
+		},
+	)
 )
diff --git a/beacon-chain/sync/pending_attestations_queue.go b/beacon-chain/sync/pending_attestations_queue.go
new file mode 100644
index 00000000000..8ba806e7064
--- /dev/null
+++ b/beacon-chain/sync/pending_attestations_queue.go
@@ -0,0 +1,162 @@
+package sync
+
+import (
+	"context"
+	"encoding/hex"
+	"time"
+
+	ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
+	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
+	"github.com/prysmaticlabs/prysm/shared/bls"
+	"github.com/prysmaticlabs/prysm/shared/bytesutil"
+	"github.com/prysmaticlabs/prysm/shared/params"
+	"github.com/prysmaticlabs/prysm/shared/runutil"
+	"github.com/prysmaticlabs/prysm/shared/traceutil"
+	"github.com/sirupsen/logrus"
+	"go.opencensus.io/trace"
+	"golang.org/x/exp/rand"
+)
+
+// This defines how often a node cleans up and processes pending attestations in the queue.
+var processPendingAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second
+
+// This processes the pending attestation queue every `processPendingAttsPeriod`.
+func (s *Service) processPendingAttsQueue() {
+	ctx := context.Background()
+	runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() {
+		s.processPendingAtts(ctx)
+	})
+}
+
+// This defines how pending attestations are processed. It contains the following features:
+// 1. Clean up invalid pending attestations from the queue.
+// 2. Check if pending attestations can be processed when the block has arrived.
+// 3. Request the block from a random peer if step 2 cannot proceed.
+func (s *Service) processPendingAtts(ctx context.Context) error {
+	ctx, span := trace.StartSpan(ctx, "processPendingAtts")
+	defer span.End()
+
+	pids := s.p2p.Peers().Connected()
+
+	// Before a node processes the pending attestations queue, it verifies
+	// that the attestations in the queue are still valid. Attestations are
+	// deleted from the queue if invalid (i.e. they have gone stale by falling too many slots behind).
+	s.validatePendingAtts(ctx, s.chain.CurrentSlot())
+
+	for bRoot, attestations := range s.blkRootToPendingAtts {
+		// Has the pending attestation's missing block arrived yet?
+		if s.db.HasBlock(ctx, bRoot) {
+			numberOfBlocksRecoveredFromAtt.Inc()
+			for _, att := range attestations {
+				// The pending attestations can arrive in both aggregated and unaggregated forms,
+				// and each form has distinct validation steps.
+				if helpers.IsAggregated(att.Aggregate) {
+					// Save the pending aggregated attestation to the pool if it passes the aggregated
+					// validation steps.
+					if s.validateAggregatedAtt(ctx, att) {
+						if err := s.attPool.SaveAggregatedAttestation(att.Aggregate); err != nil {
+							return err
+						}
+						numberOfAttsRecovered.Inc()
+
+						// Broadcast the attestation again once the node is able to process it.
+						if err := s.p2p.Broadcast(ctx, att); err != nil {
+							log.WithError(err).Error("Failed to broadcast")
+						}
+					}
+				} else {
+					// Save the pending unaggregated attestation to the pool if the BLS signature is
+					// valid.
+					if _, err := bls.SignatureFromBytes(att.Aggregate.Signature); err != nil {
+						continue
+					}
+					if err := s.attPool.SaveUnaggregatedAttestation(att.Aggregate); err != nil {
+						return err
+					}
+					numberOfAttsRecovered.Inc()
+
+					// Broadcast the attestation again once the node is able to process it.
+					if err := s.p2p.Broadcast(ctx, att); err != nil {
+						log.WithError(err).Error("Failed to broadcast")
+					}
+				}
+			}
+			log.WithFields(logrus.Fields{
+				"blockRoot":        hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
+				"pendingAttsCount": len(attestations),
+			}).Info("Verified and saved pending attestations to pool")
+
+			// Delete the missing block root key from the pending attestation queue so the node will not request the block again.
+			delete(s.blkRootToPendingAtts, bRoot)
+		} else {
+			// The pending attestation's missing block has not arrived yet.
+			log.WithField("blockRoot", hex.EncodeToString(bytesutil.Trunc(bRoot[:]))).Info("Requesting block for pending attestation")
+
+			// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
+			// have a head slot newer than or equal to the pending attestation's target boundary slot.
+			pid := pids[rand.Int()%len(pids)]
+			targetSlot := helpers.SlotToEpoch(attestations[0].Aggregate.Data.Target.Epoch)
+			for _, p := range pids {
+				if cs, _ := s.p2p.Peers().ChainState(p); cs != nil && cs.HeadSlot >= targetSlot {
+					pid = p
+					break
+				}
+			}
+
+			req := [][32]byte{bRoot}
+			if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
+				traceutil.AnnotateError(span, err)
+				log.Errorf("Could not send recent block request: %v", err)
+			}
+		}
+	}
+	return nil
+}
+
+// This defines how pending attestations are saved in the map. The key is the
+// root of the missing block. The value is the list of pending attestations
+// that voted for that block root.
+func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
+	s.pendingAttsLock.Lock()
+	defer s.pendingAttsLock.Unlock()
+
+	root := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)
+
+	_, ok := s.blkRootToPendingAtts[root]
+	if !ok {
+		s.blkRootToPendingAtts[root] = []*ethpb.AggregateAttestationAndProof{att}
+		return
+	}
+
+	s.blkRootToPendingAtts[root] = append(s.blkRootToPendingAtts[root], att)
+}
+
+// This checks that the pending attestations in the queue are still valid.
+// If an attestation is no longer valid, the node removes it from the queue in place.
+// An attestation is considered invalid once it falls a full epoch or more behind
+// the current slot.
+func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) {
+	s.pendingAttsLock.Lock()
+	defer s.pendingAttsLock.Unlock()
+
+	ctx, span := trace.StartSpan(ctx, "validatePendingAtts")
+	defer span.End()
+
+	for bRoot, atts := range s.blkRootToPendingAtts {
+		for i := len(atts) - 1; i >= 0; i-- {
+			if slot >= atts[i].Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch {
+				// Remove the pending attestation from the list in place.
+				atts = append(atts[:i], atts[i+1:]...)
+				numberOfAttsNotRecovered.Inc()
+			}
+		}
+		s.blkRootToPendingAtts[bRoot] = atts
+
+		// If the pending attestations list of a given block root is empty,
+		// the node removes the key from the map to avoid dangling keys.
+		if len(s.blkRootToPendingAtts[bRoot]) == 0 {
+			delete(s.blkRootToPendingAtts, bRoot)
+			numberOfBlocksNotRecoveredFromAtt.Inc()
+		}
+	}
+}
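To make the retention rule concrete, here is a small, self-contained sketch (illustrative only, not part of this change) of the keep/prune decision that validatePendingAtts applies. The 32-slots-per-epoch value is assumed here purely for the example; the real code reads SlotsPerEpoch from params.BeaconConfig().

package main

import "fmt"

const slotsPerEpoch = 32 // assumed mainnet value, for illustration only

// stale mirrors the pruning condition in validatePendingAtts: a pending
// attestation is dropped once the current slot is at least one full epoch
// past the attestation's slot.
func stale(currentSlot, attSlot uint64) bool {
	return currentSlot >= attSlot+slotsPerEpoch
}

func main() {
	fmt.Println(stale(50, 18)) // true: 50 >= 18+32, so the attestation is pruned
	fmt.Println(stale(50, 19)) // false: 50 < 19+32, so the attestation is kept
}

This matches the arithmetic in the test below: with the current slot set to 50, attestations from slots 0 through 18 (19 per block root) are pruned and 81 remain.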
diff --git a/beacon-chain/sync/pending_attestations_queue_test.go b/beacon-chain/sync/pending_attestations_queue_test.go
new file mode 100644
index 00000000000..90c5378a20a
--- /dev/null
+++ b/beacon-chain/sync/pending_attestations_queue_test.go
@@ -0,0 +1,257 @@
+package sync
+
+import (
+	"context"
+	"reflect"
+	"testing"
+	"time"
+
+	"github.com/libp2p/go-libp2p-core/network"
+	ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
+	"github.com/prysmaticlabs/go-bitfield"
+	"github.com/prysmaticlabs/go-ssz"
+	mock "github.com/prysmaticlabs/prysm/beacon-chain/blockchain/testing"
+	"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
+	dbtest "github.com/prysmaticlabs/prysm/beacon-chain/db/testing"
+	"github.com/prysmaticlabs/prysm/beacon-chain/operations/attestations"
+	"github.com/prysmaticlabs/prysm/beacon-chain/p2p/peers"
+	p2ptest "github.com/prysmaticlabs/prysm/beacon-chain/p2p/testing"
+	pb "github.com/prysmaticlabs/prysm/proto/beacon/p2p/v1"
+	"github.com/prysmaticlabs/prysm/shared/attestationutil"
+	"github.com/prysmaticlabs/prysm/shared/bls"
+	"github.com/prysmaticlabs/prysm/shared/params"
+	"github.com/prysmaticlabs/prysm/shared/testutil"
+	logTest "github.com/sirupsen/logrus/hooks/test"
+)
+
+func TestProcessPendingAtts_NoBlockRequestBlock(t *testing.T) {
+	hook := logTest.NewGlobal()
+	db := dbtest.SetupDB(t)
+	defer dbtest.TeardownDB(t, db)
+	p1 := p2ptest.NewTestP2P(t)
+	p2 := p2ptest.NewTestP2P(t)
+	p1.Connect(p2)
+	if len(p1.Host.Network().Peers()) != 1 {
+		t.Error("Expected peers to be connected")
+	}
+	p1.Peers().Add(p2.PeerID(), nil, network.DirOutbound)
+	p1.Peers().SetConnectionState(p2.PeerID(), peers.PeerConnected)
+	p1.Peers().SetChainState(p2.PeerID(), &pb.Status{})
+
+	r := &Service{
+		p2p:                  p1,
+		db:                   db,
+		chain:                &mock.ChainService{},
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
+	}
+
+	a := &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{}}}}
+	r.blkRootToPendingAtts[[32]byte{'A'}] = []*ethpb.AggregateAttestationAndProof{a}
+	if err := r.processPendingAtts(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+
+	testutil.AssertLogsContain(t, hook, "Requesting block for pending attestation")
+}
+
+func TestProcessPendingAtts_HasBlockSaveUnAggregatedAtt(t *testing.T) {
+	hook := logTest.NewGlobal()
+	db := dbtest.SetupDB(t)
+	defer dbtest.TeardownDB(t, db)
+	p1 := p2ptest.NewTestP2P(t)
+
+	r := &Service{
+		p2p:                  p1,
+		db:                   db,
+		chain:                &mock.ChainService{},
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
+		attPool:              attestations.NewPool(),
+	}
+
+	a := &ethpb.AggregateAttestationAndProof{
+		Aggregate: &ethpb.Attestation{
+			Signature:       bls.RandKey().Sign([]byte("foo"), 0).Marshal(),
+			AggregationBits: bitfield.Bitlist{0x02},
+			Data: &ethpb.AttestationData{
+				Target: &ethpb.Checkpoint{}}}}
+
+	b := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
+	r32, _ := ssz.HashTreeRoot(b.Block)
+	r.db.SaveBlock(context.Background(), b)
+
+	r.blkRootToPendingAtts[r32] = []*ethpb.AggregateAttestationAndProof{a}
+	if err := r.processPendingAtts(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+
+	if len(r.attPool.UnaggregatedAttestations()) != 1 {
+		t.Error("Did not save unaggregated att")
+	}
+	if !reflect.DeepEqual(r.attPool.UnaggregatedAttestations()[0], a.Aggregate) {
+		t.Error("Incorrect saved att")
+	}
+
+	if len(r.attPool.AggregatedAttestations()) != 0 {
+		t.Error("Did save aggregated att")
+	}
+
+	testutil.AssertLogsContain(t, hook, "Verified and saved pending attestations to pool")
+}
+
+func TestProcessPendingAtts_HasBlockSaveAggregatedAtt(t *testing.T) {
+	hook := logTest.NewGlobal()
+	db := dbtest.SetupDB(t)
+	defer dbtest.TeardownDB(t, db)
+	p1 := p2ptest.NewTestP2P(t)
+	validators := uint64(256)
+	beaconState, privKeys := testutil.DeterministicGenesisState(t, validators)
+
+	sb := &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
+	db.SaveBlock(context.Background(), sb)
+	root, _ := ssz.HashTreeRoot(sb.Block)
+
+	aggBits := bitfield.NewBitlist(3)
+	aggBits.SetBitAt(0, true)
+	aggBits.SetBitAt(1, true)
+	att := &ethpb.Attestation{
+		Data: &ethpb.AttestationData{
+			BeaconBlockRoot: root[:],
+			Source:          &ethpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")},
+			Target:          &ethpb.Checkpoint{Epoch: 0, Root: []byte("hello-world")},
+		},
+		AggregationBits: aggBits,
+	}
+
+	committee, err := helpers.BeaconCommitteeFromState(beaconState, att.Data.Slot, att.Data.CommitteeIndex)
+	if err != nil {
+		t.Error(err)
+	}
+	attestingIndices, err := attestationutil.AttestingIndices(att.AggregationBits, committee)
+	if err != nil {
+		t.Error(err)
+	}
+	hashTreeRoot, err := ssz.HashTreeRoot(att.Data)
+	if err != nil {
+		t.Error(err)
+	}
+	domain := helpers.Domain(beaconState.Fork(), 0, params.BeaconConfig().DomainBeaconAttester)
+	sigs := make([]*bls.Signature, len(attestingIndices))
+	for i, indice := range attestingIndices {
+		sig := privKeys[indice].Sign(hashTreeRoot[:], domain)
+		sigs[i] = sig
+	}
+	att.Signature = bls.AggregateSignatures(sigs).Marshal()[:]
+
+	slotRoot, err := ssz.HashTreeRoot(att.Data.Slot)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	sig := privKeys[154].Sign(slotRoot[:], domain)
+	aggregateAndProof := &ethpb.AggregateAttestationAndProof{
+		SelectionProof:  sig.Marshal(),
+		Aggregate:       att,
+		AggregatorIndex: 154,
+	}
+
+	if err := beaconState.SetGenesisTime(uint64(time.Now().Unix())); err != nil {
+		t.Fatal(err)
+	}
+
+	r := &Service{
+		p2p: p1,
+		db:  db,
+		chain: &mock.ChainService{Genesis: time.Now(),
+			State: beaconState,
+			FinalizedCheckPoint: &ethpb.Checkpoint{
+				Epoch: 0,
+			}},
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
+		attPool:              attestations.NewPool(),
+	}
+
+	sb = &ethpb.SignedBeaconBlock{Block: &ethpb.BeaconBlock{}}
+	r32, _ := ssz.HashTreeRoot(sb.Block)
+	r.db.SaveBlock(context.Background(), sb)
+
+	r.blkRootToPendingAtts[r32] = []*ethpb.AggregateAttestationAndProof{aggregateAndProof}
+	if err := r.processPendingAtts(context.Background()); err != nil {
+		t.Fatal(err)
+	}
+
+	if len(r.attPool.AggregatedAttestations()) != 1 {
+		t.Error("Did not save aggregated att")
+	}
+	if !reflect.DeepEqual(r.attPool.AggregatedAttestations()[0], att) {
+		t.Error("Incorrect saved att")
+	}
+	if len(r.attPool.UnaggregatedAttestations()) != 0 {
+		t.Error("Did save unaggregated att")
+	}
+
+	testutil.AssertLogsContain(t, hook, "Verified and saved pending attestations to pool")
+}
+
+func TestValidatePendingAtts_CanPruneOldAtts(t *testing.T) {
+	db := dbtest.SetupDB(t)
+	defer dbtest.TeardownDB(t, db)
+
+	s := &Service{
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
+	}
+
+	// 100 attestations per block root.
+	r1 := [32]byte{'A'}
+	r2 := [32]byte{'B'}
+	r3 := [32]byte{'C'}
+
+	for i := 0; i < 100; i++ {
+		s.savePendingAtt(&ethpb.AggregateAttestationAndProof{
+			Aggregate: &ethpb.Attestation{
+				Data: &ethpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r1[:]}}})
+		s.savePendingAtt(&ethpb.AggregateAttestationAndProof{
+			Aggregate: &ethpb.Attestation{
+				Data: &ethpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r2[:]}}})
+		s.savePendingAtt(&ethpb.AggregateAttestationAndProof{
+			Aggregate: &ethpb.Attestation{
+				Data: &ethpb.AttestationData{Slot: uint64(i), BeaconBlockRoot: r3[:]}}})
+	}
+
+	if len(s.blkRootToPendingAtts[r1]) != 100 {
+		t.Error("Did not save pending atts")
+	}
+	if len(s.blkRootToPendingAtts[r2]) != 100 {
+		t.Error("Did not save pending atts")
+	}
+	if len(s.blkRootToPendingAtts[r3]) != 100 {
+		t.Error("Did not save pending atts")
+	}
+
+	// Set the current slot to 50. Attestations from slots 0 through 18 are now at least
+	// one epoch old, so 19 attestations should be pruned per block root.
+	s.validatePendingAtts(context.Background(), 50)
+	if len(s.blkRootToPendingAtts[r1]) != 81 {
+		t.Error("Did not delete pending atts")
+	}
+	if len(s.blkRootToPendingAtts[r2]) != 81 {
+		t.Error("Did not delete pending atts")
+	}
+	if len(s.blkRootToPendingAtts[r3]) != 81 {
+		t.Error("Did not delete pending atts")
+	}
+
+	// Set the current slot to 100 + slots per epoch; every remaining attestation should be pruned.
+	s.validatePendingAtts(context.Background(), 100+params.BeaconConfig().SlotsPerEpoch)
+	if len(s.blkRootToPendingAtts[r1]) != 0 {
+		t.Error("Did not delete pending atts")
+	}
+	if len(s.blkRootToPendingAtts[r2]) != 0 {
+		t.Error("Did not delete pending atts")
+	}
+	if len(s.blkRootToPendingAtts[r3]) != 0 {
+		t.Error("Did not delete pending atts")
+	}
+
+	// Verify the keys are deleted.
+	if len(s.blkRootToPendingAtts) != 0 {
+		t.Error("Did not delete block keys")
+	}
+}
diff --git a/beacon-chain/sync/pending_blocks_queue.go b/beacon-chain/sync/pending_blocks_queue.go
index 4b97a49cbff..018f2542501 100644
--- a/beacon-chain/sync/pending_blocks_queue.go
+++ b/beacon-chain/sync/pending_blocks_queue.go
@@ -68,7 +68,7 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
 		if !inPendingQueue && !inDB && hasPeer {
 			log.WithFields(logrus.Fields{
 				"currentSlot": b.Block.Slot,
-				"parentRoot":  hex.EncodeToString(b.Block.ParentRoot),
+				"parentRoot":  hex.EncodeToString(bytesutil.Trunc(b.Block.ParentRoot)),
 			}).Info("Requesting parent block")
 
 			req := [][32]byte{bytesutil.ToBytes32(b.Block.ParentRoot)}
@@ -112,10 +112,14 @@ func (r *Service) processPendingBlocks(ctx context.Context) error {
 		delete(r.seenPendingBlocks, blkRoot)
 		r.pendingQueueLock.Unlock()
 
-		log.Infof("Processed ancestor block with slot %d and cleared pending block cache", s)
+		log.WithFields(logrus.Fields{
+			"slot":      s,
+			"blockRoot": hex.EncodeToString(bytesutil.Trunc(blkRoot[:])),
+		}).Info("Processed pending block and cleared it in cache")
 		span.End()
 	}
+
 	return nil
 }
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index d012013f38b..ecee04e0b97 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -39,25 +39,26 @@ type blockchainService interface {
 	blockchain.FinalizationFetcher
 	blockchain.ForkFetcher
 	blockchain.AttestationReceiver
-	blockchain.GenesisTimeFetcher
+	blockchain.TimeFetcher
 }
 
 // NewRegularSync service.
 func NewRegularSync(cfg *Config) *Service {
 	ctx, cancel := context.WithCancel(context.Background())
 	r := &Service{
-		ctx:                 ctx,
-		cancel:              cancel,
-		db:                  cfg.DB,
-		p2p:                 cfg.P2P,
-		attPool:             cfg.AttPool,
-		exitPool:            cfg.ExitPool,
-		chain:               cfg.Chain,
-		initialSync:         cfg.InitialSync,
-		slotToPendingBlocks: make(map[uint64]*ethpb.SignedBeaconBlock),
-		seenPendingBlocks:   make(map[[32]byte]bool),
-		stateNotifier:       cfg.StateNotifier,
-		blocksRateLimiter:   leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
+		ctx:                  ctx,
+		cancel:               cancel,
+		db:                   cfg.DB,
+		p2p:                  cfg.P2P,
+		attPool:              cfg.AttPool,
+		exitPool:             cfg.ExitPool,
+		chain:                cfg.Chain,
+		initialSync:          cfg.InitialSync,
+		slotToPendingBlocks:  make(map[uint64]*ethpb.SignedBeaconBlock),
+		seenPendingBlocks:    make(map[[32]byte]bool),
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
+		stateNotifier:        cfg.StateNotifier,
+		blocksRateLimiter:    leakybucket.NewCollector(allowedBlocksPerSecond, allowedBlocksBurst, false /* deleteEmptyBuckets */),
 	}
 
 	r.registerRPCHandlers()
@@ -69,21 +70,23 @@ func NewRegularSync(cfg *Config) *Service {
 // Service is responsible for handling all run time p2p related operations as the
 // main entry point for network messages.
 type Service struct {
-	ctx                 context.Context
-	cancel              context.CancelFunc
-	p2p                 p2p.P2P
-	db                  db.NoHeadAccessDatabase
-	attPool             attestations.Pool
-	exitPool            *voluntaryexits.Pool
-	chain               blockchainService
-	slotToPendingBlocks map[uint64]*ethpb.SignedBeaconBlock
-	seenPendingBlocks   map[[32]byte]bool
-	pendingQueueLock    sync.RWMutex
-	chainStarted        bool
-	initialSync         Checker
-	validateBlockLock   sync.RWMutex
-	stateNotifier       statefeed.Notifier
-	blocksRateLimiter   *leakybucket.Collector
+	ctx                  context.Context
+	cancel               context.CancelFunc
+	p2p                  p2p.P2P
+	db                   db.NoHeadAccessDatabase
+	attPool              attestations.Pool
+	exitPool             *voluntaryexits.Pool
+	chain                blockchainService
+	slotToPendingBlocks  map[uint64]*ethpb.SignedBeaconBlock
+	seenPendingBlocks    map[[32]byte]bool
+	blkRootToPendingAtts map[[32]byte][]*ethpb.AggregateAttestationAndProof
+	pendingAttsLock      sync.RWMutex
+	pendingQueueLock     sync.RWMutex
+	chainStarted         bool
+	initialSync          Checker
+	validateBlockLock    sync.RWMutex
+	stateNotifier        statefeed.Notifier
+	blocksRateLimiter    *leakybucket.Collector
 }
 
 // Start the regular sync service.
@@ -91,6 +94,7 @@ func (r *Service) Start() {
 	r.p2p.AddConnectionHandler(r.sendRPCStatusRequest)
 	r.p2p.AddDisconnectionHandler(r.removeDisconnectedPeerStatus)
 	r.processPendingBlocksQueue()
+	r.processPendingAttsQueue()
 	r.maintainPeerStatuses()
 	r.resyncIfBehind()
 }
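The processPendingAttsQueue routine registered in Start above relies on runutil.RunEvery to re-run the queue scan every half slot. A rough, self-contained approximation of that pattern (illustrative only; the helper name runEvery, the 6-second half-slot period, and the exact semantics of runutil.RunEvery are assumptions) looks like this:

package main

import (
	"context"
	"fmt"
	"time"
)

// runEvery calls fn on every tick of the given period until ctx is cancelled,
// roughly the behaviour the sync service expects from runutil.RunEvery.
func runEvery(ctx context.Context, period time.Duration, fn func()) {
	ticker := time.NewTicker(period)
	go func() {
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				fn()
			case <-ctx.Done():
				return
			}
		}
	}()
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	runEvery(ctx, 6*time.Second, func() { fmt.Println("process pending attestations") })
	time.Sleep(13 * time.Second) // let the toy loop fire twice before exiting
}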
diff --git a/beacon-chain/sync/validate_aggregate_proof.go b/beacon-chain/sync/validate_aggregate_proof.go
index 19a8cca021a..fdcd326a729 100644
--- a/beacon-chain/sync/validate_aggregate_proof.go
+++ b/beacon-chain/sync/validate_aggregate_proof.go
@@ -49,8 +49,6 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
 		return false
 	}
 
-	attSlot := m.Aggregate.Data.Slot
-
 	// Verify aggregate attestation has not already been seen via aggregate gossip, within a block, or through the creation locally.
 	seen, err := r.attPool.HasAggregatedAttestation(m.Aggregate)
 	if err != nil {
@@ -61,8 +59,25 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
 		return false
 	}
 
-	// Verify the block being voted for passes validation. The block should have passed validation if it's in the DB.
-	if !r.db.HasBlock(ctx, bytesutil.ToBytes32(m.Aggregate.Data.BeaconBlockRoot)) {
+	if !r.validateAggregatedAtt(ctx, m) {
+		return false
+	}
+
+	msg.ValidatorData = m
+
+	return true
+}
+
+func (r *Service) validateAggregatedAtt(ctx context.Context, a *ethpb.AggregateAttestationAndProof) bool {
+	ctx, span := trace.StartSpan(ctx, "sync.validateAggregatedAtt")
+	defer span.End()
+
+	attSlot := a.Aggregate.Data.Slot
+
+	// Verify the block being voted for is in the DB. The block should have passed validation if it's in the DB.
+	if !r.db.HasBlock(ctx, bytesutil.ToBytes32(a.Aggregate.Data.BeaconBlockRoot)) {
+		// The node doesn't have the block; it will request it from a peer while saving the pending attestation to a queue.
+		r.savePendingAtt(a)
 		return false
 	}
 
@@ -90,25 +105,23 @@
 	}
 
 	// Verify validator index is within the aggregate's committee.
-	if err := validateIndexInCommittee(ctx, s, m.Aggregate, m.AggregatorIndex); err != nil {
+	if err := validateIndexInCommittee(ctx, s, a.Aggregate, a.AggregatorIndex); err != nil {
 		traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate index in committee"))
 		return false
 	}
 
 	// Verify selection proof reflects to the right validator and signature is valid.
-	if err := validateSelection(ctx, s, m.Aggregate.Data, m.AggregatorIndex, m.SelectionProof); err != nil {
-		traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate selection for validator %d", m.AggregatorIndex))
+	if err := validateSelection(ctx, s, a.Aggregate.Data, a.AggregatorIndex, a.SelectionProof); err != nil {
+		traceutil.AnnotateError(span, errors.Wrapf(err, "Could not validate selection for validator %d", a.AggregatorIndex))
 		return false
 	}
 
 	// Verify aggregated attestation has a valid signature.
-	if err := blocks.VerifyAttestation(ctx, s, m.Aggregate); err != nil {
+	if err := blocks.VerifyAttestation(ctx, s, a.Aggregate); err != nil {
 		traceutil.AnnotateError(span, err)
 		return false
 	}
 
-	msg.ValidatorData = m
-
 	return true
 }
diff --git a/beacon-chain/sync/validate_aggregate_proof_test.go b/beacon-chain/sync/validate_aggregate_proof_test.go
index 34644b8c46f..a6e40369c71 100644
--- a/beacon-chain/sync/validate_aggregate_proof_test.go
+++ b/beacon-chain/sync/validate_aggregate_proof_test.go
@@ -128,10 +128,11 @@ func TestValidateAggregateAndProof_NoBlock(t *testing.T) {
 	}
 
 	r := &Service{
-		p2p:         p,
-		db:          db,
-		initialSync: &mockSync.Sync{IsSyncing: false},
-		attPool:     attestations.NewPool(),
+		p2p:                  p,
+		db:                   db,
+		initialSync:          &mockSync.Sync{IsSyncing: false},
+		attPool:              attestations.NewPool(),
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
 	}
 
 	buf := new(bytes.Buffer)
diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation.go b/beacon-chain/sync/validate_committee_index_beacon_attestation.go
index 172a5860c85..ff93fb1cbc4 100644
--- a/beacon-chain/sync/validate_committee_index_beacon_attestation.go
+++ b/beacon-chain/sync/validate_committee_index_beacon_attestation.go
@@ -2,7 +2,6 @@ package sync
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"reflect"
 	"strings"
@@ -19,8 +18,6 @@ import (
 	"go.opencensus.io/trace"
 )
 
-var errPointsToBlockNotInDatabase = errors.New("attestation points to a block which is not in the database")
-
 // Validation
 // - The attestation's committee index (attestation.data.index) is for the correct subnet.
 // - The attestation is unaggregated -- that is, it has exactly one participating validator (len([bit for bit in attestation.aggregation_bits if bit == 0b1]) == 1).
@@ -76,13 +73,10 @@ func (s *Service) validateCommitteeIndexBeaconAttestation(ctx context.Context, p
 		return false
 	}
 
-	// Attestation's block must exist in database (only valid blocks are stored).
+	// Verify the block being voted for is in the DB. The block should have passed validation if it's in the DB.
 	if !s.db.HasBlock(ctx, bytesutil.ToBytes32(att.Data.BeaconBlockRoot)) {
-		log.WithField(
-			"blockRoot",
-			fmt.Sprintf("%#x", att.Data.BeaconBlockRoot),
-		).WithError(errPointsToBlockNotInDatabase).Debug("Ignored incoming attestation that points to a block which is not in the database")
-		traceutil.AnnotateError(span, errPointsToBlockNotInDatabase)
+		// The node doesn't have the block; it will request it from a peer while saving the pending attestation to a queue.
+		s.savePendingAtt(&eth.AggregateAttestationAndProof{Aggregate: att})
 		return false
 	}
 
diff --git a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go
index 8c8ac4fd1d7..b9488767804 100644
--- a/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go
+++ b/beacon-chain/sync/validate_committee_index_beacon_attestation_test.go
@@ -31,6 +31,7 @@ func TestService_validateCommitteeIndexBeaconAttestation(t *testing.T) {
 		chain: &mockChain.ChainService{
 			Genesis: time.Now().Add(time.Duration(-64*int64(params.BeaconConfig().SecondsPerSlot)) * time.Second), // 64 slots ago
 		},
+		blkRootToPendingAtts: make(map[[32]byte][]*ethpb.AggregateAttestationAndProof),
 	}
 
 	blk := &ethpb.SignedBeaconBlock{