From 30fd847fbdcfe2aec30dcdb43ad9a2221f290fd6 Mon Sep 17 00:00:00 2001 From: Joaquim Verges Date: Fri, 21 Nov 2025 22:36:03 +1300 Subject: [PATCH] Optimize EOA executor by combining pending transactions and nonce fetching --- executors/src/eoa/store/mod.rs | 75 ++++++++++++++++++++++++++++++++ executors/src/eoa/worker/send.rs | 31 +++++-------- 2 files changed, 85 insertions(+), 21 deletions(-) diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index c1fa275..876253c 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -628,6 +628,81 @@ impl EoaExecutorStore { Ok(pending_transactions) } + /// Peek at pending transactions and get optimistic nonce in a single operation + /// This is optimized for the send flow to reduce Redis round-trips + pub async fn peek_pending_transactions_with_optimistic_nonce( + &self, + limit: u64, + ) -> Result<(Vec, u64), TransactionStoreError> { + if limit == 0 { + let optimistic = self.get_optimistic_transaction_count().await?; + return Ok((Vec::new(), optimistic)); + } + + let pending_key = self.pending_transactions_zset_name(); + let optimistic_key = self.optimistic_transaction_count_key_name(); + let mut conn = self.redis.clone(); + + // First pipeline: Get transaction IDs and optimistic nonce together + let start = 0isize; + let stop = (limit - 1) as isize; + + let (transaction_ids, optimistic_nonce): ( + Vec, + Option, + ) = twmq::redis::pipe() + .zrange_withscores(&pending_key, start, stop) + .get(&optimistic_key) + .query_async(&mut conn) + .await?; + + let optimistic = optimistic_nonce.ok_or_else(|| self.nonce_sync_required_error())?; + + if transaction_ids.is_empty() { + return Ok((Vec::new(), optimistic)); + } + + // Second pipeline: Get transaction data + let mut pipe = twmq::redis::pipe(); + for (transaction_id, _) in &transaction_ids { + let tx_data_key = self.transaction_data_key_name(transaction_id); + pipe.hget(&tx_data_key, "user_request"); + } + + let user_requests: Vec> = pipe.query_async(&mut conn).await?; + + let mut pending_transactions: Vec = Vec::new(); + let mut deletion_pipe = twmq::redis::pipe(); + + for ((transaction_id, queued_at), user_request) in + transaction_ids.into_iter().zip(user_requests) + { + match user_request { + Some(user_request) => { + let user_request_parsed = serde_json::from_str(&user_request)?; + pending_transactions.push(PendingTransaction { + transaction_id, + queued_at, + user_request: user_request_parsed, + }); + } + None => { + tracing::warn!( + "Transaction {} data was missing, deleting transaction from redis", + transaction_id + ); + deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id); + } + } + } + + if !deletion_pipe.is_empty() { + deletion_pipe.query_async::<()>(&mut conn).await?; + } + + Ok((pending_transactions, optimistic)) + } + /// Get inflight budget (how many new transactions can be sent) pub async fn get_inflight_budget( &self, diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index 50a855a..9742ee6 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -399,42 +399,31 @@ impl EoaExecutorWorker { let iteration_start = current_timestamp_ms(); - // Get pending transactions - let pending_start = current_timestamp_ms(); - let pending_txs = self + // Get pending transactions and optimistic nonce in one operation + let fetch_start = current_timestamp_ms(); + let (pending_txs, optimistic_nonce) = self .store - .peek_pending_transactions(remaining_budget) + .peek_pending_transactions_with_optimistic_nonce(remaining_budget) .await?; + let batch_size = pending_txs.len().min(remaining_budget as usize); + tracing::info!( - duration_seconds = calculate_duration_seconds(pending_start, current_timestamp_ms()), + duration_seconds = calculate_duration_seconds(fetch_start, current_timestamp_ms()), iteration = iteration, pending_count = pending_txs.len(), + optimistic_nonce = optimistic_nonce, + batch_size = batch_size, eoa = ?self.eoa, chain_id = self.chain_id, worker_id = %self.store.worker_id, - "JOB_LIFECYCLE - process_new_transactions: Got pending transactions" + "JOB_LIFECYCLE - process_new_transactions: Got pending transactions and optimistic nonce" ); if pending_txs.is_empty() { break; } - let nonce_start = current_timestamp_ms(); - let optimistic_nonce = self.store.get_optimistic_transaction_count().await?; - let batch_size = pending_txs.len().min(remaining_budget as usize); - - tracing::info!( - duration_seconds = calculate_duration_seconds(nonce_start, current_timestamp_ms()), - iteration = iteration, - optimistic_nonce = optimistic_nonce, - batch_size = batch_size, - eoa = ?self.eoa, - chain_id = self.chain_id, - worker_id = %self.store.worker_id, - "JOB_LIFECYCLE - process_new_transactions: Got optimistic nonce" - ); - tracing::debug!( iteration = iteration, batch_size = batch_size,