Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions executors/src/eoa/store/atomic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,84 @@ impl AtomicEoaExecutorStore {
Ok(())
}

/// Fails multiple pending transactions in a single atomic Redis pipeline.
///
/// Batch equivalent of `fail_pending_transaction`: removes every transaction
/// ID from the pending zset with one `ZREM`, marks each transaction's data
/// hash as failed, and enqueues failure webhooks — all issued as one
/// `MULTI`/`EXEC` pipeline so only a single round trip to Redis is needed.
/// This is more efficient than calling `fail_pending_transaction` repeatedly
/// when there are many failures at once.
///
/// # Arguments
/// * `failures` - pairs of (pending transaction, error that caused it to fail).
/// * `webhook_queue` - queue used to deliver `transaction_failed` webhooks.
///
/// # Errors
/// Returns `TransactionStoreError` if executing the Redis pipeline fails.
/// Webhook queueing errors are logged and swallowed so one bad webhook
/// configuration cannot block the rest of the batch.
pub async fn fail_pending_transactions_batch(
    &self,
    failures: Vec<(&PendingTransaction, EoaExecutorWorkerError)>,
    webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
) -> Result<(), TransactionStoreError> {
    // Nothing to do — avoid sending an empty pipeline to Redis.
    if failures.is_empty() {
        return Ok(());
    }

    let mut pipeline = twmq::redis::pipe();
    // Wrap every queued command in MULTI/EXEC so the batch applies atomically.
    pipeline.atomic();

    let pending_key = self.pending_transactions_zset_name();
    // Completion timestamp in milliseconds; clamped at 0 before the unsigned cast.
    let now = chrono::Utc::now().timestamp_millis().max(0) as u64;

    // Remove all transaction IDs from pending state in a single ZREM operation
    let transaction_ids: Vec<&str> = failures
        .iter()
        .map(|(p, _)| p.transaction_id.as_str())
        .collect();
    pipeline.zrem(&pending_key, &transaction_ids);

    // Record the terminal failure state on each transaction's data hash.
    for (pending_transaction, error) in &failures {
        let tx_data_key = self.transaction_data_key_name(&pending_transaction.transaction_id);

        pipeline.hset(&tx_data_key, "completed_at", now);
        pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
        pipeline.hset(&tx_data_key, "status", "failed");
    }

    // Queue webhooks into the same pipeline so they are enqueued together
    // with the state changes above. `tx_context` mutably borrows `pipeline`,
    // so this must stay a separate pass after the HSET loop.
    let mut tx_context = webhook_queue.transaction_context_from_pipeline(&mut pipeline);
    for (pending_transaction, error) in &failures {
        let event = EoaExecutorEvent {
            transaction_id: pending_transaction.transaction_id.clone(),
            address: pending_transaction.user_request.from,
        };

        // NOTE(review): attempt number is hard-coded to 1 for batch failures —
        // presumably these are pre-send build failures; confirm this matches
        // the single-transaction fail path.
        let fail_envelope = event.transaction_failed_envelope(error.clone(), 1);

        if !pending_transaction.user_request.webhook_options.is_empty() {
            if let Err(e) = queue_webhook_envelopes(
                fail_envelope,
                pending_transaction.user_request.webhook_options.clone(),
                &mut tx_context,
                webhook_queue.clone(),
            ) {
                // Best-effort: log and continue so a single webhook failure
                // does not abort the whole batch.
                tracing::error!(
                    transaction_id = %pending_transaction.transaction_id,
                    error = ?e,
                    "Failed to queue webhook for batch fail"
                );
            }
        }
    }

    // Execute the pipeline once — a single network round trip for the batch.
    let mut conn = self.redis.clone();
    pipeline
        .query_async::<Vec<twmq::redis::Value>>(&mut conn)
        .await?;

    tracing::info!(
        count = failures.len(),
        eoa = ?self.eoa(),
        chain_id = self.chain_id(),
        worker_id = %self.worker_id(),
        "JOB_LIFECYCLE - Batch deleted {} failed pending transactions from EOA",
        failures.len()
    );

    Ok(())
}

pub async fn clean_submitted_transactions(
&self,
confirmed_transactions: &[ConfirmedTransaction],
Expand Down
50 changes: 32 additions & 18 deletions executors/src/eoa/worker/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,25 @@ impl<C: Chain> EoaExecutorWorker<C> {
);

// 3. Only proceed to new nonces if we successfully used all recycled nonces
let clean_start = current_timestamp_ms();
let remaining_recycled = self.store.clean_and_get_recycled_nonces().await?.len();

tracing::info!(
duration_seconds = calculate_duration_seconds(clean_start, current_timestamp_ms()),
remaining_recycled = remaining_recycled,
eoa = ?self.eoa,
chain_id = self.chain_id,
worker_id = %self.store.worker_id,
"JOB_LIFECYCLE - send_flow: Cleaned and got recycled nonces"
);

if remaining_recycled == 0 {
let budget_start = current_timestamp_ms();
let inflight_budget = self.store.get_inflight_budget(self.max_inflight).await?;

tracing::info!(
duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
duration_seconds = calculate_duration_seconds(budget_start, current_timestamp_ms()),
total_duration_seconds = calculate_duration_seconds(start_time, current_timestamp_ms()),
inflight_budget = inflight_budget,
eoa = ?self.eoa,
chain_id = self.chain_id,
Expand Down Expand Up @@ -304,6 +317,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
let mut cleaned_results = Vec::new();
let mut balance_threshold_update_needed = false;
let mut failure_occurred = false;
let mut non_retryable_failures = Vec::new();

for (pending, result) in results.into_iter() {
match (failure_occurred, result) {
Expand All @@ -330,35 +344,35 @@ impl<C: Chain> EoaExecutorWorker<C> {
balance_threshold_update_needed = true;
}

// For deterministic build failures, fail the transaction immediately
// For deterministic build failures, collect for batch processing
if !is_retryable_preparation_error(&e) {
tracing::error!(
error = ?e,
transaction_id = pending.transaction_id,
"Transaction permanently failed due to non-retryable preparation error",
);
if let Err(e) = self
.store
.fail_pending_transaction(
pending,
e.clone(),
self.webhook_queue.clone(),
)
.await
{
tracing::error!(
error = ?e,
transaction_id = pending.transaction_id,
"Failed to mark transaction as failed - transaction may be stuck in pending state"
);
// Don't propagate the error, continue processing
}
non_retryable_failures.push((pending, e.clone()));
}
}
(true, Ok(_)) => continue,
}
}

// Batch fail all non-retryable failures in a single Redis pipeline
if !non_retryable_failures.is_empty() {
if let Err(e) = self
.store
.fail_pending_transactions_batch(non_retryable_failures, self.webhook_queue.clone())
.await
{
tracing::error!(
error = ?e,
"Failed to batch mark transactions as failed - some transactions may be stuck in pending state"
);
// Don't propagate the error, continue processing
}
}

if balance_threshold_update_needed && let Err(e) = self.update_balance_threshold().await {
tracing::error!(error = ?e, "Failed to update balance threshold");
}
Expand Down