Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add BeaconConsensusEvent for live sync download requests #7230

Merged
merged 3 commits into from Mar 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
16 changes: 15 additions & 1 deletion crates/consensus/beacon/src/engine/event.rs
@@ -1,6 +1,6 @@
use crate::engine::forkchoice::ForkchoiceStatus;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{SealedBlock, SealedHeader};
use reth_primitives::{SealedBlock, SealedHeader, B256};
use std::{sync::Arc, time::Duration};

/// Events emitted by [crate::BeaconConsensusEngine].
Expand All @@ -12,6 +12,20 @@ pub enum BeaconConsensusEngineEvent {
CanonicalBlockAdded(Arc<SealedBlock>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader>, Duration),
/// The consensus engine is involved in live sync, and has specific progress
LiveSyncProgress(ConsensusEngineLiveSyncProgress),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock>),
}

/// Progress of the consensus engine during live sync.
#[derive(Clone, Debug)]
pub enum ConsensusEngineLiveSyncProgress {
/// The consensus engine is downloading blocks from the network.
DownloadingBlocks {
/// The number of blocks remaining to download.
remaining_blocks: u64,
/// The target block hash and number to download.
target: B256,
},
}
17 changes: 12 additions & 5 deletions crates/consensus/beacon/src/engine/mod.rs
Expand Up @@ -63,7 +63,7 @@ mod invalid_headers;
use invalid_headers::InvalidHeaderCache;

mod event;
pub use event::BeaconConsensusEngineEvent;
pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};

mod handle;
pub use handle::BeaconConsensusEngineHandle;
Expand Down Expand Up @@ -287,13 +287,15 @@ where
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let handle = BeaconConsensusEngineHandle { to_engine };
let listeners = EventListeners::default();
let sync = EngineSyncController::new(
pipeline,
client,
task_spawner.clone(),
run_pipeline_continuously,
max_block,
blockchain.chain_spec(),
listeners.clone(),
);
let mut this = Self {
sync,
Expand All @@ -304,7 +306,7 @@ where
handle: handle.clone(),
forkchoice_state_tracker: Default::default(),
payload_builder,
listeners: EventListeners::default(),
listeners,
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(),
pipeline_run_threshold,
Expand Down Expand Up @@ -605,6 +607,13 @@ where
self.handle.clone()
}

/// Pushes an [UnboundedSender] to the engine's listeners. Also pushes an [UnboundedSender] to
/// the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener.clone());
self.sync.push_listener(listener);
}

/// Returns true if the distance from the local tip to the block is greater than the configured
/// threshold.
///
Expand Down Expand Up @@ -1867,9 +1876,7 @@ where
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
BeaconEngineMessage::EventListener(tx) => this.push_listener(tx),
}
continue
}
Expand Down
35 changes: 33 additions & 2 deletions crates/consensus/beacon/src/engine/sync.rs
@@ -1,6 +1,9 @@
//! Sync management for the engine implementation.

use crate::{engine::metrics::EngineSyncMetrics, BeaconConsensus};
use crate::{
engine::metrics::EngineSyncMetrics, BeaconConsensus, BeaconConsensusEngineEvent,
ConsensusEngineLiveSyncProgress,
};
use futures::FutureExt;
use reth_db::database::Database;
use reth_interfaces::p2p::{
Expand All @@ -11,13 +14,14 @@ use reth_interfaces::p2p::{
use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, B256};
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::trace;

/// Manages syncing under the control of the engine.
Expand Down Expand Up @@ -45,6 +49,8 @@ where
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
Expand All @@ -70,6 +76,7 @@ where
run_pipeline_continuously: bool,
max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
listeners: EventListeners<BeaconConsensusEngineEvent>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
Expand All @@ -83,6 +90,7 @@ where
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
listeners,
max_block,
metrics: EngineSyncMetrics::default(),
}
Expand Down Expand Up @@ -119,6 +127,11 @@ where
self.run_pipeline_continuously
}

/// Pushes an [UnboundedSender] to the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener);
}

/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
pub(crate) fn is_pipeline_sync_pending(&self) -> bool {
Expand All @@ -145,6 +158,14 @@ where
/// If the `count` is 1, this will use the `download_full_block` method instead, because it
/// downloads headers and bodies for the block concurrently.
pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: count,
target: hash,
},
));

if count == 1 {
self.download_full_block(hash);
} else {
Expand Down Expand Up @@ -176,6 +197,15 @@ where
?hash,
"Start downloading full block"
);

// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: 1,
target: hash,
},
));

let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);

Expand Down Expand Up @@ -525,6 +555,7 @@ mod tests {
false,
self.max_block,
chain_spec,
Default::default(),
)
}
}
Expand Down
18 changes: 17 additions & 1 deletion crates/node-core/src/events/node.rs
Expand Up @@ -2,7 +2,9 @@

use crate::events::cl::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ForkchoiceStatus};
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress, ForkchoiceStatus,
};
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
Expand Down Expand Up @@ -211,6 +213,20 @@ impl<DB> NodeState<DB> {
self.safe_block_hash = Some(safe_block_hash);
self.finalized_block_hash = Some(finalized_block_hash);
}
BeaconConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
match live_sync_progress {
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks,
target,
} => {
info!(
remaining_blocks,
target_block_hash=?target,
"Live sync in progress, downloading blocks"
);
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
info!(
number=block.number,
Expand Down