Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use broadcast channel for event listeners #8193

Merged
merged 44 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
e8be997
feat: use broadcast channel for event listeners
fgimenez May 9, 2024
b62e3a3
move event_listeners from manager to network
fgimenez May 10, 2024
6623c1b
updated transactions
fgimenez May 10, 2024
99c35a4
update pruner
fgimenez May 10, 2024
e389289
EventNotifier moved to tokio-util
fgimenez May 10, 2024
3ec087d
configurable broadcast channel size and default
fgimenez May 10, 2024
a49bd2b
updated consensus
fgimenez May 11, 2024
4802a90
updated builder
fgimenez May 11, 2024
ac3b701
updated commands
fgimenez May 11, 2024
7f940cc
updated tests
fgimenez May 12, 2024
eb11e55
clippy
fgimenez May 13, 2024
1974131
fix tests
fgimenez May 13, 2024
6d92fe6
removed uneeded methods
fgimenez May 13, 2024
5a4793d
EventListeners::notify doesn't return result
fgimenez May 13, 2024
64a9754
merge EventNotifier and EventListener
fgimenez May 13, 2024
dd9b14b
remove uneeded subscriber count
fgimenez May 13, 2024
8cb1626
engine handle receives listeners
fgimenez May 14, 2024
49ce4d3
feat: use broadcast channel for event listeners
fgimenez May 9, 2024
31f9edb
move event_listeners from manager to network
fgimenez May 10, 2024
20876fb
update pruner
fgimenez May 10, 2024
73c188c
EventNotifier moved to tokio-util
fgimenez May 10, 2024
6df1cae
updated consensus
fgimenez May 11, 2024
cb8b69c
fix tests
fgimenez May 13, 2024
ecc2383
EventListeners::notify doesn't return result
fgimenez May 13, 2024
2339d22
merge EventNotifier and EventListener
fgimenez May 13, 2024
3e8527c
remove uneeded subscriber count
fgimenez May 13, 2024
3e4ba41
engine handle receives listeners
fgimenez May 14, 2024
a53f942
Apply suggestions from code review
fgimenez May 20, 2024
e86ec97
clippy
fgimenez May 20, 2024
cfdc856
use EventListeners in network manager
fgimenez May 20, 2024
891d73c
remove unused method
fgimenez May 21, 2024
ebafb68
derive Clone
fgimenez May 21, 2024
1a4d803
log network event receiving error
fgimenez May 21, 2024
d52e0eb
eprintln -> error
fgimenez May 21, 2024
5dd690a
panic -> error
fgimenez May 21, 2024
23c2d6e
do not handle broadcast send Ok result with 0 listeners
fgimenez May 21, 2024
64b00dc
Update crates/tokio-util/src/event_listeners.rs
fgimenez May 21, 2024
064c90f
fmt
fgimenez May 21, 2024
c400887
bump default broadcast channel size to 2000
fgimenez May 21, 2024
e30e0cf
add EventStream
fgimenez May 22, 2024
40180b8
use EventStream in EventListeners
fgimenez May 22, 2024
8efc1a7
BroadcastStream -> EventStream
fgimenez May 22, 2024
7bb9960
remove NetworkHandleMessage::EventListener variant and set network ha…
fgimenez May 22, 2024
37f24d5
EventListeners -> EventSender
fgimenez May 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 6 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion bin/reth/src/commands/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ where

let max_block = file_client.max_block().unwrap_or(0);

let mut pipeline = Pipeline::builder()
let pipeline = Pipeline::builder()
.with_tip_sender(tip_tx)
// we want to sync all blocks the file client provides or 0 if empty
.with_max_block(max_block)
Expand Down
1 change: 1 addition & 0 deletions crates/consensus/auto-seal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ reth-engine-primitives.workspace = true
reth-consensus.workspace = true
reth-rpc-types.workspace = true
reth-network-types.workspace = true
reth-tokio-util.workspace = true

# async
futures-util.workspace = true
Expand Down
8 changes: 4 additions & 4 deletions crates/consensus/auto-seal/src/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use reth_primitives::{
use reth_provider::{CanonChainTracker, CanonStateNotificationSender, Chain, StateProviderFactory};
use reth_rpc_types::engine::ForkchoiceState;
use reth_stages_api::PipelineEvent;
use reth_tokio_util::EventStream;
use reth_transaction_pool::{TransactionPool, ValidPoolTransaction};
use std::{
collections::VecDeque,
Expand All @@ -18,7 +19,6 @@ use std::{
task::{Context, Poll},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use tracing::{debug, error, warn};

/// A Future that listens for new ready transactions and puts new blocks into storage
Expand All @@ -30,7 +30,7 @@ pub struct MiningTask<Client, Pool: TransactionPool, Executor, Engine: EngineTyp
/// The active miner
miner: MiningMode,
/// Single active future that inserts a new block into `storage`
insert_task: Option<BoxFuture<'static, Option<UnboundedReceiverStream<PipelineEvent>>>>,
insert_task: Option<BoxFuture<'static, Option<EventStream<PipelineEvent>>>>,
/// Shared storage to insert new blocks
storage: Storage,
/// Pool where transactions are stored
Expand All @@ -42,7 +42,7 @@ pub struct MiningTask<Client, Pool: TransactionPool, Executor, Engine: EngineTyp
/// Used to notify consumers of new blocks
canon_state_notification: CanonStateNotificationSender,
/// The pipeline events to listen on
pipe_line_events: Option<UnboundedReceiverStream<PipelineEvent>>,
pipe_line_events: Option<EventStream<PipelineEvent>>,
/// The type used for block execution
block_executor: Executor,
}
Expand Down Expand Up @@ -80,7 +80,7 @@ impl<Executor, Client, Pool: TransactionPool, Engine: EngineTypes>
}

/// Sets the pipeline events to listen on.
pub fn set_pipeline_events(&mut self, events: UnboundedReceiverStream<PipelineEvent>) {
pub fn set_pipeline_events(&mut self, events: EventStream<PipelineEvent>) {
self.pipe_line_events = Some(events);
}
}
Expand Down
29 changes: 11 additions & 18 deletions crates/consensus/beacon/src/engine/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,20 @@ use reth_interfaces::RethResult;
use reth_rpc_types::engine::{
CancunPayloadFields, ExecutionPayload, ForkchoiceState, ForkchoiceUpdated, PayloadStatus,
};
use tokio::sync::{mpsc, mpsc::UnboundedSender, oneshot};
use tokio_stream::wrappers::UnboundedReceiverStream;
use reth_tokio_util::{EventSender, EventStream};
use tokio::sync::{mpsc::UnboundedSender, oneshot};

/// A _shareable_ beacon consensus frontend type. Used to interact with the spawned beacon consensus
/// engine task.
///
/// See also `BeaconConsensusEngine`
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
pub(crate) to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
}

impl<Engine> Clone for BeaconConsensusEngineHandle<Engine>
where
Engine: EngineTypes,
{
fn clone(&self) -> Self {
Self { to_engine: self.to_engine.clone() }
}
event_sender: EventSender<BeaconConsensusEngineEvent>,
}

// === impl BeaconConsensusEngineHandle ===
Expand All @@ -41,8 +33,11 @@ where
Engine: EngineTypes,
{
/// Creates a new beacon consensus engine handle.
pub fn new(to_engine: UnboundedSender<BeaconEngineMessage<Engine>>) -> Self {
Self { to_engine }
pub fn new(
to_engine: UnboundedSender<BeaconEngineMessage<Engine>>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
) -> Self {
Self { to_engine, event_sender }
}

/// Sends a new payload message to the beacon consensus engine and waits for a response.
Expand Down Expand Up @@ -97,9 +92,7 @@ where
}

/// Creates a new [`BeaconConsensusEngineEvent`] listener stream.
pub fn event_listener(&self) -> UnboundedReceiverStream<BeaconConsensusEngineEvent> {
let (tx, rx) = mpsc::unbounded_channel();
let _ = self.to_engine.send(BeaconEngineMessage::EventListener(tx));
UnboundedReceiverStream::new(rx)
pub fn event_listener(&self) -> EventStream<BeaconConsensusEngineEvent> {
self.event_sender.new_listener()
}
}
3 changes: 1 addition & 2 deletions crates/consensus/beacon/src/engine/hooks/static_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ impl<DB: Database + 'static> StaticFileHook<DB> {
return Ok(None)
};

let Some(mut locked_static_file_producer) = static_file_producer.try_lock_arc()
else {
let Some(locked_static_file_producer) = static_file_producer.try_lock_arc() else {
trace!(target: "consensus::engine::hooks::static_file", "StaticFileProducer lock is already taken");
return Ok(None)
};
Expand Down
9 changes: 2 additions & 7 deletions crates/consensus/beacon/src/engine/message.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use crate::{
engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus},
BeaconConsensusEngineEvent,
};
use crate::engine::{error::BeaconOnNewPayloadError, forkchoice::ForkchoiceStatus};
use futures::{future::Either, FutureExt};
use reth_engine_primitives::EngineTypes;
use reth_interfaces::RethResult;
Expand All @@ -15,7 +12,7 @@ use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::oneshot;

/// Represents the outcome of forkchoice update.
///
Expand Down Expand Up @@ -162,6 +159,4 @@ pub enum BeaconEngineMessage<Engine: EngineTypes> {
},
/// Message with exchanged transition configuration.
TransitionConfigurationExchanged,
/// Add a new listener for [`BeaconEngineMessage`].
EventListener(UnboundedSender<BeaconConsensusEngineEvent>),
}
30 changes: 11 additions & 19 deletions crates/consensus/beacon/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use reth_rpc_types::engine::{
};
use reth_stages_api::{ControlFlow, Pipeline};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use reth_tokio_util::EventSender;
use std::{
pin::Pin,
sync::Arc,
Expand Down Expand Up @@ -202,8 +202,8 @@ where
/// be used to download and execute the missing blocks.
pipeline_run_threshold: u64,
hooks: EngineHooksController,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Sender for engine events.
event_sender: EventSender<BeaconConsensusEngineEvent>,
/// Consensus engine metrics.
metrics: EngineMetrics,
}
Expand Down Expand Up @@ -282,16 +282,16 @@ where
engine_message_stream: BoxStream<'static, BeaconEngineMessage<EngineT>>,
hooks: EngineHooks,
) -> RethResult<(Self, BeaconConsensusEngineHandle<EngineT>)> {
let handle = BeaconConsensusEngineHandle { to_engine };
let listeners = EventListeners::default();
let event_sender = EventSender::default();
let handle = BeaconConsensusEngineHandle::new(to_engine, event_sender.clone());
let sync = EngineSyncController::new(
pipeline,
client,
task_spawner.clone(),
run_pipeline_continuously,
max_block,
blockchain.chain_spec(),
listeners.clone(),
event_sender.clone(),
);
let mut this = Self {
sync,
Expand All @@ -306,7 +306,7 @@ where
blockchain_tree_action: None,
pipeline_run_threshold,
hooks: EngineHooksController::new(hooks),
listeners,
event_sender,
metrics: EngineMetrics::default(),
};

Expand Down Expand Up @@ -406,7 +406,7 @@ where
if should_update_head {
let head = outcome.header();
let _ = self.update_head(head.clone());
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
Box::new(head.clone()),
elapsed,
));
Expand Down Expand Up @@ -543,7 +543,7 @@ where
}

// notify listeners about new processed FCU
self.listeners.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
self.event_sender.notify(BeaconConsensusEngineEvent::ForkchoiceUpdated(state, status));
}

/// Check if the pipeline is consistent (all stages have the checkpoint block numbers no less
Expand Down Expand Up @@ -597,13 +597,6 @@ where
self.handle.clone()
}

/// Pushes an [UnboundedSender] to the engine's listeners. Also pushes an [UnboundedSender] to
/// the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener.clone());
self.sync.push_listener(listener);
}

/// Returns true if the distance from the local tip to the block is greater than the configured
/// threshold.
///
Expand Down Expand Up @@ -1255,7 +1248,7 @@ where
} else {
BeaconConsensusEngineEvent::ForkBlockAdded(block)
};
self.listeners.notify(event);
self.event_sender.notify(event);
PayloadStatusEnum::Valid
}
InsertPayloadOk::AlreadySeen(BlockStatus::Valid(_)) => {
Expand Down Expand Up @@ -1429,7 +1422,7 @@ where
match make_canonical_result {
Ok(outcome) => {
if let CanonicalOutcome::Committed { head } = &outcome {
self.listeners.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
self.event_sender.notify(BeaconConsensusEngineEvent::CanonicalChainCommitted(
Box::new(head.clone()),
elapsed,
));
Expand Down Expand Up @@ -1878,7 +1871,6 @@ where
BeaconEngineMessage::TransitionConfigurationExchanged => {
this.blockchain.on_transition_configuration_exchanged();
}
BeaconEngineMessage::EventListener(tx) => this.push_listener(tx),
}
continue
}
Expand Down
21 changes: 8 additions & 13 deletions crates/consensus/beacon/src/engine/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use reth_interfaces::p2p::{
use reth_primitives::{stage::PipelineTarget, BlockNumber, ChainSpec, SealedBlock, B256};
use reth_stages_api::{ControlFlow, Pipeline, PipelineError, PipelineWithResult};
use reth_tasks::TaskSpawner;
use reth_tokio_util::EventListeners;
use reth_tokio_util::EventSender;
use std::{
cmp::{Ordering, Reverse},
collections::{binary_heap::PeekMut, BinaryHeap},
sync::Arc,
task::{ready, Context, Poll},
};
use tokio::sync::{mpsc::UnboundedSender, oneshot};
use tokio::sync::oneshot;
use tracing::trace;

/// Manages syncing under the control of the engine.
Expand Down Expand Up @@ -49,8 +49,8 @@ where
inflight_full_block_requests: Vec<FetchFullBlockFuture<Client>>,
/// In-flight full block _range_ requests in progress.
inflight_block_range_requests: Vec<FetchFullBlockRangeFuture<Client>>,
/// Listeners for engine events.
listeners: EventListeners<BeaconConsensusEngineEvent>,
/// Sender for engine events.
event_sender: EventSender<BeaconConsensusEngineEvent>,
/// Buffered blocks from downloads - this is a min-heap of blocks, using the block number for
/// ordering. This means the blocks will be popped from the heap with ascending block numbers.
range_buffered_blocks: BinaryHeap<Reverse<OrderedSealedBlock>>,
Expand All @@ -76,7 +76,7 @@ where
run_pipeline_continuously: bool,
max_block: Option<BlockNumber>,
chain_spec: Arc<ChainSpec>,
listeners: EventListeners<BeaconConsensusEngineEvent>,
event_sender: EventSender<BeaconConsensusEngineEvent>,
) -> Self {
Self {
full_block_client: FullBlockClient::new(
Expand All @@ -90,7 +90,7 @@ where
inflight_block_range_requests: Vec::new(),
range_buffered_blocks: BinaryHeap::new(),
run_pipeline_continuously,
listeners,
event_sender,
max_block,
metrics: EngineSyncMetrics::default(),
}
Expand Down Expand Up @@ -127,11 +127,6 @@ where
self.run_pipeline_continuously
}

/// Pushes an [UnboundedSender] to the sync controller's listeners.
pub(crate) fn push_listener(&mut self, listener: UnboundedSender<BeaconConsensusEngineEvent>) {
self.listeners.push_listener(listener);
}

/// Returns `true` if a pipeline target is queued and will be triggered on the next `poll`.
#[allow(dead_code)]
pub(crate) fn is_pipeline_sync_pending(&self) -> bool {
Expand Down Expand Up @@ -169,7 +164,7 @@ where
);

// notify listeners that we're downloading a block range
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: count,
target: hash,
Expand Down Expand Up @@ -198,7 +193,7 @@ where
);

// notify listeners that we're downloading a block
self.listeners.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
self.event_sender.notify(BeaconConsensusEngineEvent::LiveSyncProgress(
ConsensusEngineLiveSyncProgress::DownloadingBlocks {
remaining_blocks: 1,
target: hash,
Expand Down