-
Notifications
You must be signed in to change notification settings - Fork 7
Enhance transaction hydration logic to handle missing data by deletin… #64
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
…g entries from Redis. Update results type to accommodate optional values and implement deletion pipeline for missing transactions.
Walkthroughhydrate_all now retrieves Redis user_request values as Option entries, queues deletions for missing data, logs warnings, and executes a cleanup Redis pipeline after iterating. Successful entries hydrate as before; missing ones trigger removal from the pending set and borrowed hashmap. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant H as Hydrator::hydrate_all
participant R as Redis
participant P as DeletionPipe
Note over H: Fetch pending items and related user_request keys
H->>R: MGET user_request[...]
R-->>H: Vec<Option<value>>
loop For each item
alt user_request is Some(value)
H->>H: Hydrate item using value
else user_request is None
H->>P: Queue DEL borrowed_hashmap[item], SREM pending_set item
H->>H: Log warning for missing Redis data
end
end
alt DeletionPipe has ops
H->>R: EXEC pipeline (queued deletions)
R-->>H: Pipeline result (async)
else No queued deletions
Note over H: No cleanup required
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (1)
executors/src/eoa/store/hydrate.rs (1)
135-186
: Align error handling between hydrate_all and hydrate_all_submitted
hydrate_all
usesVec<Option<String>>
, deletes missing entries and logs a warning, whereashydrate_all_submitted
usesVec<String>
and fails withTransactionNotFound
on missing data. Either switchhydrate_all_submitted
to the same graceful‐degradation pattern or document why a missing submitted transaction must surface as an error.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
Disabled knowledge base sources:
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
executors/src/eoa/store/hydrate.rs
(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
executors/src/eoa/store/hydrate.rs (2)
twmq/src/multilane.rs (4)
redis
(1271-1273)redis
(1278-1278)redis
(1378-1380)redis
(1385-1385)twmq/src/lib.rs (6)
redis
(1221-1223)redis
(1229-1229)redis
(1321-1323)redis
(1329-1329)redis
(1359-1361)redis
(1367-1369)
🔇 Additional comments (2)
executors/src/eoa/store/hydrate.rs (2)
112-116
: Good optimization: conditional pipeline execution.The check for
!deletion_pipe.is_empty()
before executing the pipeline is a good optimization that avoids unnecessary Redis round-trips when there are no deletions.
83-83
: No further action required: call site handles variable-length return correctlyrecover_borrowed_state operates solely on the returned Vec—checks for empty, sorts, and iterates—without assuming a 1:1 correspondence with the input.
for (d, r) in dehydrated.into_iter().zip(results.iter()) { | ||
hydrated.push(d.hydrate(serde_json::from_str::<EoaTransactionRequest>(r)?)); | ||
match r { | ||
Some(r) => { | ||
hydrated.push(d.hydrate(serde_json::from_str::<EoaTransactionRequest>(r)?)) | ||
} | ||
None => { | ||
// delete this transaction entry from pending, borrowed, submitted | ||
deletion_pipe.zrem( | ||
self.keys.pending_transactions_zset_name(), | ||
d.transaction_id(), | ||
); | ||
deletion_pipe.hdel( | ||
self.keys.borrowed_transactions_hashmap_name(), | ||
d.transaction_id(), | ||
); | ||
tracing::warn!( | ||
"Transaction {} data was missing, deleting transaction from redis", | ||
d.transaction_id() | ||
); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Critical issues with deletion logic require immediate attention.
The deletion logic has several critical concerns:
-
Incomplete cleanup: The comment on line 95 mentions deleting from "pending, borrowed, submitted", but the code only deletes from
pending_transactions_zset_name()
(line 96-99) andborrowed_transactions_hashmap_name()
(line 100-103). If there's a submitted set/hash, it should also be cleaned up to prevent orphaned entries. -
Missing transaction data hash cleanup: The transaction data hash itself (keyed by
transaction_data_key_name
) is not deleted. This could leave partial/corrupted transaction data in Redis. Consider whether this hash should also be cleaned up. -
Race condition risk: The deletion pipeline executes without any locking mechanism (no
WATCH
/UNWATCH
as seen in other parts of the codebase per the relevant snippets). Between reading the missing data and deleting entries, another process could modify these entries, leading to data races or unintended deletions.
Apply this diff to address the incomplete cleanup (assuming a submitted structure exists):
None => {
// delete this transaction entry from pending, borrowed, submitted
deletion_pipe.zrem(
self.keys.pending_transactions_zset_name(),
d.transaction_id(),
);
deletion_pipe.hdel(
self.keys.borrowed_transactions_hashmap_name(),
d.transaction_id(),
);
+ // Add cleanup for submitted set/hash if it exists
+ deletion_pipe.zrem(
+ self.keys.submitted_transactions_zset_name(), // Adjust key name as appropriate
+ d.transaction_id(),
+ );
+ // Also consider deleting the transaction data hash itself
+ deletion_pipe.del(
+ self.keys.transaction_data_key_name(d.transaction_id()),
+ );
tracing::warn!(
"Transaction {} data was missing, deleting transaction from redis",
d.transaction_id()
);
}
}
For the race condition, consider implementing optimistic locking similar to patterns in the relevant code snippets:
// Before reading the data
redis::cmd("WATCH")
.arg(self.keys.pending_transactions_zset_name())
.arg(self.keys.borrowed_transactions_hashmap_name())
.query_async::<()>(&mut self.redis.clone())
.await?;
// ... existing read logic ...
// Execute deletion_pipe as a transaction
if !deletion_pipe.is_empty() {
deletion_pipe
.atomic() // Make it a MULTI/EXEC transaction
.query_async::<()>(&mut self.redis.clone())
.await
.or_else(|_| {
// If transaction fails due to WATCH, log and continue
redis::cmd("UNWATCH")
.query_async::<()>(&mut self.redis.clone())
.await
})?;
}
🤖 Prompt for AI Agents
executors/src/eoa/store/hydrate.rs around lines 89 to 110: the current branch
that handles missing transaction data only removes the pending zset and borrowed
hashmap entries but not the submitted set/hash nor the per-transaction data key,
and it performs deletions without optimistic locking; update the deletion logic
to also remove the submitted entry (use
self.keys.submitted_transactions_set/hash name as appropriate) and the
transaction data hash (e.g.,
self.keys.transaction_data_key_name(d.transaction_id())), collect those commands
into the existing deletion_pipe, then wrap the read+delete sequence with a WATCH
on the involved keys (pending, borrowed, submitted, and the transaction data
key), and execute deletion_pipe as an atomic MULTI/EXEC (using
deletion_pipe.atomic().query_async(...)), handling EXEC failure by UNWATCH and
logging/continuing to avoid race-condition driven accidental deletes.
…g entries from Redis. Update results type to accommodate optional values and implement deletion pipeline for missing transactions.
Summary by CodeRabbit