Skip to content

Commit dd74569

Browse files
committed
feat: filter low nonces when ingesting txns
1 parent 32d1725 commit dd74569

File tree

1 file changed

+40
-7
lines changed

1 file changed

+40
-7
lines changed

src/tasks/cache/tx.rs

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
//! Transaction service responsible for fetching and sending trasnsactions to the simulator.
22
use crate::config::BuilderConfig;
3-
use alloy::consensus::TxEnvelope;
3+
use alloy::{
4+
consensus::{Transaction, TxEnvelope, transaction::SignerRecoverable},
5+
providers::Provider,
6+
};
47
use eyre::Error;
58
use reqwest::{Client, Url};
69
use serde::{Deserialize, Serialize};
710
use std::time::Duration;
811
use tokio::{sync::mpsc, task::JoinHandle, time};
9-
use tracing::{Instrument, debug, debug_span, trace};
12+
use tracing::{Instrument, debug, debug_span, info_span, trace, warn};
1013

1114
/// Poll interval for the transaction poller in milliseconds.
1215
const POLL_INTERVAL_MS: u64 = 1000;
@@ -56,6 +59,40 @@ impl TxPoller {
5659
Duration::from_millis(self.poll_interval_ms)
5760
}
5861

62+
fn spawn_check_nonce(&self, tx: TxEnvelope, outbound: mpsc::UnboundedSender<TxEnvelope>) {
63+
tokio::spawn(async move {
64+
let Ok(ru_provider) = crate::config().connect_ru_provider().await else {
65+
warn!("Failed to connect to RU provider, stopping noncecheck task.");
66+
return;
67+
};
68+
let span = info_span!("check_nonce", tx_id = %tx.tx_hash());
69+
70+
let Ok(sender) = tx.recover_signer() else {
71+
span_warn!(span, "Failed to recover sender from transaction");
72+
return;
73+
};
74+
75+
let Ok(tx_count) = ru_provider
76+
.get_transaction_count(sender)
77+
.into_future()
78+
.instrument(span.clone())
79+
.await
80+
else {
81+
span_warn!(span, %sender, "Failed to fetch nonce for sender");
82+
return;
83+
};
84+
85+
if tx.nonce() < tx_count {
86+
span_debug!(span, %sender, tx_nonce = %tx.nonce(), ru_nonce = %tx_count, "Dropping transaction with stale nonce");
87+
return;
88+
}
89+
90+
if outbound.send(tx).is_err() {
91+
span_warn!(span, "Outbound channel closed, stopping NonceChecker task.");
92+
}
93+
});
94+
}
95+
5996
/// Polls the transaction cache for transactions.
6097
pub async fn check_tx_cache(&mut self) -> Result<Vec<TxEnvelope>, Error> {
6198
let url: Url = self.config.tx_pool_url.join("transactions")?;
@@ -94,11 +131,7 @@ impl TxPoller {
94131
let _guard = span.entered();
95132
debug!(count = ?transactions.len(), "found transactions");
96133
for tx in transactions.into_iter() {
97-
if outbound.send(tx).is_err() {
98-
// If there are no receivers, we can shut down
99-
trace!("No receivers left, shutting down");
100-
break;
101-
}
134+
self.spawn_check_nonce(tx, outbound.clone());
102135
}
103136
}
104137

0 commit comments

Comments
 (0)