From 07943b04ef8e266479400a6e1b93660034531e1b Mon Sep 17 00:00:00 2001 From: James Date: Tue, 5 Aug 2025 11:09:42 -0400 Subject: [PATCH] refactor: builder uses stream and requires WS --- bin/builder.rs | 8 +++++-- src/config.rs | 42 +++++++++++++++++++++++++++---------- src/tasks/env.rs | 26 ++++++++--------------- tests/block_builder_test.rs | 2 +- tests/cache.rs | 2 +- tests/env.rs | 2 +- 6 files changed, 49 insertions(+), 33 deletions(-) diff --git a/bin/builder.rs b/bin/builder.rs index 663b45ff..60593c92 100644 --- a/bin/builder.rs +++ b/bin/builder.rs @@ -21,9 +21,14 @@ async fn main() -> eyre::Result<()> { let config = BuilderConfig::from_env()?.clone(); let constants = SignetSystemConstants::pecorino(); + // We connect the WS greedily, so we can fail early if the connection is + // invalid. + let ru_provider = config.connect_ru_provider().await?; + // Spawn the EnvTask let env_task = config.env_task(); - let (block_env, env_jh) = env_task.spawn(); + let (block_env, env_jh) = + env_task.await.expect("ws validity checked in connect_ru_provider above").spawn(); // Spawn the cache system let cache_tasks = CacheTasks::new(config.clone(), block_env.clone()); @@ -32,7 +37,6 @@ async fn main() -> eyre::Result<()> { // Prep providers and contracts let (host_provider, quincey) = tokio::try_join!(config.connect_host_provider(), config.connect_quincey())?; - let ru_provider = config.connect_ru_provider(); let zenith = config.connect_zenith(host_provider.clone()); // Set up the metrics task diff --git a/src/config.rs b/src/config.rs index edccc8bf..bec9f316 100644 --- a/src/config.rs +++ b/src/config.rs @@ -12,6 +12,7 @@ use alloy::{ SimpleNonceManager, WalletFiller, }, }, + rpc::client::BuiltInConnectionString, }; use eyre::Result; use init4_bin_base::{ @@ -63,11 +64,19 @@ pub struct BuilderConfig { pub ru_chain_id: u64, /// URL for Host RPC node. - #[from_env(var = "HOST_RPC_URL", desc = "URL for Host RPC node", infallible)] + #[from_env( + var = "HOST_RPC_URL", + desc = "URL for Host RPC node. This MUST be a valid HTTP or WS URL, starting with http://, https://, ws:// or wss://", + infallible + )] pub host_rpc_url: Cow<'static, str>, /// URL for the Rollup RPC node. - #[from_env(var = "ROLLUP_RPC_URL", desc = "URL for Rollup RPC node", infallible)] + #[from_env( + var = "ROLLUP_RPC_URL", + desc = "URL for Rollup RPC node. This MUST be a valid WS url starting with ws:// or wss://. Http providers are not supported.", + infallible + )] pub ru_rpc_url: Cow<'static, str>, /// URL of the tx pool to poll for incoming transactions. @@ -176,14 +185,25 @@ impl BuilderConfig { } /// Connect to the Rollup rpc provider. - pub fn connect_ru_provider(&self) -> RootProvider { - static ONCE: std::sync::OnceLock> = std::sync::OnceLock::new(); + pub async fn connect_ru_provider(&self) -> eyre::Result> { + static ONCE: tokio::sync::OnceCell> = + tokio::sync::OnceCell::const_new(); - ONCE.get_or_init(|| { - let url = url::Url::parse(&self.ru_rpc_url).expect("failed to parse URL"); - RootProvider::new_http(url) + ONCE.get_or_try_init(|| async { + let url = url::Url::parse(&self.ru_rpc_url)?; + + let scheme = url.scheme(); + eyre::ensure!( + scheme == "ws" || scheme == "wss", + "Invalid Rollup RPC URL scheme: {scheme}. Expected ws:// or wss://" + ); + + RootProvider::connect_with(BuiltInConnectionString::Ws(url, None)) + .await + .map_err(Into::into) }) - .clone() + .await + .cloned() } /// Connect to the Host rpc provider. @@ -245,9 +265,9 @@ impl BuilderConfig { } /// Create an [`EnvTask`] using this config. - pub fn env_task(&self) -> EnvTask { - let ru_provider = self.connect_ru_provider(); - EnvTask::new(self.clone(), ru_provider) + pub async fn env_task(&self) -> eyre::Result { + let ru_provider = self.connect_ru_provider().await?; + Ok(EnvTask::new(self.clone(), ru_provider)) } /// Create a [`SignetCfgEnv`] using this config. diff --git a/src/tasks/env.rs b/src/tasks/env.rs index b1327cd0..faace76a 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -6,7 +6,6 @@ use alloy::{ providers::Provider, }; use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span}; -use std::time::Duration; use tokio::{sync::watch, task::JoinHandle}; use tokio_stream::StreamExt; use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice}; @@ -58,39 +57,32 @@ impl EnvTask { /// Returns a sender that sends [`SimEnv`] for communicating the next block environment. async fn task_fut(self, sender: watch::Sender>) { let span = info_span!("EnvTask::task_fut::init"); - let mut poller = match self.ru_provider.watch_blocks().instrument(span.clone()).await { + + let mut blocks = match self.ru_provider.subscribe_blocks().await { Ok(poller) => poller, Err(err) => { let _span = span.enter(); - error!(%err, "Failed to watch blocks"); + error!(%err, "Failed to subscribe to blocks"); return; } - }; - - poller.set_poll_interval(Duration::from_millis(250)); - - let mut blocks = poller.into_stream(); + } + .into_stream(); - while let Some(blocks) = + while let Some(block) = blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await { - let Some(block_hash) = blocks.last() else { - // This case occurs when there are no changes to the block, - // so we do nothing. - continue; - }; let span = - info_span!("EnvTask::task_fut::loop", %block_hash, number = tracing::field::Empty); + info_span!("EnvTask::task_fut::loop", %block.hash, number = tracing::field::Empty); // Get the rollup header for rollup block simulation environment configuration let rollup_header = match self - .get_latest_rollup_header(&sender, block_hash, &span) + .get_latest_rollup_header(&sender, &block.hash, &span) .await { Some(value) => value, None => { // If we failed to get the rollup header, we skip this iteration. - debug!(%block_hash, "failed to get rollup header - continuing to next block"); + debug!(%block.hash, "failed to get rollup header - continuing to next block"); continue; } }; diff --git a/tests/block_builder_test.rs b/tests/block_builder_test.rs index c3f91dc7..df21cacc 100644 --- a/tests/block_builder_test.rs +++ b/tests/block_builder_test.rs @@ -42,7 +42,7 @@ async fn test_handle_build() { // Create a rollup provider let ru_provider = RootProvider::::new_http(anvil_instance.endpoint_url()); - let block_env = config.env_task().spawn().0; + let block_env = config.env_task().await.unwrap().spawn().0; let block_builder = Simulator::new(&config, ru_provider.clone(), block_env); diff --git a/tests/cache.rs b/tests/cache.rs index f9437436..2b77ae72 100644 --- a/tests/cache.rs +++ b/tests/cache.rs @@ -12,7 +12,7 @@ async fn test_bundle_poller_roundtrip() -> eyre::Result<()> { let config = setup_test_config().unwrap(); - let (block_env, _jh) = config.env_task().spawn(); + let (block_env, _jh) = config.env_task().await.unwrap().spawn(); let cache_tasks = CacheTasks::new(config.clone(), block_env); let cache_system = cache_tasks.spawn(); diff --git a/tests/env.rs b/tests/env.rs index adebe43e..6a8e970c 100644 --- a/tests/env.rs +++ b/tests/env.rs @@ -7,7 +7,7 @@ async fn test_bundle_poller_roundtrip() { let config = setup_test_config().unwrap(); let env_task = config.env_task(); - let (mut env_watcher, _jh) = env_task.spawn(); + let (mut env_watcher, _jh) = env_task.await.unwrap().spawn(); env_watcher.changed().await.unwrap(); let env = env_watcher.borrow_and_update();