Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 11 additions & 55 deletions src/tasks/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use alloy::{
primitives::{B256, U256},
providers::Provider,
};
use init4_bin_base::deps::tracing::{self, Instrument, debug, error, info_span};
use init4_bin_base::deps::tracing::{Instrument, debug, error, info_span};
use tokio::{sync::watch, task::JoinHandle};
use tokio_stream::StreamExt;
use trevm::revm::{context::BlockEnv, context_interface::block::BlobExcessGasAndPrice};
Expand Down Expand Up @@ -58,35 +58,25 @@ impl EnvTask {
async fn task_fut(self, sender: watch::Sender<Option<SimEnv>>) {
let span = info_span!("EnvTask::task_fut::init");

let mut blocks = match self.ru_provider.subscribe_blocks().await {
let mut headers = match self.ru_provider.subscribe_blocks().await {
Ok(poller) => poller,
Err(err) => {
let _span = span.enter();
error!(%err, "Failed to subscribe to blocks");
span.in_scope(|| {
error!(%err, "Failed to subscribe to blocks");
});
return;
}
}
.into_stream();

while let Some(block) =
blocks.next().instrument(info_span!("EnvTask::task_fut::stream")).await
drop(span);

while let Some(rollup_header) =
headers.next().instrument(info_span!("EnvTask::task_fut::stream")).await
{
let span =
info_span!("EnvTask::task_fut::loop", %block.hash, number = tracing::field::Empty);
info_span!("EnvTask::task_fut::loop", %rollup_header.hash, %rollup_header.number);

// Get the rollup header for rollup block simulation environment configuration
let rollup_header = match self
.get_latest_rollup_header(&sender, &block.hash, &span)
.await
{
Some(value) => value,
None => {
// If we failed to get the rollup header, we skip this iteration.
debug!(%block.hash, "failed to get rollup header - continuing to next block");
continue;
}
};
debug!(rollup_header.number, "pulled rollup block for simulation");
span.record("rollup_block_number", rollup_header.number);

// Construct the block env using the previous block header
Expand All @@ -98,7 +88,7 @@ impl EnvTask {
);

if sender
.send(Some(SimEnv { block_env: signet_env, prev_header: rollup_header }))
.send(Some(SimEnv { block_env: signet_env, prev_header: rollup_header.inner }))
.is_err()
{
// The receiver has been dropped, so we can stop the task.
Expand All @@ -108,40 +98,6 @@ impl EnvTask {
}
}

/// Get latest rollup [`Header`] for the given block hash.
async fn get_latest_rollup_header(
&self,
sender: &watch::Sender<Option<SimEnv>>,
block: &alloy::primitives::FixedBytes<32>,
span: &tracing::Span,
) -> Option<Header> {
let previous = match self
.ru_provider
.get_block((*block).into())
.into_future()
.instrument(span.clone())
.await
{
Ok(Some(block)) => block.header.inner,
Ok(None) => {
let _span = span.enter();
let _ = sender.send(None);
debug!("rollup block not found");
// This may mean the chain had a rollback, so the next poll
// should find something.
return None;
}
Err(err) => {
let _span = span.enter();
let _ = sender.send(None);
error!(%err, "Failed to get latest block");
// Error may be transient, so we should not break the loop.
return None;
}
};
Some(previous)
}

/// Spawn the task and return a watch::Receiver for the BlockEnv.
pub fn spawn(self) -> (watch::Receiver<Option<SimEnv>>, JoinHandle<()>) {
let (sender, receiver) = watch::channel(None);
Expand Down