Skip to content

Commit

Permalink
Handle attestations with missing block (#4705)
Browse files Browse the repository at this point in the history
* Fmt
* Starting
* Cont
* Store aggregate attestation is better
* Conflict
* Done
* Merge branch 'master' of git+ssh://github.com/prysmaticlabs/prysm into process-pending-atts
* Comment
* Better logs
* Better logs
* Fix existing tests
* Update metric names
* Preston's feedback
* Broadcast atts once it's valid
* Gazelle
* Test for validating atts and pruning
* Tests
* Removed debug log
* Conflict
* Feedback
* Merge refs/heads/master into process-pending-atts
* Merge refs/heads/master into process-pending-atts
  • Loading branch information
terencechain committed Feb 2, 2020
1 parent f432f78 commit f77049a
Show file tree
Hide file tree
Showing 16 changed files with 533 additions and 65 deletions.
5 changes: 3 additions & 2 deletions beacon-chain/blockchain/chain_info.go
Expand Up @@ -22,9 +22,10 @@ type ChainInfoFetcher interface {
FinalizationFetcher
}

// GenesisTimeFetcher retrieves the Eth2 genesis timestamp.
type GenesisTimeFetcher interface {
// TimeFetcher retrieves the Eth2 data that's related to time.
type TimeFetcher interface {
// GenesisTime returns the chain's genesis timestamp.
GenesisTime() time.Time
// CurrentSlot returns the current slot number.
// NOTE(review): presumably derived from wall-clock time relative to genesis — confirm against the implementation.
CurrentSlot() uint64
}

// HeadFetcher defines a common interface for methods in blockchain service which
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/chain_info_test.go
Expand Up @@ -18,7 +18,7 @@ import (

// Ensure Service implements chain info interface.
var _ = ChainInfoFetcher(&Service{})
var _ = GenesisTimeFetcher(&Service{})
var _ = TimeFetcher(&Service{})
var _ = ForkFetcher(&Service{})

func TestFinalizedCheckpt_Nil(t *testing.T) {
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/blockchain/process_block_helpers.go
Expand Up @@ -20,6 +20,11 @@ import (
"go.opencensus.io/trace"
)

// CurrentSlot returns the current slot based on the wall clock and the
// service's genesis time: whole seconds elapsed since genesis divided by
// the configured seconds per slot.
//
// If the wall clock is still before genesis it returns 0. Without this guard
// the negative difference would wrap around when converted to uint64 and
// yield an enormous slot number.
func (s *Service) CurrentSlot() uint64 {
	now := time.Now().Unix()
	genesis := s.genesisTime.Unix()
	if now < genesis {
		return 0
	}
	return uint64(now-genesis) / params.BeaconConfig().SecondsPerSlot
}

// getBlockPreState returns the pre state of an incoming block. It uses the parent root of the block
// to retrieve the state in DB. It verifies the pre state's validity and the incoming block
// is in the correct time window.
Expand Down Expand Up @@ -199,7 +204,7 @@ func (s *Service) rmStatesOlderThanLastFinalized(ctx context.Context, startSlot
// Otherwise, delay incorporation of new justified checkpoint until next epoch boundary.
// See https://ethresear.ch/t/prevention-of-bouncing-attack-on-ffg/6114 for more detailed analysis and discussion.
func (s *Service) shouldUpdateCurrentJustified(ctx context.Context, newJustifiedCheckpt *ethpb.Checkpoint) (bool, error) {
if helpers.SlotsSinceEpochStarts(s.currentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
if helpers.SlotsSinceEpochStarts(s.CurrentSlot()) < params.BeaconConfig().SafeSlotsToUpdateJustified {
return true, nil
}
newJustifiedBlockSigned, err := s.beaconDB.Block(ctx, bytesutil.ToBytes32(newJustifiedCheckpt.Root))
Expand Down Expand Up @@ -261,11 +266,6 @@ func (s *Service) updateJustified(ctx context.Context, state *stateTrie.BeaconSt
return s.beaconDB.SaveJustifiedCheckpoint(ctx, cpt)
}

// currentSlot returns the current slot based on the wall clock and the
// service's genesis time: whole seconds elapsed since genesis divided by
// the configured seconds per slot.
//
// If the wall clock is still before genesis it returns 0. Without this guard
// the negative difference would wrap around when converted to uint64 and
// yield an enormous slot number.
func (s *Service) currentSlot() uint64 {
	now := time.Now().Unix()
	genesis := s.genesisTime.Unix()
	if now < genesis {
		return 0
	}
	return uint64(now-genesis) / params.BeaconConfig().SecondsPerSlot
}

// This saves every finalized state in DB during initial sync, needed as part of optimization to
// use cache state during initial sync in case of restart.
func (s *Service) saveInitState(ctx context.Context, state *stateTrie.BeaconState) error {
Expand Down
5 changes: 5 additions & 0 deletions beacon-chain/blockchain/testing/mock.go
Expand Up @@ -193,6 +193,11 @@ func (ms *ChainService) GenesisTime() time.Time {
return ms.Genesis
}

// CurrentSlot mocks the same method in the chain service.
// The mock does not consult any clock or genesis value; it always reports slot 0.
func (ms *ChainService) CurrentSlot() uint64 {
return 0
}

// Participation mocks the same method in the chain service.
func (ms *ChainService) Participation(epoch uint64) *precompute.Balance {
return ms.Balance
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/rpc/node/server.go
Expand Up @@ -26,7 +26,7 @@ type Server struct {
Server *grpc.Server
BeaconDB db.ReadOnlyDatabase
PeersFetcher p2p.PeersProvider
GenesisTimeFetcher blockchain.GenesisTimeFetcher
GenesisTimeFetcher blockchain.TimeFetcher
}

// GetSyncStatus checks the current network sync status of the node.
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/rpc/service.go
Expand Up @@ -58,7 +58,7 @@ type Service struct {
forkFetcher blockchain.ForkFetcher
finalizationFetcher blockchain.FinalizationFetcher
participationFetcher blockchain.ParticipationFetcher
genesisTimeFetcher blockchain.GenesisTimeFetcher
genesisTimeFetcher blockchain.TimeFetcher
attestationReceiver blockchain.AttestationReceiver
blockReceiver blockchain.BlockReceiver
powChainService powchain.Chain
Expand Down Expand Up @@ -104,7 +104,7 @@ type Config struct {
BlockReceiver blockchain.BlockReceiver
POWChainService powchain.Chain
ChainStartFetcher powchain.ChainStartFetcher
GenesisTimeFetcher blockchain.GenesisTimeFetcher
GenesisTimeFetcher blockchain.TimeFetcher
MockEth1Votes bool
AttestationsPool attestations.Pool
ExitPool *voluntaryexits.Pool
Expand Down
2 changes: 2 additions & 0 deletions beacon-chain/sync/BUILD.bazel
Expand Up @@ -9,6 +9,7 @@ go_library(
"error.go",
"log.go",
"metrics.go",
"pending_attestations_queue.go",
"pending_blocks_queue.go",
"rpc.go",
"rpc_beacon_blocks_by_range.go",
Expand Down Expand Up @@ -79,6 +80,7 @@ go_test(
size = "small",
srcs = [
"error_test.go",
"pending_attestations_queue_test.go",
"pending_blocks_queue_test.go",
"rpc_beacon_blocks_by_range_test.go",
"rpc_beacon_blocks_by_root_test.go",
Expand Down
24 changes: 24 additions & 0 deletions beacon-chain/sync/metrics.go
Expand Up @@ -33,4 +33,28 @@ var (
Help: "Count the number of times a node resyncs.",
},
)
numberOfBlocksRecoveredFromAtt = promauto.NewCounter(
prometheus.CounterOpts{
Name: "beacon_blocks_recovered_from_attestation_total",
Help: "Count the number of times a missing block recovered from attestation vote.",
},
)
numberOfBlocksNotRecoveredFromAtt = promauto.NewCounter(
prometheus.CounterOpts{
Name: "beacon_blocks_not_recovered_from_attestation_total",
Help: "Count the number of times a missing block not recovered and pruned from attestation vote.",
},
)
numberOfAttsRecovered = promauto.NewCounter(
prometheus.CounterOpts{
Name: "beacon_attestations_recovered_total",
Help: "Count the number of times attestation recovered because of missing block",
},
)
numberOfAttsNotRecovered = promauto.NewCounter(
prometheus.CounterOpts{
Name: "beacon_attestations_not_recovered_total",
Help: "Count the number of times attestation not recovered and pruned because of missing block",
},
)
)
162 changes: 162 additions & 0 deletions beacon-chain/sync/pending_attestations_queue.go
@@ -0,0 +1,162 @@
package sync

import (
"context"
"encoding/hex"
"time"

ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
"github.com/prysmaticlabs/prysm/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/shared/bls"
"github.com/prysmaticlabs/prysm/shared/bytesutil"
"github.com/prysmaticlabs/prysm/shared/params"
"github.com/prysmaticlabs/prysm/shared/runutil"
"github.com/prysmaticlabs/prysm/shared/traceutil"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
"golang.org/x/exp/rand"
)

// processPendingAttsPeriod defines how often a node cleans up and processes
// pending attestations in the queue: half of the configured seconds per slot.
// The halving is an integer division performed before the conversion to a
// duration, so an odd SecondsPerSlot rounds down to whole seconds.
var processPendingAttsPeriod = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second

// processPendingAttsQueue processes the pending attestations queue on every
// `processPendingAttsPeriod`. The service context is handed to
// runutil.RunEvery to drive the schedule, while each pass runs with a fresh
// background context.
//
// A failed pass is logged rather than silently dropped, so problems saving
// recovered attestations to the pool are visible to operators.
func (s *Service) processPendingAttsQueue() {
	ctx := context.Background()
	runutil.RunEvery(s.ctx, processPendingAttsPeriod, func() {
		if err := s.processPendingAtts(ctx); err != nil {
			log.WithError(err).Error("Could not process pending attestations")
		}
	})
}

// processPendingAtts runs one pass over the pending attestations queue:
//  1. Clean up invalid (stale) pending attestations from the queue.
//  2. For every block root whose block has arrived, validate the queued
//     attestations, save them to the attestation pool and re-broadcast them.
//  3. For every block root whose block is still missing, request the block
//     from a connected peer.
//
// It returns an error as soon as saving an attestation to the pool fails;
// failures to broadcast or to send a block request are only logged.
func (s *Service) processPendingAtts(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "processPendingAtts")
	defer span.End()

	pids := s.p2p.Peers().Connected()

	// Before a node processes pending attestations queue, it verifies
	// the attestations in the queue are still valid. Attestations will
	// be deleted from the queue if invalid (ie. getting staled from falling too many slots behind).
	s.validatePendingAtts(ctx, s.chain.CurrentSlot())

	// Snapshot the queued block roots under the lock. The map is also written by
	// savePendingAtt, so it must not be ranged over without holding pendingAttsLock.
	// The lock is released before any database, validation or network call.
	s.pendingAttsLock.Lock()
	roots := make([][32]byte, 0, len(s.blkRootToPendingAtts))
	for root := range s.blkRootToPendingAtts {
		roots = append(roots, root)
	}
	s.pendingAttsLock.Unlock()

	for _, bRoot := range roots {
		s.pendingAttsLock.Lock()
		attestations := s.blkRootToPendingAtts[bRoot]
		s.pendingAttsLock.Unlock()
		if len(attestations) == 0 {
			// Nothing is queued for this root any more.
			continue
		}

		// Has the pending attestation's missing block arrived yet?
		if !s.db.HasBlock(ctx, bRoot) {
			// Without any connected peer there is nobody to ask; picking a random
			// index into an empty list would panic with a division by zero.
			if len(pids) == 0 {
				log.WithField("blockRoot", hex.EncodeToString(bytesutil.Trunc(bRoot[:]))).Debug("No connected peers to request block for pending attestation")
				continue
			}

			// Pending attestation's missing block has not arrived yet.
			log.WithField("blockRoot", hex.EncodeToString(bytesutil.Trunc(bRoot[:]))).Info("Requesting block for pending attestation")

			// Start with a random peer to query, but choose the first peer in our unsorted list that claims to
			// have a head slot newer or equal to the pending attestation's target boundary slot.
			pid := pids[rand.Int()%len(pids)]
			// The target boundary slot is the first slot of the target epoch.
			targetSlot := attestations[0].Aggregate.Data.Target.Epoch * params.BeaconConfig().SlotsPerEpoch
			for _, p := range pids {
				if cs, _ := s.p2p.Peers().ChainState(p); cs != nil && cs.HeadSlot >= targetSlot {
					pid = p
					break
				}
			}

			req := [][32]byte{bRoot}
			if err := s.sendRecentBeaconBlocksRequest(ctx, req, pid); err != nil {
				traceutil.AnnotateError(span, err)
				log.Errorf("Could not send recent block request: %v", err)
			}
			continue
		}

		numberOfBlocksRecoveredFromAtt.Inc()
		for _, att := range attestations {
			// The pending attestations can arrive in both aggregated and unaggregated forms,
			// each form has distinct validation steps.
			if helpers.IsAggregated(att.Aggregate) {
				// Save the pending aggregated attestation to the pool if it passes the aggregated
				// validation steps.
				if !s.validateAggregatedAtt(ctx, att) {
					continue
				}
				if err := s.attPool.SaveAggregatedAttestation(att.Aggregate); err != nil {
					return err
				}
			} else {
				// Save the pending unaggregated attestation to the pool if the BLS signature
				// bytes can be decoded.
				if _, err := bls.SignatureFromBytes(att.Aggregate.Signature); err != nil {
					continue
				}
				if err := s.attPool.SaveUnaggregatedAttestation(att.Aggregate); err != nil {
					return err
				}
			}
			numberOfAttsRecovered.Inc()

			// Broadcasting the attestation again once a node is able to process it.
			if err := s.p2p.Broadcast(ctx, att); err != nil {
				log.WithError(err).Error("Failed to broadcast")
			}
		}
		log.WithFields(logrus.Fields{
			"blockRoot":        hex.EncodeToString(bytesutil.Trunc(bRoot[:])),
			"pendingAttsCount": len(attestations),
		}).Info("Verified and saved pending attestations to pool")

		// Delete the missing block root key from pending attestation queue so a node will not request for the block again.
		s.pendingAttsLock.Lock()
		delete(s.blkRootToPendingAtts, bRoot)
		s.pendingAttsLock.Unlock()
	}
	return nil
}

// savePendingAtt queues an attestation whose voted block is not yet known.
// The queue is a map keyed by the root of the missing block; each value is
// the list of pending attestations that voted for that block root. The map
// is updated while holding pendingAttsLock.
func (s *Service) savePendingAtt(att *ethpb.AggregateAttestationAndProof) {
	s.pendingAttsLock.Lock()
	defer s.pendingAttsLock.Unlock()

	key := bytesutil.ToBytes32(att.Aggregate.Data.BeaconBlockRoot)

	// Extend the existing list when this root is already being tracked.
	if queued, found := s.blkRootToPendingAtts[key]; found {
		s.blkRootToPendingAtts[key] = append(queued, att)
		return
	}

	// First attestation seen for this missing block root.
	s.blkRootToPendingAtts[key] = []*ethpb.AggregateAttestationAndProof{att}
}

// validatePendingAtts prunes the pending attestations queue in place.
// A pending attestation is dropped once the given slot is at least one
// epoch (SlotsPerEpoch slots) past the attestation's own slot. Block roots
// left without any pending attestation are removed from the map entirely.
// The whole pass runs while holding pendingAttsLock.
func (s *Service) validatePendingAtts(ctx context.Context, slot uint64) {
	s.pendingAttsLock.Lock()
	defer s.pendingAttsLock.Unlock()

	_, span := trace.StartSpan(ctx, "validatePendingAtts")
	defer span.End()

	for root, pending := range s.blkRootToPendingAtts {
		// Filter in place: reuse the backing array and keep only the
		// attestations that are still fresh, preserving their order.
		fresh := pending[:0]
		for _, att := range pending {
			if slot >= att.Aggregate.Data.Slot+params.BeaconConfig().SlotsPerEpoch {
				numberOfAttsNotRecovered.Inc()
				continue
			}
			fresh = append(fresh, att)
		}

		// An empty list means the block never showed up in time; drop the
		// key so the map does not accumulate dangling roots.
		if len(fresh) == 0 {
			delete(s.blkRootToPendingAtts, root)
			numberOfBlocksNotRecoveredFromAtt.Inc()
			continue
		}
		s.blkRootToPendingAtts[root] = fresh
	}
}

0 comments on commit f77049a

Please sign in to comment.