continue validator duties if chain does not progress for a long time (#6101)

Nimbus currently stops performing validator duties if the blockchain
does not progress for `node.config.syncHorizon` slots. This means that
the chain won't recover because no new blocks are proposed. To fix that,
continue performing validator duties if no progress is registered for a
long time, and none of our peers is indicating any progress.
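
In outline, the recovery heuristic works as follows. The sketch below is a simplified, hypothetical helper (plain string results instead of the ChainSyncStatus enum introduced in this commit), but the thresholds match the diff:

proc syncStatusSketch(headSlot, clearanceSlot, wallSlot, syncHorizon: uint64,
                      chainIsProgressing: bool,
                      numPeers, maxPeers, peersAhead: int): string =
  if headSlot + syncHorizon >= wallSlot:
    return "Synced"      # chain is keeping up with the wall clock
  if chainIsProgressing:
    return "Syncing"     # a block was resolved recently; keep syncing
  if numPeers <= maxPeers div 4:
    return "Syncing"     # poor connectivity; too early to conclude a halt
  if peersAhead > maxPeers div 8:
    return "Syncing"     # peers claim a higher head; let the sync manager work
  if clearanceSlot + syncHorizon < wallSlot:
    return "Degraded"    # catch the clearance state up first to avoid a lag spike
  "Synced"               # network-wide halt: resume duties to revive the chain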
etan-status committed Mar 20, 2024
1 parent 8b604b5 commit 035ca01
Showing 6 changed files with 107 additions and 29 deletions.
30 changes: 25 additions & 5 deletions beacon_chain/consensus_object_pools/block_clearance.nim
@@ -81,6 +81,7 @@ proc addResolvedHeadBlock(
     epochRef = dag.getEpochRef(state, cache)
     epochRefTick = Moment.now()
 
+  dag.resetChainProgressWatchdog()
   debug "Block resolved",
     blockRoot = shortLog(blockRoot),
     blck = shortLog(trustedBlock.message),
@@ -134,7 +135,8 @@ proc checkStateTransition(
   else:
     ok()
 
-proc advanceClearanceState*(dag: ChainDAGRef) =
+proc advanceClearanceState*(
+    dag: ChainDAGRef, wallSlot: Slot, chainIsDegraded: bool) =
   # When the chain is synced, the most likely block to be produced is the block
   # right after head - we can exploit this assumption and advance the state
   # to that slot before the block arrives, thus allowing us to do the expensive
@@ -143,8 +145,18 @@ proc advanceClearanceState*(dag: ChainDAGRef) =
   # first be seen - later, this state will be copied to the head state!
   let advanced = withState(dag.clearanceState):
     forkyState.data.slot > forkyState.data.latest_block_header.slot
-  if not advanced:
-    let next = getStateField(dag.clearanceState, slot) + 1
+  if not advanced or chainIsDegraded:
+    let
+      clearanceSlot = getStateField(dag.clearanceState, slot)
+      next =
+        if not chainIsDegraded:
+          clearanceSlot + 1
+        else:
+          # The chain seems to have halted.
+          # Advance one epoch at a time to avoid long lag spikes,
+          # so that new blocks may be produced once more
+          let maxSlot = max(clearanceSlot, wallSlot)
+          min((clearanceSlot.epoch + 1).start_slot, maxSlot)
 
     let startTick = Moment.now()
     var
@@ -153,8 +165,16 @@
 
     dag.advanceSlots(dag.clearanceState, next, true, cache, info)
 
-    debug "Prepared clearance state for next block",
-      next, updateStateDur = Moment.now() - startTick
+    logScope:
+      oldSlot = clearanceSlot
+      newSlot = next
+      wallSlot
+      updateStateDur = Moment.now() - startTick
+    if not chainIsDegraded:
+      debug "Prepared clearance state for next block"
+    else:
+      let activeBalance = withEpochInfo(info): info.balances.current_epoch
+      info "Prepared clearance state for next block", activeBalance
 
 proc checkHeadBlock*(
     dag: ChainDAGRef, signedBlock: ForkySignedBeaconBlock):
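
A worked example of the epoch-at-a-time advancement above, with hypothetical numbers assuming the mainnet SLOTS_PER_EPOCH of 32:

let
  clearanceSlot = 1000'u64                          # epoch 31
  wallSlot = 5000'u64                               # chain halted long ago
  maxSlot = max(clearanceSlot, wallSlot)            # 5000
  nextEpochStart = (clearanceSlot div 32 + 1) * 32  # start of epoch 32 = 1024
  next = min(nextEpochStart, maxSlot)               # advance to 1024 only
# Each call steps one epoch (1024, 1056, ...) until the wall slot is reached,
# instead of replaying thousands of slots in one blocking call.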
3 changes: 3 additions & 0 deletions beacon_chain/consensus_object_pools/block_pools_types.nim
@@ -206,6 +206,9 @@ type
 
     cfg*: RuntimeConfig
 
+    lastChainProgress*: Moment
+      ## Indicates the last wall time at which meaningful progress was made
+
     shufflingRefs*: LRUCache[16, ShufflingRef]
 
     epochRefs*: LRUCache[32, EpochRef]
10 changes: 9 additions & 1 deletion beacon_chain/consensus_object_pools/blockchain_dag.nim
@@ -10,7 +10,7 @@
 import
   std/[algorithm, sequtils, tables, sets],
   stew/[arrayops, assign2, byteutils],
-  metrics, results, snappy, chronicles,
+  chronos, metrics, results, snappy, chronicles,
   ../spec/[beaconstate, eth2_merkleization, eth2_ssz_serialization, helpers,
     state_transition, validator],
   ../spec/forks,
@@ -1004,6 +1004,13 @@ proc applyBlock(
 
   ok()
 
+proc resetChainProgressWatchdog*(dag: ChainDAGRef) =
+  dag.lastChainProgress = Moment.now()
+
+proc chainIsProgressing*(dag: ChainDAGRef): bool =
+  const watchdogDuration = chronos.minutes(60)
+  dag.lastChainProgress + watchdogDuration >= Moment.now()
+
 proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
            validatorMonitor: ref ValidatorMonitor, updateFlags: UpdateFlags,
            eraPath = ".",
@@ -1044,6 +1051,7 @@ proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
       # allow skipping some validation.
       updateFlags: updateFlags * {strictVerification},
       cfg: cfg,
+      lastChainProgress: Moment.now(),
 
       vanityLogs: vanityLogs,
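
The watchdog added here is just a timestamp: every resolved block refreshes it, and chainIsProgressing reports whether the most recent refresh happened within the last 60 minutes. A minimal self-contained sketch of the same pattern (illustrative only, not part of the commit):

import chronos  # provides Moment and minutes

type Watchdog = object
  lastProgress: Moment

proc reset(w: var Watchdog) =
  w.lastProgress = Moment.now()

proc progressing(w: Watchdog): bool =
  # True while the last recorded progress is at most an hour old
  w.lastProgress + chronos.minutes(60) >= Moment.now()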
6 changes: 4 additions & 2 deletions beacon_chain/consensus_object_pools/consensus_manager.nim
@@ -155,8 +155,10 @@ func setOptimisticHead*(
     bid: BlockId, execution_block_hash: Eth2Digest) =
   self.optimisticHead = (bid: bid, execution_block_hash: execution_block_hash)
 
-proc updateExecutionClientHead(self: ref ConsensusManager,
-    newHead: BeaconHead): Future[Opt[void]] {.async: (raises: [CancelledError]).} =
+proc updateExecutionClientHead*(
+    self: ref ConsensusManager,
+    newHead: BeaconHead
+): Future[Opt[void]] {.async: (raises: [CancelledError]).} =
   let headExecutionBlockHash =
     self.dag.loadExecutionBlockHash(newHead.blck).valueOr:
       # `BlockRef` are only created for blocks that have passed
12 changes: 10 additions & 2 deletions beacon_chain/nimbus_beacon_node.nim
@@ -1270,7 +1270,8 @@ proc updateGossipStatus(node: BeaconNode, slot: Slot) {.async.} =
       if slot > head.slot: (slot - head.slot).uint64
       else: 0'u64
     isBehind =
-      headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER
+      headDistance > TOPIC_SUBSCRIBE_THRESHOLD_SLOTS + HYSTERESIS_BUFFER and
+      node.syncStatus(head) == ChainSyncStatus.Syncing
     targetGossipState =
       getTargetGossipState(
         slot.epoch,
@@ -1542,7 +1543,14 @@ proc onSlotEnd(node: BeaconNode, slot: Slot) {.async.} =
     # probability of being prepared for the block that will arrive and the
     # epoch processing that follows
     await sleepAsync(advanceCutoff.offset)
-    node.dag.advanceClearanceState()
+    node.dag.advanceClearanceState(slot,
+      chainIsDegraded = (node.syncStatus(head) == ChainSyncStatus.Degraded))
+
+    # If the chain has halted, we have to ensure that the EL gets synced
+    # so that we can perform validator duties again
+    if not node.dag.head.executionValid and not node.dag.chainIsProgressing():
+      let beaconHead = node.attestationPool[].getBeaconHead(head)
+      discard await node.consensusManager.updateExecutionClientHead(beaconHead)
 
   # Prepare action tracker for the next slot
   node.consensusManager[].actionTracker.updateSlot(slot + 1)
75 changes: 56 additions & 19 deletions beacon_chain/validators/beacon_validators.nim
@@ -44,7 +44,7 @@ import
   keystore_management, slashing_protection, validator_duties, validator_pool],
   ".."/spec/mev/rest_deneb_mev_calls
 
-from std/sequtils import mapIt
+from std/sequtils import countIt, mapIt
 from eth/async_utils import awaitWithTimeout
 
 # Metrics for tracking attestation and beacon block loss
@@ -227,29 +227,66 @@ proc getGraffitiBytes*(
   getGraffiti(node.config.validatorsDir, node.config.defaultGraffitiBytes(),
               validator.pubkey)
 
-proc isSynced*(node: BeaconNode, head: BlockRef): bool =
-  ## TODO This function is here as a placeholder for some better heuristics to
-  ##      determine if we're in sync and should be producing blocks and
-  ##      attestations. Generally, the problem is that slot time keeps advancing
-  ##      even when there are no blocks being produced, so there's no way to
-  ##      distinguish validators genuinely going missing from the node not being
-  ##      well connected (during a network split or an internet outage, for
-  ##      example). It would generally be correct to simply keep running as if
-  ##      we were the only legit node left alive, but then we run into issues:
-  ##      with enough empty slots, the validator pool is emptied, leading
-  ##      to empty committees and lots of empty slot processing that will be
-  ##      thrown away as soon as we're synced again.
+type ChainSyncStatus* {.pure.} = enum
+  Syncing,
+  Synced,
+  Degraded
+
+proc syncStatus*(node: BeaconNode, head: BlockRef): ChainSyncStatus =
+  ## Generally, the problem is that slot time keeps advancing
+  ## even when there are no blocks being produced, so there's no way to
+  ## distinguish validators genuinely going missing from the node not being
+  ## well connected (during a network split or an internet outage, for
+  ## example). It would generally be correct to simply keep running as if
+  ## we were the only legit node left alive, but then we run into issues:
+  ## with enough empty slots, the validator pool is emptied, leading
+  ## to empty committees and lots of empty slot processing that will be
+  ## thrown away as soon as we're synced again.
   let
     # The slot we should be at, according to the clock
     beaconTime = node.beaconClock.now()
     wallSlot = beaconTime.toSlot()
 
-  # TODO if everyone follows this logic, the network will not recover from a
-  #      halt: nobody will be producing blocks because everyone expects someone
-  #      else to do it
-  not wallSlot.afterGenesis or
-    head.slot + node.config.syncHorizon >= wallSlot.slot
+  if not wallSlot.afterGenesis or
+      head.slot + node.config.syncHorizon >= wallSlot.slot:
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Synced
+
+  if node.dag.chainIsProgressing():
+    # Chain is progressing; we are out of sync
+    return ChainSyncStatus.Syncing
+
+  let numPeers = len(node.network.peers)
+  if numPeers <= node.config.maxPeers div 4:
+    # We may have poor connectivity; wait until more peers are available
+    warn "Chain appears to have stalled, but have low peers",
+      numPeers, maxPeers = node.config.maxPeers
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Syncing
+
+  let numPeersWithHigherProgress = node.network.peerPool.peers
+    .countIt(it != nil and it.getHeadSlot() > head.slot)
+  if numPeersWithHigherProgress > node.config.maxPeers div 8:
+    # A peer indicates that they are on a later slot; wait for the sync manager
+    # to progress, or for it to kick the peer if they are faking the status
+    warn "Chain appears to have stalled, but peers indicate higher progress",
+      numPeersWithHigherProgress, numPeers, maxPeers = node.config.maxPeers
+    node.dag.resetChainProgressWatchdog()
+    return ChainSyncStatus.Syncing
+
+  # We are on the latest slot among all of our peers, and there has been no
+  # chain progress for an extended period of time.
+  let clearanceSlot = getStateField(node.dag.clearanceState, slot)
+  if clearanceSlot + node.config.syncHorizon < wallSlot.slot:
+    # If we were to propose a block now, we would incur a large lag spike
+    # that makes our block way too late to be gossiped
+    return ChainSyncStatus.Degraded
+
+  # It is reasonably safe to assume that the network has halted; resume duties
+  ChainSyncStatus.Synced
+
+proc isSynced*(node: BeaconNode, head: BlockRef): bool =
+  node.syncStatus(head) == ChainSyncStatus.Synced
 
 proc handleLightClientUpdates*(node: BeaconNode, slot: Slot)
     {.async: (raises: [CancelledError]).} =
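
To put the two peer thresholds above into numbers (a hypothetical example, assuming Nimbus's default --max-peers of 160):

let maxPeers = 160
let lowPeerCutoff = maxPeers div 4   # 40: at or below this, assume poor connectivity
let aheadCutoff = maxPeers div 8     # 20: above this many peers ahead, keep syncing
# Duties therefore resume only with more than 40 connected peers, at most 20
# of which claim a head slot beyond ours, after an hour without chain progress.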
