Skip to content

Commit

Permalink
feat: add BeaconConsensusEvent for live sync download requests (#7230)
Browse files Browse the repository at this point in the history
Co-authored-by: Alexey Shekhirin <a.shekhirin@gmail.com>
  • Loading branch information
Rjected and shekhirin committed Mar 19, 2024
1 parent d86d4d2 commit ce89a2b
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 9 deletions.
16 changes: 15 additions & 1 deletion crates/consensus/beacon/src/engine/event.rs
@@ -1,6 +1,6 @@
use crate::engine::forkchoice::ForkchoiceStatus;
use reth_interfaces::consensus::ForkchoiceState;
use reth_primitives::{SealedBlock, SealedHeader};
use reth_primitives::{SealedBlock, SealedHeader, B256};
use std::{sync::Arc, time::Duration};

/// Events emitted by [crate::BeaconConsensusEngine].
Expand All @@ -12,6 +12,20 @@ pub enum BeaconConsensusEngineEvent {
CanonicalBlockAdded(Arc<SealedBlock>, Duration),
/// A canonical chain was committed, and the elapsed time committing the data
CanonicalChainCommitted(Box<SealedHeader>, Duration),
/// The consensus engine is involved in live sync, and has specific progress
LiveSyncProgress(ConsensusEngineLiveSyncProgress),
/// A block was added to the fork chain.
ForkBlockAdded(Arc<SealedBlock>),
}

/// Progress of the consensus engine during live sync.
#[derive(Clone, Debug)]
pub enum ConsensusEngineLiveSyncProgress {
/// The consensus engine is downloading blocks from the network.
DownloadingBlocks {
/// The number of blocks remaining to download.
remaining_blocks: u64,
/// The target block hash and number to download.
target: B256,
},
}
17 changes: 12 additions & 5 deletions crates/consensus/beacon/src/engine/mod.rs
Expand Up @@ -63,7 +63,7 @@ mod invalid_headers;
use invalid_headers::InvalidHeaderCache;

mod event;
pub use event::BeaconConsensusEngineEvent;
pub use event::{BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress};

mod handle;
pub use handle::BeaconConsensusEngineHandle;
Expand Down Expand Up @@ -287,13 +287,15 @@ where
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let handle = BeaconConsensusEngineHandle { to_engine };
let listeners = EventListeners::default();
let sync = EngineSyncController::new(
pipeline,
client,
task_spawner.clone(),
run_pipeline_continuously,
max_block,
blockchain.chain_spec(),
listeners.clone(),
);
let mut this = Self {
sync,
Expand All @@ -304,7 +306,7 @@ where
handle: handle.clone(),
forkchoice_state_tracker: Default::default(),
payload_builder,
listeners: EventListeners::default(),
listeners,
invalid_headers: InvalidHeaderCache::new(MAX_INVALID_HEADERS),
metrics: EngineMetrics::default(),
pipeline_run_threshold,
Expand Down Expand Up @@ -605,6 +607,13 @@ where
self.handle.clone()
}

/// Pushes an [UnboundedSender] to the engine's listeners. Also pushes an [UnboundedSender] to
/// the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener.clone());
self.sync.push_listener(listener);
}

/// Returns true if the distance from the local tip to the block is greater than the configured
/// threshold.
///
Expand Down Expand Up @@ -1867,9 +1876,7 @@ where
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => {
this.listeners.push_listener(tx);
}
BeaconEngineMessage::EventListener(tx) => this.push_listener(tx),
}
continue
}
Expand Down
35 changes: 33 additions & 2 deletions crates/consensus/beacon/src/engine/sync.rs
@@ -1,6 +1,9 @@
//! Sync management for the engine implementation.

use crate::{engine::metrics::EngineSyncMetrics, BeaconConsensus};
use crate::{
engine::metrics::EngineSyncMetrics, BeaconConsensus, BeaconConsensusEngineEvent,
ConsensusEngineLiveSyncProgress,
};
use futures::FutureExt;
use reth_db::database::Database;
use reth_interfaces::p2p::{
Expand All @@ -11,13 +14,14 @@ use reth_interfaces::p2p::{
use reth_primitives::{BlockNumber, ChainSpec, SealedBlock, B256};
use reth_stages::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::oneshot;
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tracing::trace;

/// Manages syncing under the control of the engine.
Expand Down Expand Up @@ -45,6 +49,8 @@ where
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
Expand All @@ -70,6 +76,7 @@ where
run_pipeline_continuously: bool,
max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
listeners: EventListeners<BeaconConsensusEngineEvent>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
Expand All @@ -83,6 +90,7 @@ where
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
listeners,
max_block,
metrics: EngineSyncMetrics::default(),
}
Expand Down Expand Up @@ -119,6 +127,11 @@ where
self.run_pipeline_continuously
}

/// Pushes an [UnboundedSender] to the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener);
}

/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
pub(crate) fn is_pipeline_sync_pending(&self) -> bool {
Expand All @@ -145,6 +158,14 @@ where
/// If the `count` is 1, this will use the `download_full_block` method instead, because it
/// downloads headers and bodies for the block concurrently.
pub(crate) fn download_block_range(&mut self, hash: B256, count: u64) {
// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: count,
target: hash,
},
));

if count == 1 {
self.download_full_block(hash);
} else {
Expand Down Expand Up @@ -176,6 +197,15 @@ where
?hash,
"Start downloading full block"
);

// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: 1,
target: hash,
},
));

let request = self.full_block_client.get_full_block(hash);
self.inflight_full_block_requests.push(request);

Expand Down Expand Up @@ -525,6 +555,7 @@ mod tests {
false,
self.max_block,
chain_spec,
Default::default(),
)
}
}
Expand Down
18 changes: 17 additions & 1 deletion crates/node-core/src/events/node.rs
Expand Up @@ -2,7 +2,9 @@

use crate::events::cl::ConsensusLayerHealthEvent;
use futures::Stream;
use reth_beacon_consensus::{BeaconConsensusEngineEvent, ForkchoiceStatus};
use reth_beacon_consensus::{
BeaconConsensusEngineEvent, ConsensusEngineLiveSyncProgress, ForkchoiceStatus,
};
use reth_db::{database::Database, database_metrics::DatabaseMetadata};
use reth_interfaces::consensus::ForkchoiceState;
use reth_network::{NetworkEvent, NetworkHandle};
Expand Down Expand Up @@ -233,6 +235,20 @@ impl<DB> NodeState<DB> {
self.safe_block_hash = Some(safe_block_hash);
self.finalized_block_hash = Some(finalized_block_hash);
}
BeaconConsensusEngineEvent::LiveSyncProgress(live_sync_progress) => {
match live_sync_progress {
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks,
target,
} => {
info!(
remaining_blocks,
target_block_hash=?target,
"Live sync in progress, downloading blocks"
);
}
}
}
BeaconConsensusEngineEvent::CanonicalBlockAdded(block, elapsed) => {
info!(
number=block.number,
Expand Down

0 comments on commit ce89a2b

Please sign in to comment.