Skip to content

Commit 30fd847

Browse files
Optimize EOA executor by combining pending transactions and nonce fetching
1 parent a328989 commit 30fd847

File tree

2 files changed

+85
-21
lines changed

2 files changed

+85
-21
lines changed

executors/src/eoa/store/mod.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -628,6 +628,81 @@ impl EoaExecutorStore {
628628
Ok(pending_transactions)
629629
}
630630

631+
/// Peek at pending transactions and get optimistic nonce in a single operation
632+
/// This is optimized for the send flow to reduce Redis round-trips
633+
pub async fn peek_pending_transactions_with_optimistic_nonce(
634+
&self,
635+
limit: u64,
636+
) -> Result<(Vec<PendingTransaction>, u64), TransactionStoreError> {
637+
if limit == 0 {
638+
let optimistic = self.get_optimistic_transaction_count().await?;
639+
return Ok((Vec::new(), optimistic));
640+
}
641+
642+
let pending_key = self.pending_transactions_zset_name();
643+
let optimistic_key = self.optimistic_transaction_count_key_name();
644+
let mut conn = self.redis.clone();
645+
646+
// First pipeline: Get transaction IDs and optimistic nonce together
647+
let start = 0isize;
648+
let stop = (limit - 1) as isize;
649+
650+
let (transaction_ids, optimistic_nonce): (
651+
Vec<PendingTransactionStringWithQueuedAt>,
652+
Option<u64>,
653+
) = twmq::redis::pipe()
654+
.zrange_withscores(&pending_key, start, stop)
655+
.get(&optimistic_key)
656+
.query_async(&mut conn)
657+
.await?;
658+
659+
let optimistic = optimistic_nonce.ok_or_else(|| self.nonce_sync_required_error())?;
660+
661+
if transaction_ids.is_empty() {
662+
return Ok((Vec::new(), optimistic));
663+
}
664+
665+
// Second pipeline: Get transaction data
666+
let mut pipe = twmq::redis::pipe();
667+
for (transaction_id, _) in &transaction_ids {
668+
let tx_data_key = self.transaction_data_key_name(transaction_id);
669+
pipe.hget(&tx_data_key, "user_request");
670+
}
671+
672+
let user_requests: Vec<Option<String>> = pipe.query_async(&mut conn).await?;
673+
674+
let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
675+
let mut deletion_pipe = twmq::redis::pipe();
676+
677+
for ((transaction_id, queued_at), user_request) in
678+
transaction_ids.into_iter().zip(user_requests)
679+
{
680+
match user_request {
681+
Some(user_request) => {
682+
let user_request_parsed = serde_json::from_str(&user_request)?;
683+
pending_transactions.push(PendingTransaction {
684+
transaction_id,
685+
queued_at,
686+
user_request: user_request_parsed,
687+
});
688+
}
689+
None => {
690+
tracing::warn!(
691+
"Transaction {} data was missing, deleting transaction from redis",
692+
transaction_id
693+
);
694+
deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id);
695+
}
696+
}
697+
}
698+
699+
if !deletion_pipe.is_empty() {
700+
deletion_pipe.query_async::<()>(&mut conn).await?;
701+
}
702+
703+
Ok((pending_transactions, optimistic))
704+
}
705+
631706
/// Get inflight budget (how many new transactions can be sent)
632707
pub async fn get_inflight_budget(
633708
&self,

executors/src/eoa/worker/send.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -399,42 +399,31 @@ impl<C: Chain> EoaExecutorWorker<C> {
399399

400400
let iteration_start = current_timestamp_ms();
401401

402-
// Get pending transactions
403-
let pending_start = current_timestamp_ms();
404-
let pending_txs = self
402+
// Get pending transactions and optimistic nonce in one operation
403+
let fetch_start = current_timestamp_ms();
404+
let (pending_txs, optimistic_nonce) = self
405405
.store
406-
.peek_pending_transactions(remaining_budget)
406+
.peek_pending_transactions_with_optimistic_nonce(remaining_budget)
407407
.await?;
408408

409+
let batch_size = pending_txs.len().min(remaining_budget as usize);
410+
409411
tracing::info!(
410-
duration_seconds = calculate_duration_seconds(pending_start, current_timestamp_ms()),
412+
duration_seconds = calculate_duration_seconds(fetch_start, current_timestamp_ms()),
411413
iteration = iteration,
412414
pending_count = pending_txs.len(),
415+
optimistic_nonce = optimistic_nonce,
416+
batch_size = batch_size,
413417
eoa = ?self.eoa,
414418
chain_id = self.chain_id,
415419
worker_id = %self.store.worker_id,
416-
"JOB_LIFECYCLE - process_new_transactions: Got pending transactions"
420+
"JOB_LIFECYCLE - process_new_transactions: Got pending transactions and optimistic nonce"
417421
);
418422

419423
if pending_txs.is_empty() {
420424
break;
421425
}
422426

423-
let nonce_start = current_timestamp_ms();
424-
let optimistic_nonce = self.store.get_optimistic_transaction_count().await?;
425-
let batch_size = pending_txs.len().min(remaining_budget as usize);
426-
427-
tracing::info!(
428-
duration_seconds = calculate_duration_seconds(nonce_start, current_timestamp_ms()),
429-
iteration = iteration,
430-
optimistic_nonce = optimistic_nonce,
431-
batch_size = batch_size,
432-
eoa = ?self.eoa,
433-
chain_id = self.chain_id,
434-
worker_id = %self.store.worker_id,
435-
"JOB_LIFECYCLE - process_new_transactions: Got optimistic nonce"
436-
);
437-
438427
tracing::debug!(
439428
iteration = iteration,
440429
batch_size = batch_size,

0 commit comments

Comments
 (0)