11//! Bundler service responsible for managing bundles.
2+ use std:: sync:: Arc ;
3+
24use super :: oauth:: Authenticator ;
35
46pub use crate :: config:: BuilderConfig ;
57
8+ use alloy:: consensus:: TxEnvelope ;
9+ use alloy:: eips:: eip2718:: Encodable2718 ;
10+ use alloy:: rpc:: types:: mev:: EthSendBundle ;
611use oauth2:: TokenResponse ;
712use reqwest:: Url ;
813use serde:: { Deserialize , Serialize } ;
9- use std :: collections :: HashMap ;
10- use std :: time :: { Duration , Instant } ;
14+ use tokio :: sync :: mpsc :: { unbounded_channel , UnboundedReceiver } ;
15+ use tokio :: task :: JoinHandle ;
1116use zenith_types:: ZenithEthBundle ;
1217
13- /// A bundle response from the tx-pool endpoint, containing a UUID and a
14- /// [`ZenithEthBundle`].
18+ /// Holds a Signet bundle from the cache that has a unique identifier
19+ /// and a Zenith bundle
1520#[ derive( Debug , Clone , Serialize , Deserialize ) ]
1621pub struct Bundle {
17- /// The bundle id (a UUID)
22+ /// Cache identifier for the bundle
1823 pub id : String ,
19- /// The bundle itself
24+ /// The Zenith bundle for this bundle
2025 pub bundle : ZenithEthBundle ,
2126}
2227
28+ impl PartialEq for Bundle {
29+ fn eq ( & self , other : & Self ) -> bool {
30+ self . id == other. id
31+ }
32+ }
33+
34+ impl Eq for Bundle { }
35+
36+ impl From < TxEnvelope > for Bundle {
37+ fn from ( tx : TxEnvelope ) -> Self {
38+ let tx_vec = vec ! [ tx. encoded_2718( ) . into( ) ] ;
39+ Self {
40+ id : tx. tx_hash ( ) . to_string ( ) ,
41+ bundle : ZenithEthBundle {
42+ bundle : EthSendBundle {
43+ txs : tx_vec,
44+ reverting_tx_hashes : vec ! [ * tx. tx_hash( ) ] ,
45+ block_number : 0 , // TODO: This needs to be set properly somewhere after into() is called
46+ min_timestamp : None ,
47+ max_timestamp : None ,
48+ replacement_uuid : None ,
49+ } ,
50+ host_fills : None ,
51+ } ,
52+ }
53+ }
54+ }
55+
2356/// Response from the tx-pool containing a list of bundles.
2457#[ derive( Debug , Clone , Serialize , Deserialize ) ]
2558pub struct TxPoolBundleResponse {
26- /// the list of bundles
59+ /// Bundle responses are availabel on the bundles property
2760 pub bundles : Vec < Bundle > ,
2861}
2962
3063/// The BundlePoller polls the tx-pool for bundles and manages the seen bundles.
31- #[ derive( Debug ) ]
64+ #[ derive( Debug , Clone ) ]
3265pub struct BundlePoller {
33- /// Configuration
66+ /// The builder configuration values.
3467 pub config : BuilderConfig ,
35- /// [`Authenticator`] for fetching OAuth tokens
68+ /// Authentication module that periodically fetches and stores auth tokens.
3669 pub authenticator : Authenticator ,
37- /// Already seen bundle UUIDs
38- pub seen_uuids : HashMap < String , Instant > ,
3970}
4071
4172/// Implements a poller for the block builder to pull bundles from the tx cache.
4273impl BundlePoller {
4374 /// Creates a new BundlePoller from the provided builder config.
4475 pub fn new ( config : & BuilderConfig , authenticator : Authenticator ) -> Self {
45- Self { config : config. clone ( ) , authenticator, seen_uuids : HashMap :: new ( ) }
76+ Self { config : config. clone ( ) , authenticator }
4677 }
4778
4879 /// Fetches bundles from the transaction cache and returns the (oldest? random?) bundle in the cache.
4980 pub async fn check_bundle_cache ( & mut self ) -> eyre:: Result < Vec < Bundle > > {
50- let mut unique: Vec < Bundle > = Vec :: new ( ) ;
51-
5281 let bundle_url: Url = Url :: parse ( & self . config . tx_pool_url ) ?. join ( "bundles" ) ?;
5382 let token = self . authenticator . fetch_oauth_token ( ) . await ?;
5483
55- // Add the token to the request headers
5684 let result = reqwest:: Client :: new ( )
5785 . get ( bundle_url)
5886 . bearer_auth ( token. access_token ( ) . secret ( ) )
@@ -61,42 +89,28 @@ impl BundlePoller {
6189 . error_for_status ( ) ?;
6290
6391 let body = result. bytes ( ) . await ?;
64- let bundles: TxPoolBundleResponse = serde_json:: from_slice ( & body) ?;
65-
66- bundles. bundles . iter ( ) . for_each ( |bundle| {
67- self . check_seen_bundles ( bundle. clone ( ) , & mut unique) ;
68- } ) ;
92+ let resp: TxPoolBundleResponse = serde_json:: from_slice ( & body) ?;
6993
70- Ok ( unique )
94+ Ok ( resp . bundles )
7195 }
7296
73- /// Checks if the bundle has been seen before and if not, adds it to the unique bundles list.
74- fn check_seen_bundles ( & mut self , bundle : Bundle , unique : & mut Vec < Bundle > ) {
75- self . seen_uuids . entry ( bundle. id . clone ( ) ) . or_insert_with ( || {
76- // add to the set of unique bundles
77- unique. push ( bundle. clone ( ) ) ;
78- Instant :: now ( ) + Duration :: from_secs ( self . config . tx_pool_cache_duration )
79- } ) ;
80- }
81-
82- /// Evicts expired bundles from the cache.
83- pub fn evict ( & mut self ) {
84- let expired_keys: Vec < String > = self
85- . seen_uuids
86- . iter ( )
87- . filter_map (
88- |( key, expiry) | {
89- if expiry. elapsed ( ) . is_zero ( ) {
90- Some ( key. clone ( ) )
91- } else {
92- None
97+ /// Spawns a task that simply sends out any bundles it ever finds
98+ pub fn spawn ( mut self ) -> ( UnboundedReceiver < Arc < Bundle > > , JoinHandle < ( ) > ) {
99+ let ( outbound, inbound) = unbounded_channel ( ) ;
100+ let jh = tokio:: spawn ( async move {
101+ loop {
102+ if let Ok ( bundles) = self . check_bundle_cache ( ) . await {
103+ tracing:: debug!( count = ?bundles. len( ) , "found bundles" ) ;
104+ for bundle in bundles. iter ( ) {
105+ if let Err ( err) = outbound. send ( Arc :: new ( bundle. clone ( ) ) ) {
106+ tracing:: error!( err = ?err, "Failed to send bundle" ) ;
107+ }
93108 }
94- } ,
95- )
96- . collect ( ) ;
109+ }
110+ tokio:: time:: sleep ( tokio:: time:: Duration :: from_secs ( 1 ) ) . await ;
111+ }
112+ } ) ;
97113
98- for key in expired_keys {
99- self . seen_uuids . remove ( & key) ;
100- }
114+ ( inbound, jh)
101115 }
102116}
0 commit comments