diff --git a/src/tasks/env.rs b/src/tasks/env.rs index faace76a..45c8a3be 100644 --- a/src/tasks/env.rs +++ b/src/tasks/env.rs @@ -5,7 +5,7 @@ use alloy::{ primitives::{B256, U256}, providers::Provider, }; -use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span}; +use init4_bin_base::deps::tracing::{Instrument, debug, error, info_span}; use tokio::{sync::watch, task::JoinHandle}; use tokio_stream::StreamExt; use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice}; @@ -58,35 +58,25 @@ impl EnvTask { async fn task_fut(self, sender: watch::Sender>) { let span = info_span!("EnvTask::task_fut::init"); - let mut blocks = match self.ru_provider.subscribe_blocks().await { + let mut headers = match self.ru_provider.subscribe_blocks().await { Ok(poller) => poller, Err(err) => { - let _span = span.enter(); - error!(%err, "Failed to subscribe to blocks"); + span.in_scope(|| { + error!(%err, "Failed to subscribe to blocks"); + }); return; } } .into_stream(); - while let Some(block) = - blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await + drop(span); + + while let Some(rollup_header) = + headers.next().instrument(info_span!("EnvTask::task_fut::stream")).await { let span = - info_span!("EnvTask::task_fut::loop", %block.hash, number = tracing::field::Empty); + info_span!("EnvTask::task_fut::loop", %rollup_header.hash, %rollup_header.number); - // Get the rollup header for rollup block simulation environment configuration - let rollup_header = match self - .get_latest_rollup_header(&sender, &block.hash, &span) - .await - { - Some(value) => value, - None => { - // If we failed to get the rollup header, we skip this iteration. - debug!(%block.hash, "failed to get rollup header - continuing to next block"); - continue; - } - }; - debug!(rollup_header.number, "pulled rollup block for simulation"); span.record("rollup_block_number", rollup_header.number); // Construct the block env using the previous block header @@ -98,7 +88,7 @@ impl EnvTask { ); if sender - .send(Some(SimEnv { block_env: signet_env, prev_header: rollup_header })) + .send(Some(SimEnv { block_env: signet_env, prev_header: rollup_header.inner })) .is_err() { // The receiver has been dropped, so we can stop the task. @@ -108,40 +98,6 @@ impl EnvTask { } } - /// Get latest rollup [`Header`] for the given block hash. - async fn get_latest_rollup_header( - &self, - sender: &watch::Sender>, - block: &alloy::primitives::FixedBytes<32>, - span: &tracing::Span, - ) -> Option
{ - let previous = match self - .ru_provider - .get_block((*block).into()) - .into_future() - .instrument(span.clone()) - .await - { - Ok(Some(block)) => block.header.inner, - Ok(None) => { - let _span = span.enter(); - let _ = sender.send(None); - debug!("rollup block not found"); - // This may mean the chain had a rollback, so the next poll - // should find something. - return None; - } - Err(err) => { - let _span = span.enter(); - let _ = sender.send(None); - error!(%err, "Failed to get latest block"); - // Error may be transient, so we should not break the loop. - return None; - } - }; - Some(previous) - } - /// Spawn the task and return a watch::Receiver for the BlockEnv. pub fn spawn(self) -> (watch::Receiver>, JoinHandle<()>) { let (sender, receiver) = watch::channel(None);