diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index 20795b4..e780b23 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -625,6 +625,84 @@ impl AtomicEoaExecutorStore { Ok(()) } + /// Fail multiple transactions that are in the pending state in a single batch operation + /// This is more efficient than calling fail_pending_transaction multiple times + /// when there are many failures at once + pub async fn fail_pending_transactions_batch( + &self, + failures: Vec<(&PendingTransaction, EoaExecutorWorkerError)>, + webhook_queue: Arc>, + ) -> Result<(), TransactionStoreError> { + if failures.is_empty() { + return Ok(()); + } + + let mut pipeline = twmq::redis::pipe(); + pipeline.atomic(); + + let pending_key = self.pending_transactions_zset_name(); + let now = chrono::Utc::now().timestamp_millis().max(0) as u64; + + // Remove all transaction IDs from pending state in a single ZREM operation + let transaction_ids: Vec<&str> = failures + .iter() + .map(|(p, _)| p.transaction_id.as_str()) + .collect(); + pipeline.zrem(&pending_key, &transaction_ids); + + // Update transaction data with failure for each transaction + for (pending_transaction, error) in &failures { + let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id); + + pipeline.hset(&tx_data_key, "completed_at", now); + pipeline.hset(&tx_data_key, "failure_reason", error.to_string()); + pipeline.hset(&tx_data_key, "status", "failed"); + } + + // Queue webhooks for all failures + let mut tx_context = webhook_queue.transaction_context_from_pipeline(&mut pipeline); + for (pending_transaction, error) in &failures { + let event = EoaExecutorEvent { + transaction_id: pending_transaction.transaction_id.clone(), + address: pending_transaction.user_request.from, + }; + + let fail_envelope = event.transaction_failed_envelope(error.clone(), 1); + + if !pending_transaction.user_request.webhook_options.is_empty() { + if let Err(e) = queue_webhook_envelopes( + fail_envelope, + pending_transaction.user_request.webhook_options.clone(), + &mut tx_context, + webhook_queue.clone(), + ) { + tracing::error!( + transaction_id = %pending_transaction.transaction_id, + error = ?e, + "Failed to queue webhook for batch fail" + ); + } + } + } + + // Execute the pipeline once + let mut conn = self.redis.clone(); + pipeline + .query_async::>(&mut conn) + .await?; + + tracing::info!( + count = failures.len(), + eoa = ?self.eoa(), + chain_id = self.chain_id(), + worker_id = %self.worker_id(), + "JOB_LIFECYCLE - Batch deleted {} failed pending transactions from EOA", + failures.len() + ); + + Ok(()) + } + pub async fn clean_submitted_transactions( &self, confirmed_transactions: &[ConfirmedTransaction], diff --git a/executors/src/eoa/worker/send.rs b/executors/src/eoa/worker/send.rs index d90eb52..50a855a 100644 --- a/executors/src/eoa/worker/send.rs +++ b/executors/src/eoa/worker/send.rs @@ -95,12 +95,25 @@ impl EoaExecutorWorker { ); // 3. Only proceed to new nonces if we successfully used all recycled nonces + let clean_start = current_timestamp_ms(); let remaining_recycled = self.store.clean_and_get_recycled_nonces().await?.len(); + + tracing::info!( + duration_seconds = calculate_duration_seconds(clean_start, current_timestamp_ms()), + remaining_recycled = remaining_recycled, + eoa = ?self.eoa, + chain_id = self.chain_id, + worker_id = %self.store.worker_id, + "JOB_LIFECYCLE - send_flow: Cleaned and got recycled nonces" + ); + if remaining_recycled == 0 { + let budget_start = current_timestamp_ms(); let inflight_budget = self.store.get_inflight_budget(self.max_inflight).await?; tracing::info!( - duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()), + duration_seconds = calculate_duration_seconds(budget_start, current_timestamp_ms()), + total_duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()), inflight_budget = inflight_budget, eoa = ?self.eoa, chain_id = self.chain_id, @@ -304,6 +317,7 @@ impl EoaExecutorWorker { let mut cleaned_results = Vec::new(); let mut balance_threshold_update_needed = false; let mut failure_occurred = false; + let mut non_retryable_failures = Vec::new(); for (pending, result) in results.into_iter() { match (failure_occurred, result) { @@ -330,35 +344,35 @@ impl EoaExecutorWorker { balance_threshold_update_needed = true; } - // For deterministic build failures, fail the transaction immediately + // For deterministic build failures, collect for batch processing if !is_retryable_preparation_error(&e) { tracing::error!( error = ?e, transaction_id = pending.transaction_id, "Transaction permanently failed due to non-retryable preparation error", ); - if let Err(e) = self - .store - .fail_pending_transaction( - pending, - e.clone(), - self.webhook_queue.clone(), - ) - .await - { - tracing::error!( - error = ?e, - transaction_id = pending.transaction_id, - "Failed to mark transaction as failed - transaction may be stuck in pending state" - ); - // Don't propagate the error, continue processing - } + non_retryable_failures.push((pending, e.clone())); } } (true, Ok(_)) => continue, } } + // Batch fail all non-retryable failures in a single Redis pipeline + if !non_retryable_failures.is_empty() { + if let Err(e) = self + .store + .fail_pending_transactions_batch(non_retryable_failures, self.webhook_queue.clone()) + .await + { + tracing::error!( + error = ?e, + "Failed to batch mark transactions as failed - some transactions may be stuck in pending state" + ); + // Don't propagate the error, continue processing + } + } + if balance_threshold_update_needed && let Err(e) = self.update_balance_threshold().await { tracing::error!(error = ?e, "Failed to update balance threshold"); }