1- use std :: sync :: Arc ;
2-
1+ //! Transaction service responsible for fetching and sending trasnsactions to the simulator.
2+ use crate :: config :: BuilderConfig ;
33use alloy:: consensus:: TxEnvelope ;
44use eyre:: Error ;
55use reqwest:: { Client , Url } ;
66use serde:: { Deserialize , Serialize } ;
77use serde_json:: from_slice;
88use tokio:: { sync:: mpsc, task:: JoinHandle } ;
99
10- pub use crate :: config:: BuilderConfig ;
11-
1210/// Models a response from the transaction pool.
1311#[ derive( Debug , Clone , Serialize , Deserialize ) ]
1412pub struct TxPoolResponse {
@@ -21,40 +19,47 @@ pub struct TxPoolResponse {
2119pub struct TxPoller {
2220 /// Config values from the Builder.
2321 pub config : BuilderConfig ,
24- /// Reqwest Client for fetching transactions from the tx-pool .
22+ /// Reqwest Client for fetching transactions from the cache .
2523 pub client : Client ,
24+ /// Defines the interval at which the service should poll the cache.
25+ pub poll_interval_ms : u64 ,
2626}
2727
28- /// TxPoller implements a poller task that fetches unique transactions from the transaction pool.
28+ /// [` TxPoller`] implements a poller task that fetches unique transactions from the transaction pool.
2929impl TxPoller {
30- /// Returns a new TxPoller with the given config.
30+ /// Returns a new [`TxPoller`] with the given config.
31+ /// * Defaults to 1000ms poll interval (1s).
3132 pub fn new ( config : & BuilderConfig ) -> Self {
32- Self { config : config. clone ( ) , client : Client :: new ( ) }
33+ Self { config : config. clone ( ) , client : Client :: new ( ) , poll_interval_ms : 1000 }
34+ }
35+
36+ /// Returns a new [`TxPoller`] with the given config and cache polling interval in milliseconds.
37+ pub fn new_with_poll_interval_ms ( config : & BuilderConfig , poll_interval_ms : u64 ) -> Self {
38+ Self { config : config. clone ( ) , client : Client :: new ( ) , poll_interval_ms }
3339 }
3440
35- /// Polls the tx-pool for unique transactions and evicts expired transactions.
36- /// unique transactions that haven't been seen before are sent into the builder pipeline.
41+ /// Polls the transaction cache for transactions.
3742 pub async fn check_tx_cache ( & mut self ) -> Result < Vec < TxEnvelope > , Error > {
3843 let url: Url = Url :: parse ( & self . config . tx_pool_url ) ?. join ( "transactions" ) ?;
3944 let result = self . client . get ( url) . send ( ) . await ?;
4045 let response: TxPoolResponse = from_slice ( result. text ( ) . await ?. as_bytes ( ) ) ?;
4146 Ok ( response. transactions )
4247 }
4348
44- /// Spawns a task that trawls the cache for transactions and sends along anything it finds
45- pub fn spawn ( mut self ) -> ( mpsc:: UnboundedReceiver < Arc < TxEnvelope > > , JoinHandle < ( ) > ) {
49+ /// Spawns a task that continuously polls the cache for transactions and sends any it finds to its sender.
50+ pub fn spawn ( mut self ) -> ( mpsc:: UnboundedReceiver < TxEnvelope > , JoinHandle < ( ) > ) {
4651 let ( outbound, inbound) = mpsc:: unbounded_channel ( ) ;
4752 let jh = tokio:: spawn ( async move {
4853 loop {
4954 if let Ok ( transactions) = self . check_tx_cache ( ) . await {
5055 tracing:: debug!( count = ?transactions. len( ) , "found transactions" ) ;
51- for tx in transactions. iter ( ) {
52- if let Err ( err) = outbound. send ( Arc :: new ( tx . clone ( ) ) ) {
53- tracing:: error!( err = ?err, "failed to send transaction outbound " ) ;
56+ for tx in transactions. into_iter ( ) {
57+ if let Err ( err) = outbound. send ( tx ) {
58+ tracing:: error!( err = ?err, "failed to send transaction - channel is full. " ) ;
5459 }
5560 }
5661 }
57- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
62+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( self . poll_interval_ms ) ) . await ;
5863 }
5964 } ) ;
6065 ( inbound, jh)
0 commit comments