From 1f432f933619aa3343d632f7094d5a98eb97ee48 Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 1 Apr 2026 17:04:33 +0200 Subject: [PATCH 1/3] refactor: use SDK stream_bundles() for bundle cache pagination --- src/tasks/cache/bundle.rs | 53 ++++++++++++--------------------------- 1 file changed, 16 insertions(+), 37 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 79050821..4f6e947e 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,13 +1,11 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; +use futures_util::TryStreamExt; use init4_bin_base::{ deps::metrics::{counter, histogram}, perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}, }; -use signet_tx_cache::{ - TxCacheError, - types::{BundleKey, CachedBundle}, -}; +use signet_tx_cache::{TxCacheError, types::CachedBundle}; use tokio::{ sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, task::JoinHandle, @@ -58,53 +56,34 @@ impl BundlePoller { /// Fetches all bundles from the tx-cache, paginating through all available pages. pub async fn check_bundle_cache(&self) -> Result, BuilderTxCacheError> { - let mut all_bundles = Vec::new(); - let mut cursor: Option = None; - - loop { - let resp = match self.tx_cache.get_bundles(cursor).await { - Ok(resp) => resp, - Err(error) => { - if matches!(&error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) { - trace!("Not our slot to fetch bundles"); - } else { - counter!("signet.builder.cache.bundle_poll_errors").increment(1); - warn!(%error, "Failed to fetch bundles from tx-cache"); - } - return Err(error); - } - }; - - let (bundle_list, next_cursor) = resp.into_parts(); - all_bundles.extend(bundle_list.bundles); - - let Some(next) = next_cursor else { break }; - cursor = Some(next); - } - - trace!(count = all_bundles.len(), "fetched all bundles from tx-cache"); - histogram!("signet.builder.cache.bundles_fetched").record(all_bundles.len() as f64); - Ok(all_bundles) + self.tx_cache.stream_bundles().try_collect().await } async fn task_future(self, outbound: UnboundedSender) { loop { let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); - // Enter the span for the next check. - let _guard = span.enter(); - // Check this here to avoid making the web request if we know // we don't need the results. if outbound.is_closed() { trace!("No receivers left, shutting down"); break; } - // exit the span after the check. - drop(_guard); counter!("signet.builder.cache.bundle_poll_count").increment(1); - if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await { + if let Ok(bundles) = + self.check_bundle_cache().instrument(span.clone()).await.inspect_err(|error| { + if matches!(error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) { + trace!("Not our slot to fetch bundles"); + } else { + counter!("signet.builder.cache.bundle_poll_errors").increment(1); + warn!(%error, "Failed to fetch bundles from tx-cache"); + } + }) + { + let _guard = span.enter(); + histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64); + trace!(count = bundles.len(), "fetched bundles from tx-cache"); for bundle in bundles.into_iter() { if let Err(err) = outbound.send(bundle) { span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); From b9480cca112356cb7e4ec00741823eee85e93968 Mon Sep 17 00:00:00 2001 From: evalir Date: Wed, 1 Apr 2026 17:19:47 +0200 Subject: [PATCH 2/3] refactor: use SDK stream_bundles() for bundle cache pagination --- src/tasks/cache/bundle.rs | 36 +++++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 4f6e947e..795af753 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -71,24 +71,30 @@ impl BundlePoller { } counter!("signet.builder.cache.bundle_poll_count").increment(1); - if let Ok(bundles) = + let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await.inspect_err(|error| { - if matches!(error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) { - trace!("Not our slot to fetch bundles"); - } else { - counter!("signet.builder.cache.bundle_poll_errors").increment(1); - warn!(%error, "Failed to fetch bundles from tx-cache"); + match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + } + _ => { + counter!("signet.builder.cache.bundle_poll_errors").increment(1); + warn!(%error, "Failed to fetch bundles from tx-cache"); + } } }) - { - let _guard = span.enter(); - histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64); - trace!(count = bundles.len(), "fetched bundles from tx-cache"); - for bundle in bundles.into_iter() { - if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); - break; - } + else { + time::sleep(self.poll_duration()).await; + continue; + }; + + let _guard = span.enter(); + histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64); + trace!(count = bundles.len(), "fetched bundles from tx-cache"); + for bundle in bundles { + if let Err(err) = outbound.send(bundle) { + span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); + break; } } From 7455b331d86a650fed3ffb19f211fc425e1e5548 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 2 Apr 2026 16:22:15 +0200 Subject: [PATCH 3/3] fix: chain inspect_err before instrument for correct span context --- src/tasks/cache/bundle.rs | 43 +++++++++++++++++++++------------------ src/tasks/cache/tx.rs | 11 ++++++---- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index 795af753..8b6a4416 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,6 +1,6 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. use crate::config::BuilderConfig; -use futures_util::TryStreamExt; +use futures_util::{TryFutureExt, TryStreamExt}; use init4_bin_base::{ deps::metrics::{counter, histogram}, perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}, @@ -11,7 +11,7 @@ use tokio::{ task::JoinHandle, time::{self, Duration}, }; -use tracing::{Instrument, trace, trace_span, warn}; +use tracing::{Instrument, debug, trace, trace_span, warn}; /// Poll interval for the bundle poller in milliseconds. const POLL_INTERVAL_MS: u64 = 1000; @@ -66,35 +66,38 @@ impl BundlePoller { // Check this here to avoid making the web request if we know // we don't need the results. if outbound.is_closed() { - trace!("No receivers left, shutting down"); + span.in_scope(|| trace!("No receivers left, shutting down")); break; } counter!("signet.builder.cache.bundle_poll_count").increment(1); - let Ok(bundles) = - self.check_bundle_cache().instrument(span.clone()).await.inspect_err(|error| { - match error { - BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { - trace!("Not our slot to fetch bundles"); - } - _ => { - counter!("signet.builder.cache.bundle_poll_errors").increment(1); - warn!(%error, "Failed to fetch bundles from tx-cache"); - } + let Ok(bundles) = self + .check_bundle_cache() + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + } + _ => { + counter!("signet.builder.cache.bundle_poll_errors").increment(1); + warn!(%error, "Failed to fetch bundles from tx-cache"); } }) + .instrument(span.clone()) + .await else { time::sleep(self.poll_duration()).await; continue; }; - let _guard = span.enter(); - histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64); - trace!(count = bundles.len(), "fetched bundles from tx-cache"); - for bundle in bundles { - if let Err(err) = outbound.send(bundle) { - span_debug!(span, ?err, "Failed to send bundle - channel is dropped"); - break; + { + let _guard = span.entered(); + histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64); + trace!(count = bundles.len(), "fetched bundles from tx-cache"); + for bundle in bundles { + if let Err(err) = outbound.send(bundle) { + debug!(?err, "Failed to send bundle - channel is dropped"); + break; + } } } diff --git a/src/tasks/cache/tx.rs b/src/tasks/cache/tx.rs index 9bfb0ee6..def47090 100644 --- a/src/tasks/cache/tx.rs +++ b/src/tasks/cache/tx.rs @@ -4,7 +4,7 @@ use alloy::{ consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable}, providers::Provider, }; -use futures_util::TryStreamExt; +use futures_util::{TryFutureExt, TryStreamExt}; use init4_bin_base::deps::metrics::{counter, histogram}; use signet_tx_cache::{TxCache, TxCacheError}; use std::time::Duration; @@ -108,16 +108,19 @@ impl TxPoller { // Check this here to avoid making the web request if we know // we don't need the results. if outbound.is_closed() { - trace!("No receivers left, shutting down"); + span.in_scope(|| trace!("No receivers left, shutting down")); break; } counter!("signet.builder.cache.tx_poll_count").increment(1); - if let Ok(transactions) = - self.check_tx_cache().instrument(span.clone()).await.inspect_err(|error| { + if let Ok(transactions) = self + .check_tx_cache() + .inspect_err(|error| { counter!("signet.builder.cache.tx_poll_errors").increment(1); debug!(%error, "Error fetching transactions"); }) + .instrument(span.clone()) + .await { let _guard = span.entered(); histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64);