Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,6 +628,81 @@ impl EoaExecutorStore {
Ok(pending_transactions)
}

/// Peek at pending transactions and get optimistic nonce in a single operation
/// This is optimized for the send flow to reduce Redis round-trips
pub async fn peek_pending_transactions_with_optimistic_nonce(
&self,
limit: u64,
) -> Result<(Vec<PendingTransaction>, u64), TransactionStoreError> {
if limit == 0 {
let optimistic = self.get_optimistic_transaction_count().await?;
return Ok((Vec::new(), optimistic));
}

let pending_key = self.pending_transactions_zset_name();
let optimistic_key = self.optimistic_transaction_count_key_name();
let mut conn = self.redis.clone();

// First pipeline: Get transaction IDs and optimistic nonce together
let start = 0isize;
let stop = (limit - 1) as isize;

let (transaction_ids, optimistic_nonce): (
Vec<PendingTransactionStringWithQueuedAt>,
Option<u64>,
) = twmq::redis::pipe()
.zrange_withscores(&pending_key, start, stop)
.get(&optimistic_key)
.query_async(&mut conn)
.await?;

let optimistic = optimistic_nonce.ok_or_else(|| self.nonce_sync_required_error())?;

if transaction_ids.is_empty() {
return Ok((Vec::new(), optimistic));
}

// Second pipeline: Get transaction data
let mut pipe = twmq::redis::pipe();
for (transaction_id, _) in &transaction_ids {
let tx_data_key = self.transaction_data_key_name(transaction_id);
pipe.hget(&tx_data_key, "user_request");
}

let user_requests: Vec<Option<String>> = pipe.query_async(&mut conn).await?;

let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
let mut deletion_pipe = twmq::redis::pipe();

for ((transaction_id, queued_at), user_request) in
transaction_ids.into_iter().zip(user_requests)
{
match user_request {
Some(user_request) => {
let user_request_parsed = serde_json::from_str(&user_request)?;
pending_transactions.push(PendingTransaction {
transaction_id,
queued_at,
user_request: user_request_parsed,
});
}
None => {
tracing::warn!(
"Transaction {} data was missing, deleting transaction from redis",
transaction_id
);
deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id);
}
}
}

if !deletion_pipe.is_empty() {
deletion_pipe.query_async::<()>(&mut conn).await?;
}

Ok((pending_transactions, optimistic))
}

/// Get inflight budget (how many new transactions can be sent)
pub async fn get_inflight_budget(
&self,
Expand Down
31 changes: 10 additions & 21 deletions executors/src/eoa/worker/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -399,42 +399,31 @@ impl<C: Chain> EoaExecutorWorker<C> {

let iteration_start = current_timestamp_ms();

// Get pending transactions
let pending_start = current_timestamp_ms();
let pending_txs = self
// Get pending transactions and optimistic nonce in one operation
let fetch_start = current_timestamp_ms();
let (pending_txs, optimistic_nonce) = self
.store
.peek_pending_transactions(remaining_budget)
.peek_pending_transactions_with_optimistic_nonce(remaining_budget)
.await?;

let batch_size = pending_txs.len().min(remaining_budget as usize);

tracing::info!(
duration_seconds = calculate_duration_seconds(pending_start, current_timestamp_ms()),
duration_seconds = calculate_duration_seconds(fetch_start, current_timestamp_ms()),
iteration = iteration,
pending_count = pending_txs.len(),
optimistic_nonce = optimistic_nonce,
batch_size = batch_size,
eoa = ?self.eoa,
chain_id = self.chain_id,
worker_id = %self.store.worker_id,
"JOB_LIFECYCLE - process_new_transactions: Got pending transactions"
"JOB_LIFECYCLE - process_new_transactions: Got pending transactions and optimistic nonce"
);

if pending_txs.is_empty() {
break;
}

let nonce_start = current_timestamp_ms();
let optimistic_nonce = self.store.get_optimistic_transaction_count().await?;
let batch_size = pending_txs.len().min(remaining_budget as usize);

tracing::info!(
duration_seconds = calculate_duration_seconds(nonce_start, current_timestamp_ms()),
iteration = iteration,
optimistic_nonce = optimistic_nonce,
batch_size = batch_size,
eoa = ?self.eoa,
chain_id = self.chain_id,
worker_id = %self.store.worker_id,
"JOB_LIFECYCLE - process_new_transactions: Got optimistic nonce"
);

tracing::debug!(
iteration = iteration,
batch_size = batch_size,
Expand Down