diff --git a/executors/src/eoa/store/atomic.rs b/executors/src/eoa/store/atomic.rs index 3f94c11..bde595d 100644 --- a/executors/src/eoa/store/atomic.rs +++ b/executors/src/eoa/store/atomic.rs @@ -574,39 +574,54 @@ impl AtomicEoaExecutorStore { error: EoaExecutorWorkerError, webhook_queue: Arc>, ) -> Result<(), TransactionStoreError> { - self.with_lock_check(|pipeline| { - let pending_key = self.pending_transactions_zset_name(); - let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id); - let now = chrono::Utc::now().timestamp_millis().max(0) as u64; - - // Remove from pending state - pipeline.zrem(&pending_key, &pending_transaction.transaction_id); - - // Update transaction data with failure - pipeline.hset(&tx_data_key, "completed_at", now); - pipeline.hset(&tx_data_key, "failure_reason", error.to_string()); - pipeline.hset(&tx_data_key, "status", "failed"); - - let event = EoaExecutorEvent { - transaction_id: pending_transaction.transaction_id.clone(), - address: pending_transaction.user_request.from, - }; - - let fail_envelope = event.transaction_failed_envelope(error.clone(), 1); - - if !pending_transaction.user_request.webhook_options.is_empty() { - let mut tx_context = webhook_queue.transaction_context_from_pipeline(pipeline); - if let Err(e) = queue_webhook_envelopes( - fail_envelope, - pending_transaction.user_request.webhook_options.clone(), - &mut tx_context, - webhook_queue.clone(), - ) { - tracing::error!("Failed to queue webhook for fail: {}", e); - } + let mut pipeline = twmq::redis::pipe(); + pipeline.atomic(); + + let pending_key = self.pending_transactions_zset_name(); + let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id); + let now = chrono::Utc::now().timestamp_millis().max(0) as u64; + + // Remove from pending state + pipeline.zrem(&pending_key, &pending_transaction.transaction_id); + + // Update transaction data with failure + pipeline.hset(&tx_data_key, "completed_at", now); + pipeline.hset(&tx_data_key, "failure_reason", error.to_string()); + pipeline.hset(&tx_data_key, "status", "failed"); + + let event = EoaExecutorEvent { + transaction_id: pending_transaction.transaction_id.clone(), + address: pending_transaction.user_request.from, + }; + + let fail_envelope = event.transaction_failed_envelope(error.clone(), 1); + + if !pending_transaction.user_request.webhook_options.is_empty() { + let mut tx_context = webhook_queue.transaction_context_from_pipeline(&mut pipeline); + if let Err(e) = queue_webhook_envelopes( + fail_envelope, + pending_transaction.user_request.webhook_options.clone(), + &mut tx_context, + webhook_queue.clone(), + ) { + tracing::error!("Failed to queue webhook for fail: {}", e); } - }) - .await + } + + let mut conn = self.redis.clone(); + pipeline + .query_async::>(&mut conn) + .await?; + + tracing::info!( + transaction_id = %pending_transaction.transaction_id, + eoa = ?self.eoa(), + chain_id = self.chain_id(), + error = %error, + "JOB_LIFECYCLE - Deleted failed pending transaction from EOA" + ); + + Ok(()) } pub async fn clean_submitted_transactions(