Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions executors/src/eoa/store/hydrate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,11 +80,39 @@ impl EoaExecutorStore {
);
}

let results: Vec<String> = pipe.query_async(&mut self.redis.clone()).await?;
let results: Vec<Option<String>> = pipe.query_async(&mut self.redis.clone()).await?;

let mut hydrated = Vec::with_capacity(dehydrated.len());

let mut deletion_pipe = twmq::redis::pipe();

for (d, r) in dehydrated.into_iter().zip(results.iter()) {
hydrated.push(d.hydrate(serde_json::from_str::<EoaTransactionRequest>(r)?));
match r {
Some(r) => {
hydrated.push(d.hydrate(serde_json::from_str::<EoaTransactionRequest>(r)?))
}
None => {
// delete this transaction entry from pending, borrowed, submitted
deletion_pipe.zrem(
self.keys.pending_transactions_zset_name(),
d.transaction_id(),
);
deletion_pipe.hdel(
self.keys.borrowed_transactions_hashmap_name(),
d.transaction_id(),
);
tracing::warn!(
"Transaction {} data was missing, deleting transaction from redis",
d.transaction_id()
);
}
}
}
Comment on lines 89 to +110
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Critical issues with deletion logic require immediate attention.

The deletion logic has several critical concerns:

  1. Incomplete cleanup: The comment on line 95 mentions deleting from "pending, borrowed, submitted", but the code only deletes from pending_transactions_zset_name() (line 96-99) and borrowed_transactions_hashmap_name() (line 100-103). If there's a submitted set/hash, it should also be cleaned up to prevent orphaned entries.

  2. Missing transaction data hash cleanup: The transaction data hash itself (keyed by transaction_data_key_name) is not deleted. This could leave partial/corrupted transaction data in Redis. Consider whether this hash should also be cleaned up.

  3. Race condition risk: The deletion pipeline executes without any locking mechanism (no WATCH/UNWATCH as seen in other parts of the codebase per the relevant snippets). Between reading the missing data and deleting entries, another process could modify these entries, leading to data races or unintended deletions.

Apply this diff to address the incomplete cleanup (assuming a submitted structure exists):

                 None => {
                     // delete this transaction entry from pending, borrowed, submitted
                     deletion_pipe.zrem(
                         self.keys.pending_transactions_zset_name(),
                         d.transaction_id(),
                     );
                     deletion_pipe.hdel(
                         self.keys.borrowed_transactions_hashmap_name(),
                         d.transaction_id(),
                     );
+                    // Add cleanup for submitted set/hash if it exists
+                    deletion_pipe.zrem(
+                        self.keys.submitted_transactions_zset_name(),  // Adjust key name as appropriate
+                        d.transaction_id(),
+                    );
+                    // Also consider deleting the transaction data hash itself
+                    deletion_pipe.del(
+                        self.keys.transaction_data_key_name(d.transaction_id()),
+                    );
                     tracing::warn!(
                         "Transaction {} data was missing, deleting transaction from redis",
                         d.transaction_id()
                     );
                 }
             }

For the race condition, consider implementing optimistic locking similar to patterns in the relevant code snippets:

// Before reading the data
redis::cmd("WATCH")
    .arg(self.keys.pending_transactions_zset_name())
    .arg(self.keys.borrowed_transactions_hashmap_name())
    .query_async::<()>(&mut self.redis.clone())
    .await?;

// ... existing read logic ...

// Execute deletion_pipe as a transaction
if !deletion_pipe.is_empty() {
    deletion_pipe
        .atomic()  // Make it a MULTI/EXEC transaction
        .query_async::<()>(&mut self.redis.clone())
        .await
        .or_else(|_| {
            // If transaction fails due to WATCH, log and continue
            redis::cmd("UNWATCH")
                .query_async::<()>(&mut self.redis.clone())
                .await
        })?;
}
🤖 Prompt for AI Agents
executors/src/eoa/store/hydrate.rs around lines 89 to 110: the current branch
that handles missing transaction data only removes the pending zset and borrowed
hashmap entries but not the submitted set/hash nor the per-transaction data key,
and it performs deletions without optimistic locking; update the deletion logic
to also remove the submitted entry (use
self.keys.submitted_transactions_set/hash name as appropriate) and the
transaction data hash (e.g.,
self.keys.transaction_data_key_name(d.transaction_id())), collect those commands
into the existing deletion_pipe, then wrap the read+delete sequence with a WATCH
on the involved keys (pending, borrowed, submitted, and the transaction data
key), and execute deletion_pipe as an atomic MULTI/EXEC (using
deletion_pipe.atomic().query_async(...)), handling EXEC failure by UNWATCH and
logging/continuing to avoid race-condition driven accidental deletes.


if !deletion_pipe.is_empty() {
deletion_pipe
.query_async::<()>(&mut self.redis.clone())
.await?;
}

Ok(hydrated)
Expand Down