Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 29 additions & 41 deletions src/tasks/cache/bundle.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
//! Bundler service responsible for fetching bundles and sending them to the simulator.
use crate::config::BuilderConfig;
use futures_util::{TryFutureExt, TryStreamExt};
use init4_bin_base::{
deps::metrics::{counter, histogram},
perms::tx_cache::{BuilderTxCache, BuilderTxCacheError},
};
use signet_tx_cache::{
TxCacheError,
types::{BundleKey, CachedBundle},
};
use signet_tx_cache::{TxCacheError, types::CachedBundle};
use tokio::{
sync::mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel},
task::JoinHandle,
time::{self, Duration},
};
use tracing::{Instrument, trace, trace_span, warn};
use tracing::{Instrument, debug, trace, trace_span, warn};

/// Poll interval for the bundle poller in milliseconds.
const POLL_INTERVAL_MS: u64 = 1000;
Expand Down Expand Up @@ -58,56 +56,46 @@ impl BundlePoller {

/// Fetches all bundles from the tx-cache, paginating through all available pages.
pub async fn check_bundle_cache(&self) -> Result<Vec<CachedBundle>, BuilderTxCacheError> {
let mut all_bundles = Vec::new();
let mut cursor: Option<BundleKey> = None;

loop {
let resp = match self.tx_cache.get_bundles(cursor).await {
Ok(resp) => resp,
Err(error) => {
if matches!(&error, BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot)) {
trace!("Not our slot to fetch bundles");
} else {
counter!("signet.builder.cache.bundle_poll_errors").increment(1);
warn!(%error, "Failed to fetch bundles from tx-cache");
}
return Err(error);
}
};

let (bundle_list, next_cursor) = resp.into_parts();
all_bundles.extend(bundle_list.bundles);

let Some(next) = next_cursor else { break };
cursor = Some(next);
}

trace!(count = all_bundles.len(), "fetched all bundles from tx-cache");
histogram!("signet.builder.cache.bundles_fetched").record(all_bundles.len() as f64);
Ok(all_bundles)
self.tx_cache.stream_bundles().try_collect().await
}

async fn task_future(self, outbound: UnboundedSender<CachedBundle>) {
loop {
let span = trace_span!("BundlePoller::loop", url = %self.config.tx_pool_url);

// Enter the span for the next check.
let _guard = span.enter();

// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
trace!("No receivers left, shutting down");
span.in_scope(|| trace!("No receivers left, shutting down"));
break;
}
// exit the span after the check.
drop(_guard);

counter!("signet.builder.cache.bundle_poll_count").increment(1);
if let Ok(bundles) = self.check_bundle_cache().instrument(span.clone()).await {
for bundle in bundles.into_iter() {
let Ok(bundles) = self
.check_bundle_cache()
.inspect_err(|error| match error {
BuilderTxCacheError::TxCache(TxCacheError::NotOurSlot) => {
trace!("Not our slot to fetch bundles");
}
_ => {
counter!("signet.builder.cache.bundle_poll_errors").increment(1);
warn!(%error, "Failed to fetch bundles from tx-cache");
}
})
.instrument(span.clone())
.await
else {
time::sleep(self.poll_duration()).await;
continue;
};

{
let _guard = span.entered();
histogram!("signet.builder.cache.bundles_fetched").record(bundles.len() as f64);
trace!(count = bundles.len(), "fetched bundles from tx-cache");
for bundle in bundles {
if let Err(err) = outbound.send(bundle) {
span_debug!(span, ?err, "Failed to send bundle - channel is dropped");
debug!(?err, "Failed to send bundle - channel is dropped");
break;
}
}
Expand Down
11 changes: 7 additions & 4 deletions src/tasks/cache/tx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use alloy::{
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
providers::Provider,
};
use futures_util::TryStreamExt;
use futures_util::{TryFutureExt, TryStreamExt};
use init4_bin_base::deps::metrics::{counter, histogram};
use signet_tx_cache::{TxCache, TxCacheError};
use std::time::Duration;
Expand Down Expand Up @@ -108,16 +108,19 @@ impl TxPoller {
// Check this here to avoid making the web request if we know
// we don't need the results.
if outbound.is_closed() {
trace!("No receivers left, shutting down");
span.in_scope(|| trace!("No receivers left, shutting down"));
break;
}

counter!("signet.builder.cache.tx_poll_count").increment(1);
if let Ok(transactions) =
self.check_tx_cache().instrument(span.clone()).await.inspect_err(|error| {
if let Ok(transactions) = self
.check_tx_cache()
.inspect_err(|error| {
counter!("signet.builder.cache.tx_poll_errors").increment(1);
debug!(%error, "Error fetching transactions");
})
.instrument(span.clone())
.await
{
let _guard = span.entered();
histogram!("signet.builder.cache.txs_fetched").record(transactions.len() as f64);
Expand Down