diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 79050821..8b6a4416 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,19 +1,17 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use futures_util::{TryFutureExt, TryStreamExt}; use init4_bin_base::{ deps::metrics::{counter, histogram}, perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}, }; -use signet_tx_cache::{ - TxCacheError, - types::{BundleKey, CachedBundle}, -}; +use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, trace, trace_span, warn}; +use tracing::{Instrument, debug, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -58,56 +56,46 @@ impl BundlePoller { /// Fetches all bundles from the tx-cache, paginating through all available pages. pub async fn check_bundle_cache(&self) -> Result, BuilderTxCacheError> { - let mut all_bundles = Vec::new(); - let mut cursor: Option = None; - - loop { - let resp = match self.tx_cache.get_bundles(cursor).await { - Ok(resp) => resp, - Err(error) => { - if matches!(&error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) { - trace!("Not our slot to fetch bundles"); - } else { - counter!("signet.builder.cache.bundle_poll_errors").increment(1); - warn!(%error, "Failed to fetch bundles from tx-cache"); - } - return Err(error); - } - }; - - let (bundle_list, next_cursor) = resp.into_parts(); - all_bundles.extend(bundle_list.bundles); - - let Some(next) = next_cursor else { break }; - cursor = Some(next); - } - - trace!(count = all_bundles.len(), "fetched all bundles from tx-cache"); - histogram!("signet.builder.cache.bundles_fetched").record(all_bundles.len() as f64); - Ok(all_bundles) + self.tx_cache.stream_bundles().try_collect().await } async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); - // Enter the span for the next check. - let _guard = span.enter(); - // Check this here to avoid making the web request if we know // we don't need the results. if outbound.is_closed() { - trace!("No receivers left, shutting down"); + span.in_scope(|| trace!("No receivers left, shutting down")); break; } - // exit the span after the check. - drop(_guard); counter!("signet.builder.cache.bundle_poll_count").increment(1); - if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { - for bundle in bundles.into_iter() { + let Ok(bundles) = self + .check_bundle_cache() + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + } + _ => { + counter!("signet.builder.cache.bundle_poll_errors").increment(1); + warn!(%error, "Failed to fetch bundles from tx-cache"); + } + }) + .instrument(span.clone()) + .await + else { + time::sleep(self.poll_duration()).await; + continue; + }; + + { + let _guard = span.entered(); + histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64); + trace!(count = bundles.len(), "fetched bundles from tx-cache"); + for bundle in bundles { if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); + debug!(?err, "Failed to send bundle - channel is dropped"); break; } } diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 9bfb0ee6..def47090 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -4,7 +4,7 @@ use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::TryStreamExt; +use futures_util::{TryFutureExt, TryStreamExt}; use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; use std::time::Duration; @@ -108,16 +108,19 @@ impl TxPoller { // Check this here to avoid making the web request if we know // we don't need the results. if outbound.is_closed() { - trace!("No receivers left, shutting down"); + span.in_scope(|| trace!("No receivers left, shutting down")); break; } counter!("signet.builder.cache.tx_poll_count").increment(1); - if let Ok(transactions) = - self.check_tx_cache().instrument(span.clone()).await.inspect_err(|error| { + if let Ok(transactions) = self + .check_tx_cache() + .inspect_err(|error| { counter!("signet.builder.cache.tx_poll_errors").increment(1); debug!(%error, "Error fetching transactions"); }) + .instrument(span.clone()) + .await { let _guard = span.entered(); histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64);