From 8c472ae4b3cdc8a44d9bfc5aadad8fc67b8d1e24 Mon Sep 17 00:00:00 2001 From: evalir Date: Thu, 23 Apr 2026 23:27:05 +0200 Subject: [PATCH] feat: replace BundlePoller polling with SSE streaming Mirrors the TxPoller SSE change: subscribe to /bundles/feed via BuilderTxCache::subscribe_bundles for real-time delivery of new bundles, with an initial full_fetch on startup/block-env change and exponential backoff reconnect on error or stream end. Drops the 1s polling loop. --- src/tasks/cache/bundle.rs | 207 +++++++++++++++++++++++------------- src/tasks/cache/system.rs | 2 +- tests/bundle_poller_test.rs | 7 +- 3 files changed, 141 insertions(+), 75 deletions(-) diff --git a/src/tasks/cache/bundle.rs b/src/tasks/cache/bundle.rs index a45fff03..470b3964 100644 --- a/src/tasks/cache/bundle.rs +++ b/src/tasks/cache/bundle.rs @@ -1,113 +1,176 @@ //! Bundler service responsible for fetching bundles and sending them to the simulator. -use crate::config::BuilderConfig; -use futures_util::{TryFutureExt, TryStreamExt}; +use crate::{config::BuilderConfig, tasks::env::SimEnv}; +use futures_util::{Stream, StreamExt, TryFutureExt, TryStreamExt}; use init4_bin_base::perms::tx_cache::{BuilderTxCache, BuilderTxCacheError}; use signet_tx_cache::{TxCacheError, types::CachedBundle}; +use std::{ops::ControlFlow, pin::Pin, time::Duration}; use tokio::{ - sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, + sync::{mpsc, watch}, task::JoinHandle, - time::{self, Duration}, + time, }; use tracing::{Instrument, debug, trace, trace_span, warn}; -/// Poll interval for the bundle poller in milliseconds. -const POLL_INTERVAL_MS: u64 = 1000; +type SseStream = Pin> + Send>>; -/// The BundlePoller polls the tx-pool for bundles. +const INITIAL_RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +const MAX_RECONNECT_BACKOFF: Duration = Duration::from_secs(30); + +/// The BundlePoller fetches bundles from the tx-pool on startup and on each +/// block environment change, and subscribes to an SSE stream for real-time +/// delivery of new bundles in between. #[derive(Debug)] pub struct BundlePoller { /// The builder configuration values. config: &'static BuilderConfig, - /// Client for the tx cache. tx_cache: BuilderTxCache, - - /// Defines the interval at which the bundler polls the tx-pool for bundles. - poll_interval_ms: u64, -} - -impl Default for BundlePoller { - fn default() -> Self { - Self::new() - } + /// Receiver for block environment updates, used to trigger refetches. + envs: watch::Receiver>, } -/// Implements a poller for the block builder to pull bundles from the tx-pool. impl BundlePoller { - /// Creates a new BundlePoller from the provided builder config. - pub fn new() -> Self { - Self::new_with_poll_interval_ms(POLL_INTERVAL_MS) - } - - /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms. - pub fn new_with_poll_interval_ms(poll_interval_ms: u64) -> Self { + /// Returns a new [`BundlePoller`] with the given block environment receiver. + pub fn new(envs: watch::Receiver>) -> Self { let config = crate::config(); let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); - Self { config, tx_cache, poll_interval_ms } + Self { config, tx_cache, envs } } - /// Returns the poll duration as a [`Duration`]. - const fn poll_duration(&self) -> Duration { - Duration::from_millis(self.poll_interval_ms) + async fn full_fetch(&self, outbound: &mpsc::UnboundedSender) { + let span = trace_span!("BundlePoller::full_fetch", url = %self.config.tx_pool_url); + + crate::metrics::inc_bundle_poll_count(); + if let Ok(bundles) = self + .tx_cache + .stream_bundles() + .try_collect::>() + // NotOurSlot is expected whenever the builder isn't slot-permissioned; + // don't bump the error counter or warn. + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to fetch bundles"); + } + _ => { + crate::metrics::inc_bundle_poll_errors(); + warn!(%error, "Failed to fetch bundles from tx-cache"); + } + }) + .instrument(span.clone()) + .await + { + let _guard = span.entered(); + crate::metrics::record_bundles_fetched(bundles.len()); + trace!(count = bundles.len(), "found bundles"); + for bundle in bundles { + if outbound.send(bundle).is_err() { + debug!("Outbound channel closed during full fetch"); + return; + } + } + } } - /// Fetches all bundles from the tx-cache, paginating through all available pages. - pub async fn check_bundle_cache(&self) -> Result, BuilderTxCacheError> { - self.tx_cache.stream_bundles().try_collect().await + /// Returns an empty stream on connection failure so the caller can handle + /// reconnection uniformly. + async fn subscribe(&self) -> SseStream { + self.tx_cache + .subscribe_bundles() + .await + .inspect( + |_| debug!(url = %self.config.tx_pool_url, "SSE bundle subscription established"), + ) + .inspect_err(|error| match error { + BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { + trace!("Not our slot to subscribe to bundles"); + } + _ => warn!(%error, "Failed to open SSE bundle subscription"), + }) + .map(|s| Box::pin(s) as SseStream) + .unwrap_or_else(|_| Box::pin(futures_util::stream::empty())) } - async fn task_future(self, outbound: UnboundedSender) { - loop { - let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url); + /// Runs a full refetch concurrently with re-subscribing, to cover any + /// items missed while disconnected. + async fn reconnect( + &mut self, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + ) -> SseStream { + tokio::select! { + // Biased: a block env change wins over the backoff sleep. An env + // change triggers a full refetch below anyway, which supersedes the + // sleep-then-reconnect path — so there's no point waiting out the + // backoff. + biased; + _ = self.envs.changed() => {} + _ = time::sleep(*backoff) => {} + } + *backoff = (*backoff * 2).min(MAX_RECONNECT_BACKOFF); + let (_, stream) = tokio::join!(self.full_fetch(outbound), self.subscribe()); + stream + } - // Check this here to avoid making the web request if we know - // we don't need the results. - if outbound.is_closed() { - span.in_scope(|| trace!("No receivers left, shutting down")); - break; + /// Returns `Break` when the outbound channel has closed and the task + /// should shut down. + async fn handle_sse_item( + &mut self, + item: Option>, + outbound: &mpsc::UnboundedSender, + backoff: &mut Duration, + stream: &mut SseStream, + ) -> ControlFlow<()> { + match item { + Some(Ok(bundle)) => { + *backoff = INITIAL_RECONNECT_BACKOFF; + if outbound.send(bundle).is_err() { + trace!("No receivers left, shutting down"); + return ControlFlow::Break(()); + } + } + Some(Err(error)) => { + warn!(%error, "SSE bundle stream interrupted, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; + } + None => { + warn!("SSE bundle stream ended, reconnecting"); + *stream = self.reconnect(outbound, backoff).await; } + } + ControlFlow::Continue(()) + } - crate::metrics::inc_bundle_poll_count(); - let Ok(bundles) = self - .check_bundle_cache() - .inspect_err(|error| match error { - BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => { - trace!("Not our slot to fetch bundles"); - } - _ => { - crate::metrics::inc_bundle_poll_errors(); - warn!(%error, "Failed to fetch bundles from tx-cache"); - } - }) - .instrument(span.clone()) - .await - else { - time::sleep(self.poll_duration()).await; - continue; - }; + async fn task_future(mut self, outbound: mpsc::UnboundedSender) { + let (_, mut sse_stream) = tokio::join!(self.full_fetch(&outbound), self.subscribe()); + let mut backoff = INITIAL_RECONNECT_BACKOFF; - { - let _guard = span.entered(); - crate::metrics::record_bundles_fetched(bundles.len()); - trace!(count = bundles.len(), "fetched bundles from tx-cache"); - for bundle in bundles { - if let Err(err) = outbound.send(bundle) { - debug!(?err, "Failed to send bundle - channel is dropped"); + loop { + tokio::select! { + item = sse_stream.next() => { + if self + .handle_sse_item(item, &outbound, &mut backoff, &mut sse_stream) + .await + .is_break() + { + break; + } + } + res = self.envs.changed() => { + if res.is_err() { + debug!("Block env channel closed, shutting down"); break; } + trace!("Block env changed, refetching all bundles"); + self.full_fetch(&outbound).await; } } - - time::sleep(self.poll_duration()).await; } } - /// Spawns a task that sends bundles it finds to its channel sender. - pub fn spawn(self) -> (UnboundedReceiver, JoinHandle<()>) { - let (outbound, inbound) = unbounded_channel(); - + /// Spawns the task future and returns a receiver for bundles it finds. + pub fn spawn(self) -> (mpsc::UnboundedReceiver, JoinHandle<()>) { + let (outbound, inbound) = mpsc::unbounded_channel(); let jh = tokio::spawn(self.task_future(outbound)); - (inbound, jh) } } diff --git a/src/tasks/cache/system.rs b/src/tasks/cache/system.rs index 89698a4d..26a1a5b6 100644 --- a/src/tasks/cache/system.rs +++ b/src/tasks/cache/system.rs @@ -27,7 +27,7 @@ impl CacheTasks { let (tx_receiver, tx_poller) = tx_poller.spawn(); // Bundle Poller pulls bundles from the cache - let bundle_poller = BundlePoller::new(); + let bundle_poller = BundlePoller::new(self.block_env.clone()); let (bundle_receiver, bundle_poller) = bundle_poller.spawn(); // Set up the cache task diff --git a/tests/bundle_poller_test.rs b/tests/bundle_poller_test.rs index 9acfe483..1dc7c3c4 100644 --- a/tests/bundle_poller_test.rs +++ b/tests/bundle_poller_test.rs @@ -2,15 +2,18 @@ use builder::test_utils::{setup_logging, setup_test_config}; use eyre::Result; +use futures_util::TryStreamExt; +use init4_bin_base::perms::tx_cache::BuilderTxCache; #[tokio::test] async fn test_bundle_poller_roundtrip() -> Result<()> { setup_logging(); setup_test_config(); - let bundle_poller = builder::tasks::cache::BundlePoller::new(); + let config = builder::config(); + let tx_cache = BuilderTxCache::new(config.tx_pool_url.clone(), config.oauth_token()); - let _ = bundle_poller.check_bundle_cache().await?; + let _bundles: Vec<_> = tx_cache.stream_bundles().try_collect().await?; Ok(()) }