1- //! Bundler service responsible for managing bundles.
2- use std:: sync:: Arc ;
3-
4- use super :: oauth:: Authenticator ;
5-
1+ //! Bundler service responsible for fetching bundles and sending them to the simulator.
62pub use crate :: config:: BuilderConfig ;
7-
3+ use crate :: tasks :: oauth :: Authenticator ;
84use oauth2:: TokenResponse ;
95use reqwest:: Url ;
106use serde:: { Deserialize , Serialize } ;
117use tokio:: sync:: mpsc:: { unbounded_channel, UnboundedReceiver } ;
128use tokio:: task:: JoinHandle ;
139use zenith_types:: ZenithEthBundle ;
1410
15- /// Holds a Signet bundle from the cache that has a unique identifier
16- /// and a Zenith bundle
11+ /// Holds a bundle from the cache with a unique ID and a Zenith bundle.
1712#[ derive( Debug , Clone , Serialize , Deserialize ) ]
1813pub struct Bundle {
1914 /// Cache identifier for the bundle
@@ -22,38 +17,37 @@ pub struct Bundle {
2217 pub bundle : ZenithEthBundle ,
2318}
2419
25- impl PartialEq for Bundle {
26- fn eq ( & self , other : & Self ) -> bool {
27- self . id == other. id
28- }
29- }
30-
31- impl Eq for Bundle { }
32-
3320/// Response from the tx-pool containing a list of bundles.
3421#[ derive( Debug , Clone , Serialize , Deserialize ) ]
3522pub struct TxPoolBundleResponse {
36- /// Bundle responses are availabel on the bundles property
23+ /// Bundle responses are available on the bundles property.
3724 pub bundles : Vec < Bundle > ,
3825}
3926
40- /// The BundlePoller polls the tx-pool for bundles and manages the seen bundles .
27+ /// The BundlePoller polls the tx-pool for bundles.
4128#[ derive( Debug , Clone ) ]
4229pub struct BundlePoller {
4330 /// The builder configuration values.
4431 pub config : BuilderConfig ,
4532 /// Authentication module that periodically fetches and stores auth tokens.
4633 pub authenticator : Authenticator ,
34+ /// Defines the interval at which the bundler polls the tx-pool for bundles.
35+ pub poll_interval_ms : u64 ,
4736}
4837
49- /// Implements a poller for the block builder to pull bundles from the tx cache .
38+ /// Implements a poller for the block builder to pull bundles from the tx-pool .
5039impl BundlePoller {
5140 /// Creates a new BundlePoller from the provided builder config.
5241 pub fn new ( config : & BuilderConfig , authenticator : Authenticator ) -> Self {
53- Self { config : config. clone ( ) , authenticator }
42+ Self { config : config. clone ( ) , authenticator, poll_interval_ms : 1000 }
5443 }
5544
56- /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
45+ /// Creates a new BundlePoller from the provided builder config and with the specified poll interval in ms.
46+ pub fn new_with_poll_interval_ms ( config : & BuilderConfig , authenticator : Authenticator ) -> Self {
47+ Self { config : config. clone ( ) , authenticator, poll_interval_ms : 1000 }
48+ }
49+
50+ /// Fetches bundles from the transaction cache and returns them.
5751 pub async fn check_bundle_cache ( & mut self ) -> eyre:: Result < Vec < Bundle > > {
5852 let bundle_url: Url = Url :: parse ( & self . config . tx_pool_url ) ?. join ( "bundles" ) ?;
5953 let token = self . authenticator . fetch_oauth_token ( ) . await ?;
@@ -71,23 +65,31 @@ impl BundlePoller {
7165 Ok ( resp. bundles )
7266 }
7367
74- /// Spawns a task that simply sends out any bundles it ever finds
75- pub fn spawn ( mut self ) -> ( UnboundedReceiver < Arc < Bundle > > , JoinHandle < ( ) > ) {
68+ /// Spawns a task that sends bundles it finds to its channel sender.
69+ pub fn spawn ( mut self ) -> ( UnboundedReceiver < Bundle > , JoinHandle < ( ) > ) {
7670 let ( outbound, inbound) = unbounded_channel ( ) ;
7771 let jh = tokio:: spawn ( async move {
7872 loop {
7973 if let Ok ( bundles) = self . check_bundle_cache ( ) . await {
8074 tracing:: debug!( count = ?bundles. len( ) , "found bundles" ) ;
81- for bundle in bundles. iter ( ) {
82- if let Err ( err) = outbound. send ( Arc :: new ( bundle. clone ( ) ) ) {
83- tracing:: error!( err = ?err, "Failed to send bundle" ) ;
75+ for bundle in bundles. into_iter ( ) {
76+ if let Err ( err) = outbound. send ( bundle) {
77+ tracing:: error!( err = ?err, "Failed to send bundle - channel full " ) ;
8478 }
8579 }
8680 }
87- tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
81+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_millis ( self . poll_interval_ms ) ) . await ;
8882 }
8983 } ) ;
9084
9185 ( inbound, jh)
9286 }
9387}
88+
89+ impl PartialEq for Bundle {
90+ fn eq ( & self , other : & Self ) -> bool {
91+ self . id == other. id
92+ }
93+ }
94+
95+ impl Eq for Bundle { }
0 commit comments