From 130eb9c7a9b7f20082d9b589709c04f7a44d5bc1 Mon Sep 17 00:00:00 2001 From: Prithvish Baidya Date: Sat, 4 Oct 2025 15:42:31 +0530 Subject: [PATCH] cleanup bad transactions from pending in peek --- executors/src/eoa/store/mod.rs | 63 ++++++++++++++++++++++++---------- 1 file changed, 45 insertions(+), 18 deletions(-) diff --git a/executors/src/eoa/store/mod.rs b/executors/src/eoa/store/mod.rs index 9e4c84b..ef9db42 100644 --- a/executors/src/eoa/store/mod.rs +++ b/executors/src/eoa/store/mod.rs @@ -577,24 +577,51 @@ impl EoaExecutorStore { pipe.hget(&tx_data_key, "user_request"); } - let user_requests: Vec = pipe.query_async(&mut conn).await?; - - let user_requests: Vec = user_requests - .into_iter() - .map(|user_request_json| serde_json::from_str(&user_request_json)) - .collect::, serde_json::Error>>()?; - - let pending_transactions: Vec = transaction_ids - .into_iter() - .zip(user_requests) - .map( - |((transaction_id, queued_at), user_request)| PendingTransaction { - transaction_id, - queued_at, - user_request, - }, - ) - .collect(); + let user_requests: Vec> = pipe.query_async(&mut conn).await?; + + let mut pending_transactions: Vec = Vec::new(); + let mut deletion_pipe = twmq::redis::pipe(); + + for ((transaction_id, queued_at), user_request) in transaction_ids.into_iter().zip(user_requests) { + match user_request { + Some(user_request) => { + let user_request_parsed = serde_json::from_str(&user_request)?; + pending_transactions.push(PendingTransaction { + transaction_id, + queued_at, + user_request: user_request_parsed, + }); + } + None => { + tracing::warn!( + "Transaction {} data was missing, deleting transaction from redis", + transaction_id + ); + deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id); + } + } + } + + if !deletion_pipe.is_empty() { + deletion_pipe.query_async::<()>(&mut conn).await?; + } + + // let user_requests: Vec = user_requests + // .into_iter() + // .map(|user_request_json| serde_json::from_str(&user_request_json)) + // .collect::, serde_json::Error>>()?; + + // let pending_transactions: Vec = transaction_ids + // .into_iter() + // .zip(user_requests) + // .map( + // |((transaction_id, queued_at), user_request)| PendingTransaction { + // transaction_id, + // queued_at, + // user_request, + // }, + // ) + // .collect(); Ok(pending_transactions) }