Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 171 additions & 11 deletions crates/blockchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,56 @@ pub const MAX_ATTESTATIONS_DATA: usize = 16;
///
/// See: leanSpec PR #682.
pub const GOSSIP_DISPARITY_INTERVALS: u64 = 1;
/// Local head lag, in slots, beyond which the node is considered to be syncing.
///
/// The comparison is strict: a head lag of exactly this many slots still
/// counts as synced.
///
/// See: leanSpec PR #708.
const SYNC_LAG_THRESHOLD: u64 = 4;
/// Freshest-known block lag, in slots, beyond which the network is considered stalled.
///
/// During a network-wide stall the node remains synced so validators can help
/// the chain recover.
const NETWORK_STALL_THRESHOLD: u64 = 8;
/// Recovery band, in slots, that prevents the sync status from flapping near the threshold.
///
/// Once the node is syncing, it only returns to synced when its head lag is at
/// most `SYNC_LAG_THRESHOLD - SYNC_HYSTERESIS_BAND` slots.
const SYNC_HYSTERESIS_BAND: u64 = 2;

/// Stateful sync heuristic with hysteresis.
///
/// Remembers whether the node was syncing on the previous evaluation so that
/// leaving the syncing state requires a smaller head lag than entering it.
/// A freshly constructed tracker starts out as synced.
#[derive(Debug, Default)]
struct SyncStatusTracker {
    /// `true` while the local head is considered too far behind the wall clock.
    syncing: bool,
}

impl SyncStatusTracker {
    /// Re-evaluates the sync state and returns the status to report.
    ///
    /// * `current_slot` - wall-clock slot.
    /// * `head_slot` - slot of the local head.
    /// * `max_seen_slot` - slot of the freshest block known to the node.
    ///
    /// Lags are computed with saturating subtraction, so a head or known block
    /// ahead of `current_slot` counts as a lag of zero.
    fn update(
        &mut self,
        current_slot: u64,
        head_slot: u64,
        max_seen_slot: u64,
    ) -> metrics::SyncStatus {
        let head_lag = current_slot.saturating_sub(head_slot);
        let network_lag = current_slot.saturating_sub(max_seen_slot);

        // Hysteresis: once syncing, the head lag has to shrink to the lower
        // limit before the node reports synced again.
        let lag_limit = if self.syncing {
            SYNC_LAG_THRESHOLD.saturating_sub(SYNC_HYSTERESIS_BAND)
        } else {
            SYNC_LAG_THRESHOLD
        };

        // A stalled network (no fresh blocks known at all) overrides the local
        // lag check: the node stays synced so it can keep performing duties.
        let network_stalled = network_lag > NETWORK_STALL_THRESHOLD;
        self.syncing = !network_stalled && head_lag > lag_limit;

        if self.syncing {
            metrics::SyncStatus::Syncing
        } else {
            metrics::SyncStatus::Synced
        }
    }

    /// Returns `true` when the tracker is not in the syncing state.
    fn duties_allowed(&self) -> bool {
        !self.syncing
    }

    /// Passes `proposer` through when duties are allowed, otherwise yields `None`.
    fn gate_proposer(&self, proposer: Option<u64>) -> Option<u64> {
        proposer.filter(|_| self.duties_allowed())
    }
}

/// Milliseconds until the next interval boundary, measured relative to genesis.
fn ms_until_next_interval(now_ms: u64, genesis_time_ms: u64) -> u64 {
Expand Down Expand Up @@ -100,6 +150,7 @@ impl BlockChain {
last_tick_instant: None,
attestation_committee_count,
pre_merge_coverage: None,
sync_status: SyncStatusTracker::default(),
}
.start();
let time_until_genesis = (SystemTime::UNIX_EPOCH + Duration::from_secs(genesis_time))
Expand Down Expand Up @@ -164,6 +215,9 @@ pub struct BlockChainServer {
/// single-threaded message loop, so no synchronization is needed.
/// Observability-only.
pre_merge_coverage: Option<coverage::CoverageSnapshot>,

/// Stateful sync heuristic used by `lean_node_sync_status`.
sync_status: SyncStatusTracker,
}

impl BlockChainServer {
Expand All @@ -190,6 +244,7 @@ impl BlockChainServer {

// Update current slot metric
metrics::update_current_slot(slot);
self.update_sync_status(slot);

// Snapshot the aggregator flag once per tick so all read sites within
// the tick see a consistent value even if the admin API toggles it
Expand All @@ -201,9 +256,16 @@ impl BlockChainServer {
// At interval 0, check if we will propose (but don't build the block yet).
// Tick forkchoice first to accept attestations, then build the block
// using the freshly-accepted attestations.
let proposer_validator_id = (interval == 0 && slot > 0)
let scheduled_proposer = (interval == 0 && slot > 0)
.then(|| self.get_our_proposer(slot))
.flatten();
let proposer_validator_id = self.sync_status.gate_proposer(scheduled_proposer);

if let Some(validator_id) = scheduled_proposer
&& proposer_validator_id.is_none()
{
info!(%slot, %validator_id, "Skipping block proposal while syncing");
}

// Snapshot the pre-merge `new_payloads` set at the end-of-slot promote
// (interval 4), so the post-block report for this round sees its
Expand Down Expand Up @@ -255,7 +317,11 @@ impl BlockChainServer {
slot - 1,
);
}
self.produce_attestations(slot, is_aggregator);
if self.sync_status.duties_allowed() {
self.produce_attestations(slot, is_aggregator);
} else if !self.key_manager.validator_ids().is_empty() {
info!(%slot, "Skipping attestations while syncing");
}
}

// Update safe target slot metric (updated by store.on_tick at interval 3)
Expand Down Expand Up @@ -451,15 +517,6 @@ impl BlockChainServer {
metrics::update_latest_finalized_slot(self.store.latest_finalized().slot);
metrics::update_validators_count(self.key_manager.validator_ids().len() as u64);

// Update sync status based on head slot vs wall clock slot
let current_slot = self.store.time() / INTERVALS_PER_SLOT;
let status = if head_slot >= current_slot {
metrics::SyncStatus::Synced
} else {
metrics::SyncStatus::Syncing
};
metrics::set_node_sync_status(status);

for table in ALL_TABLES {
metrics::update_table_bytes(table.name(), self.store.estimate_table_bytes(table));
}
Expand Down Expand Up @@ -677,6 +734,15 @@ impl BlockChainServer {
let _ = store::on_gossip_aggregated_attestation(&mut self.store, attestation)
.inspect_err(|err| warn!(%err, "Failed to process gossiped aggregated attestation"));
}

/// Recomputes the node sync status for `current_slot` and publishes it as a metric.
///
/// Reads the local head slot and the highest live-chain slot from the store;
/// when the store reports no live-chain slot, the head slot is used in its place.
fn update_sync_status(&mut self, current_slot: u64) {
    let local_head = self.store.head_slot();
    let freshest_known = match self.store.max_live_chain_slot() {
        Some(slot) => slot,
        None => local_head,
    };

    let reported = self
        .sync_status
        .update(current_slot, local_head, freshest_known);
    metrics::set_node_sync_status(reported);
}
}

// Protocol trait for internal messages only (tick scheduling).
Expand Down Expand Up @@ -826,3 +892,97 @@ impl Handler<AggregationDeadline> for BlockChainServer {
}
}
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Head lag up to and including the threshold is still synced; one more
    /// slot of lag flips the tracker to syncing.
    #[test]
    fn sync_status_allows_lag_through_threshold() {
        let mut tracker = SyncStatusTracker::default();

        for lag in 0..=SYNC_LAG_THRESHOLD {
            assert_eq!(
                tracker.update(10 + lag, 10, 10 + lag),
                metrics::SyncStatus::Synced
            );
        }

        let first_syncing_slot = 10 + SYNC_LAG_THRESHOLD + 1;
        assert_eq!(
            tracker.update(first_syncing_slot, 10, first_syncing_slot),
            metrics::SyncStatus::Syncing
        );
    }

    /// A fresh tracker reports syncing when fresh blocks are known but the
    /// local head is more than the threshold behind.
    #[test]
    fn sync_status_detects_local_lag_when_fresh_blocks_are_known() {
        let mut tracker = SyncStatusTracker::default();
        let current_slot = 10 + SYNC_LAG_THRESHOLD + 1;

        assert_eq!(
            tracker.update(current_slot, 10, current_slot),
            metrics::SyncStatus::Syncing
        );
    }

    /// When even the freshest known block is far behind, the network is
    /// treated as stalled and the node stays synced.
    #[test]
    fn sync_status_treats_stale_known_blocks_as_network_stall() {
        let mut tracker = SyncStatusTracker::default();

        assert_eq!(tracker.update(100, 0, 0), metrics::SyncStatus::Synced);
    }

    #[test]
    fn sync_status_hysteresis_prevents_flapping() {
        let mut tracker = SyncStatusTracker::default();

        // Lag 5: above the entry threshold.
        assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing);
        // Lag 4: at the entry threshold but above the exit limit, so still syncing.
        assert_eq!(tracker.update(15, 11, 15), metrics::SyncStatus::Syncing);
        assert_eq!(tracker.update(15, 10, 15), metrics::SyncStatus::Syncing);
        // Lag 2: reaches the exit limit, so the node recovers.
        assert_eq!(tracker.update(15, 13, 15), metrics::SyncStatus::Synced);
    }

    #[test]
    fn network_stall_reopens_sync_status() {
        let mut tracker = SyncStatusTracker::default();

        assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing);
        assert_eq!(tracker.update(30, 0, 20), metrics::SyncStatus::Synced);
    }

    /// The stall check is strict: a network lag equal to the threshold is not
    /// yet a stall, one slot more is.
    #[test]
    fn network_stall_threshold_is_exclusive() {
        let mut tracker = SyncStatusTracker::default();

        assert_eq!(tracker.update(20, 0, 20), metrics::SyncStatus::Syncing);
        assert_eq!(
            tracker.update(20 + NETWORK_STALL_THRESHOLD, 0, 20),
            metrics::SyncStatus::Syncing
        );
        assert_eq!(
            tracker.update(20 + NETWORK_STALL_THRESHOLD + 1, 0, 20),
            metrics::SyncStatus::Synced
        );
    }

    #[test]
    fn future_head_saturates_lag_at_zero() {
        let mut tracker = SyncStatusTracker::default();

        assert_eq!(tracker.update(15, 20, 20), metrics::SyncStatus::Synced);
    }

    #[test]
    fn syncing_gates_proposals_and_attestations() {
        let mut tracker = SyncStatusTracker::default();
        tracker.update(20, 0, 20);

        assert!(!tracker.duties_allowed());
        assert_eq!(tracker.gate_proposer(Some(3)), None);
    }

    #[test]
    fn caught_up_node_allows_proposals_and_attestations() {
        let mut tracker = SyncStatusTracker::default();
        tracker.update(20, 0, 20);
        // Lag 2 reaches the hysteresis exit limit.
        tracker.update(20, 18, 20);

        assert!(tracker.duties_allowed());
        assert_eq!(tracker.gate_proposer(Some(3)), Some(3));
    }

    #[test]
    fn network_stall_keeps_proposals_and_attestations_enabled() {
        let mut tracker = SyncStatusTracker::default();
        tracker.update(100, 0, 0);

        assert!(tracker.duties_allowed());
        assert_eq!(tracker.gate_proposer(Some(3)), Some(3));
    }
}
1 change: 1 addition & 0 deletions crates/blockchain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ static LEAN_BLOCK_PROPOSAL_AGGREGATES_SELECTED: std::sync::LazyLock<Histogram> =
// --- Sync Status ---

/// Node synchronization status.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncStatus {
Idle,
Syncing,
Expand Down
10 changes: 10 additions & 0 deletions crates/storage/src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,16 @@ impl Store {
.collect()
}

/// Return the highest slot found among the `LiveChain` table entries.
///
/// Iterates `Table::LiveChain` with an empty prefix (presumably matching every
/// entry) and keeps the largest slot decoded from the keys. Entries the
/// iterator yields as errors are skipped. Returns `None` when no entry could
/// be read.
///
/// # Panics
///
/// Panics if the read view or the iterator cannot be created.
pub fn max_live_chain_slot(&self) -> Option<u64> {
    let reader = self.backend.begin_read().expect("read view");
    let entries = reader
        .prefix_iterator(Table::LiveChain, &[])
        .expect("iterator");

    let mut highest: Option<u64> = None;
    for (key, _) in entries.filter_map(Result::ok) {
        let slot = decode_live_chain_key(&key).0;
        highest = Some(highest.map_or(slot, |best| best.max(slot)));
    }
    highest
}

/// Get all known block roots as HashSet.
///
/// Useful for checking block existence without deserializing.
Expand Down