Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 45 additions & 18 deletions executors/src/eoa/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,24 +577,51 @@ impl EoaExecutorStore {
pipe.hget(&tx_data_key, "user_request");
}

let user_requests: Vec<String> = pipe.query_async(&mut conn).await?;

let user_requests: Vec<EoaTransactionRequest> = user_requests
.into_iter()
.map(|user_request_json| serde_json::from_str(&user_request_json))
.collect::<Result<Vec<EoaTransactionRequest>, serde_json::Error>>()?;

let pending_transactions: Vec<PendingTransaction> = transaction_ids
.into_iter()
.zip(user_requests)
.map(
|((transaction_id, queued_at), user_request)| PendingTransaction {
transaction_id,
queued_at,
user_request,
},
)
.collect();
let user_requests: Vec<Option<String>> = pipe.query_async(&mut conn).await?;

let mut pending_transactions: Vec<PendingTransaction> = Vec::new();
let mut deletion_pipe = twmq::redis::pipe();

for ((transaction_id, queued_at), user_request) in transaction_ids.into_iter().zip(user_requests) {
match user_request {
Some(user_request) => {
let user_request_parsed = serde_json::from_str(&user_request)?;
pending_transactions.push(PendingTransaction {
transaction_id,
queued_at,
user_request: user_request_parsed,
});
}
None => {
tracing::warn!(
"Transaction {} data was missing, deleting transaction from redis",
transaction_id
);
deletion_pipe.zrem(self.keys.pending_transactions_zset_name(), transaction_id);
}
}
}

if !deletion_pipe.is_empty() {
deletion_pipe.query_async::<()>(&mut conn).await?;
}
Comment on lines +605 to +607
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Fix deletion pipeline result type to avoid runtime failure

query_async::<()> expects a Redis Nil, but a pipeline of ZREM commands returns a bulk of integer counts, so this call will throw a ResponseError as soon as cleanup runs. Capture the results as a vector (or Value) instead.

-            deletion_pipe.query_async::<()>(&mut conn).await?;
+            let _: Vec<i64> = deletion_pipe.query_async(&mut conn).await?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if !deletion_pipe.is_empty() {
deletion_pipe.query_async::<()>(&mut conn).await?;
}
if !deletion_pipe.is_empty() {
let _: Vec<i64> = deletion_pipe.query_async(&mut conn).await?;
}
🤖 Prompt for AI Agents
In executors/src/eoa/store/mod.rs around lines 605 to 607, the pipeline call
uses query_async::<()> which expects a Redis Nil but the pipeline of ZREM
commands returns integer counts; change the query type to capture the actual
results (e.g., use query_async::<Vec<redis::Value>>() or
query_async::<Vec<i64>>()) and bind the returned value to a variable (or let _ =
...) instead of expecting unit, so the pipeline response is consumed without
causing a ResponseError.


// let user_requests: Vec<EoaTransactionRequest> = user_requests
// .into_iter()
// .map(|user_request_json| serde_json::from_str(&user_request_json))
// .collect::<Result<Vec<EoaTransactionRequest>, serde_json::Error>>()?;

// let pending_transactions: Vec<PendingTransaction> = transaction_ids
// .into_iter()
// .zip(user_requests)
// .map(
// |((transaction_id, queued_at), user_request)| PendingTransaction {
// transaction_id,
// queued_at,
// user_request,
// },
// )
// .collect();

Ok(pending_transactions)
}
Expand Down