Commit

Merge branch 'lower-max-miss-slots-epoch' of github.com:prysmaticlabs/prysm into lower-max-miss-slots-epoch
terencechain committed Mar 5, 2023
2 parents c28aaaa + 7fb0a8c commit a8e8fb8
Showing 19 changed files with 291 additions and 163 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/chain_info.go
@@ -33,7 +33,7 @@ type ChainInfoFetcher interface {
// HeadUpdater defines a common interface for methods in the blockchain service
// which allow updating the head info
type HeadUpdater interface {
UpdateHead(context.Context) error
UpdateHead(context.Context, primitives.Slot)
}

// TimeFetcher retrieves the Ethereum consensus data that's related to time.
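The interface change above drops the error return and makes the proposing slot an explicit argument. A minimal, self-contained sketch of how a call site adapts; the `Slot` alias and `loggingUpdater` are hypothetical stand-ins for illustration, not types from this commit:

```go
package main

import (
	"context"
	"fmt"
)

// Slot stands in for primitives.Slot in this sketch.
type Slot uint64

// HeadUpdater matches the interface after this change: no error return,
// and the slot the caller intends to propose in is passed explicitly.
type HeadUpdater interface {
	UpdateHead(context.Context, Slot)
}

type loggingUpdater struct{}

func (loggingUpdater) UpdateHead(_ context.Context, s Slot) {
	fmt.Println("updating head ahead of proposing slot", s)
}

func main() {
	var u HeadUpdater = loggingUpdater{}
	current := Slot(41)
	// Callers now pass the slot they intend to propose in, typically the
	// current slot (at the boundary) or current slot + 1 (pre-boundary).
	u.UpdateHead(context.Background(), current+1)
}
```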
54 changes: 43 additions & 11 deletions beacon-chain/blockchain/forkchoice_update_execution.go
@@ -5,11 +5,13 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
)

func (s *Service) isNewProposer() bool {
_, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(s.CurrentSlot()+1, [32]byte{} /* root */)
func (s *Service) isNewProposer(slot primitives.Slot) bool {
_, _, ok := s.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(slot, [32]byte{} /* root */)
return ok
}

@@ -40,28 +42,58 @@ func (s *Service) getStateAndBlock(ctx context.Context, r [32]byte) (state.Beaco
return headState, newHeadBlock, nil
}

func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte) error {
func (s *Service) forkchoiceUpdateWithExecution(ctx context.Context, newHeadRoot [32]byte, proposingSlot primitives.Slot) error {
isNewHead := s.isNewHead(newHeadRoot)
if !isNewHead && !s.isNewProposer() {
isNewProposer := s.isNewProposer(proposingSlot)
if !isNewHead && !isNewProposer {
return nil
}

headState, headBlock, err := s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
var headState state.BeaconState
var headBlock interfaces.ReadOnlySignedBeaconBlock
var headRoot [32]byte
var err error

shouldUpdate := isNewHead
if isNewHead && isNewProposer && !features.Get().DisableReorgLateBlocks {
if proposingSlot == s.CurrentSlot() {
proposerHead := s.ForkChoicer().GetProposerHead()
if proposerHead != newHeadRoot {
shouldUpdate = false
}
} else if s.ForkChoicer().ShouldOverrideFCU() {
shouldUpdate = false
}
}
if shouldUpdate {
headRoot = newHeadRoot
headState, headBlock, err = s.getStateAndBlock(ctx, newHeadRoot)
if err != nil {
log.WithError(err).Error("Could not get forkchoice update argument")
return nil
}
} else {
// We are guaranteed that the head block is the parent
// of the incoming block. We do not process the slot
// because it will be processed anyway in notifyForkchoiceUpdate
headState = s.headState(ctx)
headRoot = s.headRoot()
headBlock, err = s.headBlock()
if err != nil {
return errors.Wrap(err, "could not get head block")
}
}

_, err = s.notifyForkchoiceUpdate(ctx, &notifyForkchoiceUpdateArg{
headState: headState,
headRoot: newHeadRoot,
headRoot: headRoot,
headBlock: headBlock.Block(),
})
if err != nil {
return err
return errors.Wrap(err, "could not notify forkchoice update")
}

if isNewHead {
if shouldUpdate {
if err := s.saveHead(ctx, newHeadRoot, headBlock, headState); err != nil {
log.WithError(err).Error("could not save head")
}
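The heart of this hunk is the `shouldUpdate` decision: when the node both observes a new head and is about to propose, it may deliberately keep the parent as head so that its own block can reorg a late-arriving block. A distilled sketch of that branch, with the fork-choice queries flattened into booleans (names are illustrative, not the service's API):

```go
// shouldUpdateHead mirrors the branch structure in
// forkchoiceUpdateWithExecution above; it is a sketch, not the real method.
func shouldUpdateHead(
	isNewHead, isNewProposer, reorgFeatureEnabled bool, // service state
	proposingThisSlot bool, // proposingSlot == current slot?
	proposerHeadIsNewHead bool, // ForkChoicer().GetProposerHead() == newHeadRoot?
	shouldOverrideFCU bool, // ForkChoicer().ShouldOverrideFCU()
) bool {
	shouldUpdate := isNewHead
	if isNewHead && isNewProposer && reorgFeatureEnabled {
		if proposingThisSlot {
			// About to propose now: keep the parent as head unless fork
			// choice's proposer head already agrees with the new head.
			if !proposerHeadIsNewHead {
				shouldUpdate = false
			}
		} else if shouldOverrideFCU {
			// Proposing a later slot: fork choice asks us to withhold the
			// forkchoice update so the late block can be reorged.
			shouldUpdate = false
		}
	}
	return shouldUpdate
}
```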
18 changes: 10 additions & 8 deletions beacon-chain/blockchain/forkchoice_update_execution_test.go
@@ -21,10 +21,10 @@ import (
func TestService_isNewProposer(t *testing.T) {
beaconDB := testDB.SetupDB(t)
service := setupBeaconChain(t, beaconDB)
require.Equal(t, false, service.isNewProposer())
require.Equal(t, false, service.isNewProposer(service.CurrentSlot()+1))

service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(service.CurrentSlot()+1, 0, [8]byte{}, [32]byte{} /* root */)
require.Equal(t, true, service.isNewProposer())
require.Equal(t, true, service.isNewProposer(service.CurrentSlot()+1))
}

func TestService_isNewHead(t *testing.T) {
@@ -75,15 +75,15 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
service, err := NewService(ctx, opts...)
require.NoError(t, err)
service.cfg.ProposerSlotIndexCache = cache.NewProposerPayloadIDsCache()
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, service.headRoot()))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, service.headRoot(), service.CurrentSlot()+1))
hookErr := "could not notify forkchoice update"
invalidStateErr := "could not get state summary: could not find block in DB"
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
gb, err := blocks.NewSignedBeaconBlock(util.NewBeaconBlock())
require.NoError(t, err)
require.NoError(t, service.saveInitSyncBlock(ctx, [32]byte{'a'}, gb))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{'a'}, service.CurrentSlot()+1))
require.LogsContain(t, hook, invalidStateErr)

hook.Reset()
@@ -107,7 +107,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}, [32]byte{2})
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()))
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)

@@ -124,7 +124,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {
state: st,
}
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(2, 1, [8]byte{1}, [32]byte{2})
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r1, service.CurrentSlot()+1))
require.LogsDoNotContain(t, hook, invalidStateErr)
require.LogsDoNotContain(t, hook, hookErr)
vId, payloadID, has := service.cfg.ProposerSlotIndexCache.GetProposerPayloadIDs(2, [32]byte{2})
@@ -134,7 +134,7 @@ func TestService_forkchoiceUpdateWithExecution_exceptionalCases(t *testing.T) {

// Test zero headRoot returns immediately.
headRoot := service.headRoot()
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{}))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, [32]byte{}, service.CurrentSlot()+1))
require.Equal(t, service.headRoot(), headRoot)
}

@@ -184,7 +184,9 @@ func TestService_forkchoiceUpdateWithExecution_SameHeadRootNewProposer(t *testin

// Keep the same head, but register as proposer for the next slot
service.head.root = r
service.head.block = sb
service.head.state = st
service.cfg.ProposerSlotIndexCache.SetProposerAndPayloadIDs(service.CurrentSlot()+1, 0, [8]byte{}, [32]byte{} /* root */)
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r))
require.NoError(t, service.forkchoiceUpdateWithExecution(ctx, r, service.CurrentSlot()+1))

}
9 changes: 4 additions & 5 deletions beacon-chain/blockchain/process_attestation.go
@@ -6,7 +6,6 @@ import (

"github.com/pkg/errors"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/attestation"
@@ -37,7 +36,7 @@ import (
//
// # Update latest messages for attesting indices
// update_latest_messages(store, indexed_attestation.attesting_indices, attestation)
func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation) error {
func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation, disparity time.Duration) error {
ctx, span := trace.StartSpan(ctx, "blockChain.onAttestation")
defer span.End()

@@ -63,7 +62,7 @@ func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation) error
genesisTime := uint64(s.genesisTime.Unix())

// Verify attestation target is from current epoch or previous epoch.
if err := verifyAttTargetEpoch(ctx, genesisTime, uint64(time.Now().Unix()), tgt); err != nil {
if err := verifyAttTargetEpoch(ctx, genesisTime, uint64(time.Now().Add(disparity).Unix()), tgt); err != nil {
return err
}

@@ -72,11 +71,11 @@ func (s *Service) OnAttestation(ctx context.Context, a *ethpb.Attestation) error
return errors.Wrap(err, "could not verify attestation beacon block")
}

// Note that LMG GHOST and FFG consistency check is ignored because it was performed in sync's validation pipeline:
// Note that LMD GHOST and FFG consistency check is ignored because it was performed in sync's validation pipeline:
// validate_aggregate_proof.go and validate_beacon_attestation.go

// Verify attestations can only affect the fork choice of subsequent slots.
if err := slots.VerifyTime(genesisTime, a.Data.Slot+1, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
if err := slots.VerifyTime(genesisTime, a.Data.Slot+1, disparity); err != nil {
return err
}

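Both timing checks in this file now take the disparity as a parameter instead of reading `MaximumGossipClockDisparity` directly. A self-contained approximation of the `slots.VerifyTime` check (12-second slots assumed; `verifyTime` is a simplified stand-in, not the real helper):

```go
package main

import (
	"fmt"
	"time"
)

const secondsPerSlot = 12 // assumed mainnet value

// slotStart is the wall-clock start of `slot` relative to genesis.
func slotStart(genesis time.Time, slot uint64) time.Time {
	return genesis.Add(time.Duration(slot) * secondsPerSlot * time.Second)
}

// verifyTime approximates slots.VerifyTime: an attestation for slot N may
// only be processed once slot N+1 has begun, modulo the allowed disparity.
func verifyTime(genesis time.Time, slot uint64, disparity time.Duration) error {
	if time.Now().Add(disparity).Before(slotStart(genesis, slot)) {
		return fmt.Errorf("slot %d is still in the future", slot)
	}
	return nil
}

func main() {
	genesis := time.Now().Add(-30 * time.Second) // we are 6s into slot 2
	attSlot := uint64(2)
	// With no disparity, slot 3 (t=36s) has not started, so this errors...
	fmt.Println(verifyTime(genesis, attSlot+1, 0))
	// ...and a widened disparity (e.g. a +2s reorg window) still errors here,
	// 6s from the boundary, but would pass within 2s of it.
	fmt.Println(verifyTime(genesis, attSlot+1, 2*time.Second))
}
```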
4 changes: 2 additions & 2 deletions beacon-chain/blockchain/process_attestation_test.go
@@ -117,7 +117,7 @@ func TestStore_OnAttestation_ErrorConditions(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
err := service.OnAttestation(ctx, tt.a)
err := service.OnAttestation(ctx, tt.a, 0)
if tt.wantedErr != "" {
assert.ErrorContains(t, tt.wantedErr, err)
} else {
@@ -154,7 +154,7 @@ func TestStore_OnAttestation_Ok_DoublyLinkedTree(t *testing.T) {
state, blkRoot, err := prepareForkchoiceState(ctx, 0, tRoot, tRoot, params.BeaconConfig().ZeroHash, ojc, ofc)
require.NoError(t, err)
require.NoError(t, service.cfg.ForkChoiceStore.InsertNode(ctx, state, blkRoot))
require.NoError(t, service.OnAttestation(ctx, att[0]))
require.NoError(t, service.OnAttestation(ctx, att[0], 0))
}

func TestStore_SaveCheckpointState(t *testing.T) {
23 changes: 7 additions & 16 deletions beacon-chain/blockchain/process_block.go
@@ -214,7 +214,7 @@ func (s *Service) onBlock(ctx context.Context, signed interfaces.ReadOnlySignedB
}
newBlockHeadElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

if err := s.forkchoiceUpdateWithExecution(ctx, headRoot); err != nil {
if err := s.forkchoiceUpdateWithExecution(ctx, headRoot, s.CurrentSlot()+1); err != nil {
return err
}

@@ -667,32 +667,23 @@ func (s *Service) fillMissingPayloadIDRoutine(ctx context.Context, stateFeed *ev
break
}

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
attThreshold := params.BeaconConfig().SecondsPerSlot / 3
ticker := slots.NewSlotTickerWithOffset(s.genesisTime, time.Duration(attThreshold)*time.Second, params.BeaconConfig().SecondsPerSlot)
for {
select {
case ti := <-ticker.C:
if err := s.fillMissingBlockPayloadId(ctx, ti); err != nil {
case <-ticker.C():
if err := s.fillMissingBlockPayloadId(ctx); err != nil {
log.WithError(err).Error("Could not fill missing payload ID")
}
case <-s.ctx.Done():
case <-ctx.Done():
log.Debug("Context closed, exiting routine")
return
}
}
}()
}

// Returns true if time `t` is halfway through the slot in sec.
func atHalfSlot(t time.Time) bool {
s := params.BeaconConfig().SecondsPerSlot
return uint64(t.Second())%s == s/2
}

func (s *Service) fillMissingBlockPayloadId(ctx context.Context, ti time.Time) error {
if !atHalfSlot(ti) {
return nil
}
func (s *Service) fillMissingBlockPayloadId(ctx context.Context) error {
s.ForkChoicer().RLock()
highestReceivedSlot := s.cfg.ForkChoiceStore.HighestReceivedBlockSlot()
s.ForkChoicer().RUnlock()
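This hunk replaces a once-per-second ticker gated by `atHalfSlot` with a slot ticker that fires exactly once per slot, offset to the attestation threshold (`SecondsPerSlot / 3`, i.e. 4 seconds into a 12-second slot). A sketch of such an offset ticker, a minimal stand-in for `slots.NewSlotTickerWithOffset`, whose real implementation is not part of this diff:

```go
package main

import (
	"fmt"
	"time"
)

// tickAtOffset invokes fn once per slot, at `offset` past each slot
// boundary measured from genesis, until done is closed.
func tickAtOffset(genesis time.Time, offset, slotDuration time.Duration,
	done <-chan struct{}, fn func()) {
	go func() {
		for {
			intoSlot := time.Since(genesis) % slotDuration
			wait := offset - intoSlot
			if wait <= 0 { // offset already passed this slot; fire next slot
				wait += slotDuration
			}
			select {
			case <-time.After(wait):
				fn()
			case <-done:
				return
			}
		}
	}()
}

func main() {
	done := make(chan struct{})
	// 12s slots, fire 4s into each slot (SecondsPerSlot / 3).
	tickAtOffset(time.Now(), 4*time.Second, 12*time.Second, done,
		func() { fmt.Println("fill missing payload ID check") })
	time.Sleep(30 * time.Second)
	close(done)
}
```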
2 changes: 1 addition & 1 deletion beacon-chain/blockchain/process_block_test.go
@@ -2164,7 +2164,7 @@ func TestFillMissingBlockPayloadId_DiffSlotExitEarly(t *testing.T) {

service, err := NewService(ctx, opts...)
require.NoError(t, err)
require.NoError(t, service.fillMissingBlockPayloadId(ctx, time.Unix(int64(params.BeaconConfig().SecondsPerSlot/2), 0)))
require.NoError(t, service.fillMissingBlockPayloadId(ctx))
}

// Helper function to simulate the block being on time or delayed for proposer
49 changes: 32 additions & 17 deletions beacon-chain/blockchain/receive_attestation.go
@@ -11,14 +11,20 @@ import (
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/feed"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/features"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
"github.com/prysmaticlabs/prysm/v3/encoding/bytesutil"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/time/slots"
"github.com/sirupsen/logrus"
"go.opencensus.io/trace"
)

// reorgLateBlockCountAttestations is the time remaining before the end of the slot
// at which we count attestations to decide whether to reorg the incoming block
const reorgLateBlockCountAttestations = 2 * time.Second

// AttestationStateFetcher allows for retrieving a beacon state corresponding to the block
// root of an attestation's target checkpoint.
type AttestationStateFetcher interface {
@@ -88,30 +94,40 @@ func (s *Service) spawnProcessAttestationsRoutine(stateFeed *event.Feed) {
}

st := slots.NewSlotTicker(s.genesisTime, params.BeaconConfig().SecondsPerSlot)
pat := slots.NewSlotTickerWithOffset(s.genesisTime, -reorgLateBlockCountAttestations, params.BeaconConfig().SecondsPerSlot)
for {
select {
case <-s.ctx.Done():
return
case <-pat.C():
s.ForkChoicer().Lock()
s.UpdateHead(s.ctx, s.CurrentSlot()+1)
s.ForkChoicer().Unlock()
case <-st.C():
s.ForkChoicer().Lock()
if err := s.ForkChoicer().NewSlot(s.ctx, s.CurrentSlot()); err != nil {
log.WithError(err).Error("Could not process new slot")
log.WithError(err).Error("could not process new slot")
}

if err := s.UpdateHead(s.ctx); err != nil {
log.WithError(err).Error("Could not process attestations and update head")
}
s.UpdateHead(s.ctx, s.CurrentSlot())
s.ForkChoicer().Unlock()
}
}
}()
}

// UpdateHead updates the canonical head of the chain based on information from fork-choice attestations and votes.
// It requires no external inputs.
func (s *Service) UpdateHead(ctx context.Context) error {
// The caller of this function MUST hold a lock in forkchoice
func (s *Service) UpdateHead(ctx context.Context, proposingSlot primitives.Slot) {
start := time.Now()
s.ForkChoicer().Lock()
defer s.ForkChoicer().Unlock()
s.processAttestations(ctx)

// This function is only called at 10 seconds or 0 seconds into the slot
disparity := params.BeaconNetworkConfig().MaximumGossipClockDisparity
if !features.Get().DisableReorgLateBlocks {
disparity += reorgLateBlockCountAttestations
}
s.processAttestations(ctx, disparity)

processAttsElapsedTime.Observe(float64(time.Since(start).Milliseconds()))

start = time.Now()
@@ -129,21 +145,20 @@ func (s *Service) UpdateHead(ctx context.Context) error {
}).Debug("Head changed due to attestations")
}
s.headLock.RUnlock()
if err := s.forkchoiceUpdateWithExecution(ctx, newHeadRoot); err != nil {
return err
if err := s.forkchoiceUpdateWithExecution(s.ctx, newHeadRoot, proposingSlot); err != nil {
log.WithError(err).Error("could not update forkchoice")
}
return nil
}

// This processes fork choice attestations from the pool to account for validator votes and fork choice.
func (s *Service) processAttestations(ctx context.Context) {
func (s *Service) processAttestations(ctx context.Context, disparity time.Duration) {
atts := s.cfg.AttPool.ForkchoiceAttestations()
for _, a := range atts {
// Based on the spec, don't process the attestation until the subsequent slot.
// This delays consideration in the fork choice until their slot is in the past.
// https://github.com/ethereum/consensus-specs/blob/dev/specs/phase0/fork-choice.md#validate_on_attestation
nextSlot := a.Data.Slot + 1
if err := slots.VerifyTime(uint64(s.genesisTime.Unix()), nextSlot, params.BeaconNetworkConfig().MaximumGossipClockDisparity); err != nil {
if err := slots.VerifyTime(uint64(s.genesisTime.Unix()), nextSlot, disparity); err != nil {
continue
}

@@ -161,7 +176,7 @@ func (s *Service) processAttestations(ctx context.Context) {
continue
}

if err := s.receiveAttestationNoPubsub(ctx, a); err != nil {
if err := s.receiveAttestationNoPubsub(ctx, a, disparity); err != nil {
log.WithFields(logrus.Fields{
"slot": a.Data.Slot,
"committeeIndex": a.Data.CommitteeIndex,
@@ -178,11 +193,11 @@ func (s *Service) processAttestations(ctx context.Context) {
// 1. Validate attestation, update validator's latest vote
// 2. Apply fork choice to the processed attestation
// 3. Save latest head info
func (s *Service) receiveAttestationNoPubsub(ctx context.Context, att *ethpb.Attestation) error {
func (s *Service) receiveAttestationNoPubsub(ctx context.Context, att *ethpb.Attestation, disparity time.Duration) error {
ctx, span := trace.StartSpan(ctx, "beacon-chain.blockchain.receiveAttestationNoPubsub")
defer span.End()

if err := s.OnAttestation(ctx, att); err != nil {
if err := s.OnAttestation(ctx, att, disparity); err != nil {
return errors.Wrap(err, "could not process attestation")
}

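Taken together, the changes in this file give `UpdateHead` a twice-per-slot cadence: once 2 seconds before each slot boundary (preparing the next slot's proposer and a possible late-block reorg) and once at the boundary itself. A small sketch of the resulting timing and the widened disparity computation; the constant values are assumed mainnet defaults, not stated in this diff:

```go
package main

import (
	"fmt"
	"time"
)

const (
	secondsPerSlot   = 12 * time.Second       // assumed mainnet value
	gossipDisparity  = 500 * time.Millisecond // assumed MaximumGossipClockDisparity
	reorgCountWindow = 2 * time.Second        // reorgLateBlockCountAttestations
)

// effectiveDisparity mirrors the computation in UpdateHead above: when
// late-block reorgs are enabled, the pre-boundary run must already count
// attestations that nominally belong to the next slot.
func effectiveDisparity(reorgEnabled bool) time.Duration {
	d := gossipDisparity
	if reorgEnabled {
		d += reorgCountWindow
	}
	return d
}

func main() {
	preBoundary := secondsPerSlot - reorgCountWindow // t = 10s into the slot
	fmt.Println("boundary tick (t=0s):      UpdateHead(ctx, currentSlot)")
	fmt.Printf("pre-boundary tick (t=%v): UpdateHead(ctx, currentSlot+1)\n", preBoundary)
	fmt.Println("disparity, reorgs on: ", effectiveDisparity(true))
	fmt.Println("disparity, reorgs off:", effectiveDisparity(false))
}
```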
