Harden event record compatibility#14
Conversation
Implements [[tasks/harden-event-record-compatibility]]
|
No actionable comments were generated in the recent review. 🎉 ℹ️ Recent review info⚙️ Run configurationConfiguration used: defaults Review profile: CHILL Plan: Pro Plus Run ID: 📒 Files selected for processing (1)
📝 WalkthroughWalkthroughRefactors the codebase so encoding, upcasting, procedural macros, entity digest/snapshot APIs, outbox construction/claims, worker processing, and repository hydration return and propagate explicit SourcedResult/EventRecordError; adds Postgres event-store contract docs and updates tests to handle fallible APIs. ChangesFallible Error Handling Architecture
Estimated code review effort 🎯 4 (Complex) | ⏱️ ~60 minutes Poem
✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 18
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
tests/bomberman/commands.rs (1)
202-211:⚠️ Potential issue | 🟡 Minor | ⚡ Quick winRemove panic in outbox payload encoding path.
Line 210 still uses
unwrap(), which can panic inside a fallible command flow that otherwise uses?.Proposed fix
let outbox = OutboxMessage::create( format!("outbox:killed:{}", killed_id), "PlayerKilled", serde_json::to_vec(&serde_json::json!({ "player_id": killed_id, "killed_by_bomb": det.map(|d| d.bomb_id.as_str()).unwrap_or("unknown"), "bomb_owner": det.map(|d| d.owner.as_str()).unwrap_or("unknown"), })) - .unwrap(), + .map_err(|e| GameError::Repository(e.into()))?, )?;🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/bomberman/commands.rs` around lines 202 - 211, The outbox payload encoding currently uses serde_json::to_vec(...).unwrap() which can panic; change this to propagate the serde_json::Error instead of unwrapping: call serde_json::to_vec(...)?, and ensure the surrounding function (the command handler that calls OutboxMessage::create) returns a compatible Result so the ? can propagate. Update any return type or error conversion (e.g., impl From<serde_json::Error> for the command error or use map_err) so OutboxMessage::create(...)? compiles without panics.
🧹 Nitpick comments (1)
docs/postgres-event-store.md (1)
26-27: ⚡ Quick winRemove the duplicate stream index recommendation.
Line 27 duplicates the
PRIMARY KEYindex from Line 26 in Postgres, so keeping both would add unnecessary write/storage overhead. Document only the PK (or clarify a different index shape if intended).🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/postgres-event-store.md` around lines 26 - 27, The docs currently recommend both a PRIMARY KEY (aggregate_type, aggregate_id, sequence) and a separate index (aggregate_type, aggregate_id, sequence), which is redundant; remove the duplicate index recommendation or replace it with a clarified different index shape (e.g., for reverse ordering or partial/covering index) — update the lines referencing PRIMARY KEY (aggregate_type, aggregate_id, sequence) and the duplicate Index (aggregate_type, aggregate_id, sequence) so only the PK is documented (or explicitly describe the distinct index purpose and columns if a different index was intended).
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@sourced_rust_macros/src/lib.rs`:
- Around line 81-98: The current block_returns_result check is insufficient;
change wrap_result_body_with_guard to accept a boolean flag indicating whether
ensure_sourced_result_signature synthesized a Result signature (i.e., the
method's return type was converted from ReturnType::Default to SourcedResult),
stop relying on block_returns_result to detect tail Ok/Err calls, and use that
synthesized-signature flag to decide whether to wrap or append Ok(())/Err
handling; update callers in the codepaths for #[digest], #[event], and
#[enqueue] so they pass the synthesized-signature boolean from
ensure_sourced_result_signature to wrap_result_body_with_guard and generate code
based on that flag instead of inspecting the last statement.
In `@src/entity/entity.rs`:
- Around line 259-266: set_snapshot replaces the entire events vector with a
single snapshot record but does not update committed_version, breaking the
invariant committed_version <= events.len() and causing new_events() to panic;
after clearing and pushing the snapshot (in set_snapshot) set committed_version
to the current events length (e.g. self.committed_version = self.events.len();
or the appropriately typed equivalent) so committed_version is re-established
relative to self.events, ensuring new_events() will not slice past the end.
In `@src/entity/upcaster.rs`:
- Around line 78-93: Replace the current guard that only checks equality with a
two-branch check rejecting any non-forward jump: if u.to_version <=
event.event_version then return an error; if equal return
UpcastError::SameVersionTransition { event_type: event.event_name, version:
event.event_version } else return a new UpcastError::BackwardTransition {
event_type: event.event_name, from: event.event_version, to: u.to_version }.
Keep the subsequent assignment of next_version/event.payload/seen_versions logic
unchanged so only strictly increasing transitions (u.to_version >
event.event_version) proceed.
In `@src/outbox_worker/worker.rs`:
- Around line 147-154: process_batch is incrementing DrainResult.claimed
whenever process_message returns did_work, which counts messages that were
merely observed in-flight rather than actually transitioned to InFlight; modify
the flow so ProcessOneResult (or the return value of process_message) includes a
separate boolean like claimed_in_call (true only when this call performed a
Pending→InFlight claim), update process_message to set that flag when it
performs the claim on an OutboxMessage, and in process_batch only increment
DrainResult.claimed when that new claimed_in_call flag is true (leave did_work
for work-detection/routing logic). Ensure types and calls using
ProcessOneResult/process_message are updated accordingly.
In `@src/outbox/message.rs`:
- Around line 250-252: In claim_for, avoid panicking on SystemTime overflow by
replacing now + lease with SystemTime::checked_add(lease) on the computed now
(in function claim_for) and handle the None case by returning an appropriate
SourcedResult error instead of panicking; update the function to call
SystemTime::now(), use checked_add to compute until, and return a descriptive
SourcedResult (e.g., an error variant indicating timestamp overflow) when
checked_add returns None so the worker fails gracefully.
In `@src/read_model/in_memory.rs`:
- Around line 231-237: In find_one_model, stop performing an early return on the
first predicate match; instead attempt to deserialize every stored entry (use
the same serde_json::from_slice::<M>(&stored.bytes) logic for each stored) to
surface any deserialization errors deterministically. While iterating, keep the
first successful predicate match in a temporary variable (e.g., matched
Versioned { data, version }) but continue validating remaining entries; if any
deserialization fails return ReadModelError::Serde with that error, otherwise
after the loop return the stored matched result (or Ok(None) if no match).
Ensure you reference stored.bytes, predicate, Versioned and the same error
mapping to preserve existing behavior.
In `@tests/bomberman/domain/game_map.rs`:
- Around line 44-55: The collect_power_up function currently calls
self.entity.digest("PowerUpCollected", &(x, y)) unconditionally; change it to
only call self.entity.digest when a power-up is actually found and removed: move
the digest call into the Some branch (the branch where you call
self.power_ups.remove(idx) and return Ok(Some(power_up))) so that
PowerUpCollected is recorded only when power_ups.remove(idx) succeeds; keep the
function signature (collect_power_up -> SourcedResult<Option<PowerUp>>) and
behavior for the None case unchanged (return Ok(None) without calling
self.entity.digest).
In `@tests/microsvc/handlers/counter_create.rs`:
- Line 38: The test currently calls counter.create(input.id.clone()).unwrap(),
which can panic; change it to propagate errors via the function's Result by
replacing .unwrap() with either the ? operator or mapping to HandlerError (e.g.,
counter.create(input.id.clone()).map_err(HandlerError::from)?), so that failures
return Err(HandlerError) instead of panicking; locate the call to counter.create
and adjust the signature/returns accordingly to propagate the error through the
existing Result-returning handler.
In `@tests/microsvc/handlers/counter_increment.rs`:
- Line 31: The call to counter.increment(input.amount).unwrap() should not
panic; change the handler to propagate the error into the handler's Result
return type instead of unwrapping. Replace unwrap() with proper error
propagation using the ? operator (or map_err to convert the domain error into
your HandlerError) on the increment call in the handler function so the function
returns Err(HandlerError::...) on failure; ensure the handler's signature
returns Result<..., HandlerError> and use From or an explicit conversion to map
the counter.increment error into HandlerError.
In `@tests/sagas/handlers/inventory/init.rs`:
- Line 13: The test handler currently calls inv.initialize(input.sku.clone(),
input.stock).unwrap(), which will panic on failure; instead propagate the
initialization error through the handler's Result return (e.g., replace the
unwrap with the try operator `?` or map the error into the handler's error type)
so initialization failures are returned rather than panicking—update the call
site of inv.initialize in the handler in tests/sagas/handlers/inventory/init.rs
and convert any necessary return/error types to match the propagated error.
In `@tests/sagas/handlers/inventory/reserve.rs`:
- Line 20: The test currently calls inv.reserve(...).unwrap(), which panics on
failure; replace the unwrap with proper error propagation so failures return a
HandlerError instead of aborting the test. Change the invocation of inv.reserve
in tests/sagas/handlers/inventory/reserve.rs to propagate the error (use the ?
operator if the surrounding function returns Result<_, HandlerError>, or use
.map_err(|e| HandlerError::from(e))/.map_err(|e| HandlerError::new(...)) to
convert the reserve error into a HandlerError) so the handler surface (the test
function) returns Err(HandlerError) on reserve failure rather than panicking.
In `@tests/sagas/handlers/orders/create.rs`:
- Around line 13-15: The call to order.create(...) currently uses .unwrap()
which will panic; replace it so the error is propagated through the handler's
Result by mapping the EventRecordError into the handler's error type and
returning it (e.g., use .map_err(|e| HandlerError::from(e))? or
.map_err(HandlerError::from) and use the ? operator) so that order.create(...)
returns a propagated Result instead of panicking; update the handler in
tests/sagas/handlers/orders/create.rs accordingly.
In `@tests/sagas/handlers/payments/process.rs`:
- Around line 14-24: The three unwrap() calls on payment.initiate,
payment.authorize, and payment.capture must be changed to propagate errors by
mapping the EventRecordError into the HandlerError chain; replace each .unwrap()
with ?-style propagation using .map_err(RepositoryError::from)? so the
SourcedResult<()> returned by Payment::initiate, Payment::authorize, and
Payment::capture is converted from EventRecordError into the HandlerError path
via RepositoryError::from before returning.
In `@tests/sagas/handlers/saga/on_inventory_reserved.rs`:
- Line 16: The call to saga.inventory_reserved() is being unwrapped and will
panic on error; since the #[digest] macro makes inventory_reserved() return
Result<(), EventRecordError>, change the code to propagate the error instead of
unwrap—replace the `.unwrap()` usage with the `?` operator (or otherwise return
Err) so the handler function returns the EventRecordError via its Result,
keeping error handling consistent with the rest of the function and other
handlers.
In `@tests/sagas/handlers/saga/on_payment_succeeded.rs`:
- Line 16: The test currently calls saga.payment_succeeded().unwrap() which
panics because payment_succeeded() returns Result<(), EventRecordError>; fix by
converting the EventRecordError into the test's HandlerError type instead of
unwrapping: either implement impl From<EventRecordError> for HandlerError so
saga.payment_succeeded()? compiles, or change the call in
on_payment_succeeded.rs to propagate/convert the error (e.g.,
saga.payment_succeeded().map_err(|e| HandlerError::from(e))? or use .map_err(|e|
HandlerError::Custom(e.to_string()))?) so the test no longer uses unwrap();
reference saga.payment_succeeded(), HandlerError, and EventRecordError when
applying the change.
In `@tests/sagas/handlers/saga/start.rs`:
- Around line 19-20: The handler currently calls .unwrap() on a fallible
operation; replace the unwrap with propagation into the handler's Result by
using ? and mapping the error into the crate's HandlerError type (e.g., replace
`.something().unwrap()` with `.something().map_err(|e| HandlerError::from(e))?`
or the equivalent `?` conversion used in counter_create.rs/counter_increment.rs)
so the saga state transition failure is returned as HandlerError instead of
panicking.
---
Outside diff comments:
In `@tests/bomberman/commands.rs`:
- Around line 202-211: The outbox payload encoding currently uses
serde_json::to_vec(...).unwrap() which can panic; change this to propagate the
serde_json::Error instead of unwrapping: call serde_json::to_vec(...)?, and
ensure the surrounding function (the command handler that calls
OutboxMessage::create) returns a compatible Result so the ? can propagate.
Update any return type or error conversion (e.g., impl From<serde_json::Error>
for the command error or use map_err) so OutboxMessage::create(...)? compiles
without panics.
---
Nitpick comments:
In `@docs/postgres-event-store.md`:
- Around line 26-27: The docs currently recommend both a PRIMARY KEY
(aggregate_type, aggregate_id, sequence) and a separate index (aggregate_type,
aggregate_id, sequence), which is redundant; remove the duplicate index
recommendation or replace it with a clarified different index shape (e.g., for
reverse ordering or partial/covering index) — update the lines referencing
PRIMARY KEY (aggregate_type, aggregate_id, sequence) and the duplicate Index
(aggregate_type, aggregate_id, sequence) so only the PK is documented (or
explicitly describe the distinct index purpose and columns if a different index
was intended).
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 8869bad0-5ef7-418d-94cf-bceaa5522cfc
📒 Files selected for processing (48)
docs/postgres-event-store.mddocs/research-and-roadmap.mdsourced_rust_macros/src/lib.rssrc/aggregate/aggregate.rssrc/commit_builder/mod.rssrc/entity/entity.rssrc/entity/event_record.rssrc/entity/mod.rssrc/entity/upcaster.rssrc/hashmap_repo/repository.rssrc/lib.rssrc/outbox/commit.rssrc/outbox/message.rssrc/outbox_worker/repository_ext.rssrc/outbox_worker/worker.rssrc/read_model/in_memory.rssrc/repository/error.rssrc/snapshot/repository.rstests/blob_game/main.rstests/bomberman/commands.rstests/bomberman/domain/game_map.rstests/bomberman/error.rstests/enqueue/main.rstests/event_store/main.rstests/microsvc/basic.rstests/microsvc/handlers/counter_create.rstests/microsvc/handlers/counter_increment.rstests/read_models/main.rstests/sagas/distributed.rstests/sagas/handlers/inventory/init.rstests/sagas/handlers/inventory/reserve.rstests/sagas/handlers/messages.rstests/sagas/handlers/orders/complete.rstests/sagas/handlers/orders/create.rstests/sagas/handlers/payments/process.rstests/sagas/handlers/saga/on_inventory_reserved.rstests/sagas/handlers/saga/on_order_completed.rstests/sagas/handlers/saga/on_payment_succeeded.rstests/sagas/handlers/saga/start.rstests/sagas/microsvc_saga.rstests/sagas/orchestration.rstests/snapshots/main.rstests/sourced/main.rstests/sourced_enqueue/main.rstests/sourced_snapshot/main.rstests/sourced_upcasting/main.rstests/todos/main.rstests/upcasting/main.rs
There was a problem hiding this comment.
♻️ Duplicate comments (1)
src/entity/upcaster.rs (1)
69-84:⚠️ Potential issue | 🟠 Major | ⚡ Quick winReject backward version transitions in upcaster chains.
Line 69 only blocks same-version transitions; backward jumps (for example
3 -> 2) are still accepted and silently downgrade events.Suggested patch
pub enum UpcastError { SameVersionTransition { event_type: String, version: u64 }, + BackwardTransition { event_type: String, from: u64, to: u64 }, CycleDetected { event_type: String, version: u64 }, } impl fmt::Display for UpcastError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { UpcastError::SameVersionTransition { event_type, version, } => write!( f, "upcaster for event {event_type} does not advance version {version}" ), + UpcastError::BackwardTransition { + event_type, + from, + to, + } => write!( + f, + "upcaster for event {event_type} regresses version from {from} to {to}" + ), UpcastError::CycleDetected { event_type, version, } => write!( f, "upcaster chain for event {event_type} cycles back to version {version}" ), } } } @@ for u in upcasters { if u.event_type == event.event_name && u.from_version == event.event_version { - if u.to_version == event.event_version { - return Err(UpcastError::SameVersionTransition { - event_type: event.event_name, - version: event.event_version, - }); + if u.to_version <= event.event_version { + return if u.to_version == event.event_version { + Err(UpcastError::SameVersionTransition { + event_type: event.event_name.clone(), + version: event.event_version, + }) + } else { + Err(UpcastError::BackwardTransition { + event_type: event.event_name.clone(), + from: event.event_version, + to: u.to_version, + }) + }; }🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/entity/upcaster.rs` around lines 69 - 84, The upcaster currently only rejects same-version transitions but permits backward jumps (e.g., 3 -> 2); add an explicit check before applying the transform to reject any u.to_version < event.event_version and return a clear error (e.g., add UpcastError::BackwardVersionTransition { event_type, from_version: event.event_version, to_version: u.to_version }). Update the upcast loop in upcaster.rs (the block that checks u.to_version == event.event_version, sets next_version, applies u.transform, and uses seen_versions) to perform this descending-version check first and return the new error instead of silently downgrading; add the new UpcastError variant and adjust tests/uses accordingly.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Duplicate comments:
In `@src/entity/upcaster.rs`:
- Around line 69-84: The upcaster currently only rejects same-version
transitions but permits backward jumps (e.g., 3 -> 2); add an explicit check
before applying the transform to reject any u.to_version < event.event_version
and return a clear error (e.g., add UpcastError::BackwardVersionTransition {
event_type, from_version: event.event_version, to_version: u.to_version }).
Update the upcast loop in upcaster.rs (the block that checks u.to_version ==
event.event_version, sets next_version, applies u.transform, and uses
seen_versions) to perform this descending-version check first and return the new
error instead of silently downgrading; add the new UpcastError variant and
adjust tests/uses accordingly.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 8d9572a8-9902-4300-8973-dc4666f08ff2
📒 Files selected for processing (21)
src/aggregate/aggregate.rssrc/entity/mod.rssrc/entity/upcaster.rssrc/lib.rssrc/microsvc/error.rssrc/snapshot/repository.rstests/microsvc/basic.rstests/microsvc/handlers/counter_create.rstests/microsvc/handlers/counter_increment.rstests/sagas/handlers/inventory/init.rstests/sagas/handlers/inventory/reserve.rstests/sagas/handlers/messages.rstests/sagas/handlers/orders/complete.rstests/sagas/handlers/orders/create.rstests/sagas/handlers/payments/process.rstests/sagas/handlers/saga/on_inventory_reserved.rstests/sagas/handlers/saga/on_order_completed.rstests/sagas/handlers/saga/on_order_created.rstests/sagas/handlers/saga/on_payment_succeeded.rstests/sagas/handlers/saga/start.rstests/upcasting/main.rs
Implements [[tasks/review-macro-tail-result-wrapping]]
Implements [[tasks/review-set-snapshot-committed-version]]
Implements [[tasks/review-event-record-codec-defaults]]
Implements [[tasks/review-upcaster-backward-transition]]
Implements [[tasks/review-outbox-claimed-count]]
Implements [[tasks/review-in-memory-find-one-validation]]
Implements [[tasks/review-power-up-digest-condition]]
Implements [[tasks/review-bomberman-outbox-serialization]]
Implements [[tasks/review-postgres-duplicate-index-doc]]
|
@coderabbitai resume |
✅ Actions performedReviews resumed. |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@tests/bomberman/domain/game_map.rs`:
- Around line 50-52: The code currently removes the power-up from self.power_ups
before calling self.entity.digest("PowerUpCollected", &(x, y)), so if digest
fails the item is lost; fix by recording the event first or by cloning/borrowing
the power-up and calling self.entity.digest("PowerUpCollected", &(x, y)) before
mutating self.power_ups (or only remove the entry after digest returns Ok).
Update the logic around self.power_ups.remove(idx) and the call to
self.entity.digest so that PowerUpCollected is emitted prior to modifying
self.power_ups to preserve state/event consistency.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro Plus
Run ID: 589bce39-ef55-4ff0-955b-5b351a9e2a8e
📒 Files selected for processing (14)
docs/postgres-event-store.mdsourced_rust_macros/src/lib.rssrc/emitter/entity_ext.rssrc/entity/entity.rssrc/entity/event_record.rssrc/entity/upcaster.rssrc/outbox_worker/worker.rssrc/read_model/in_memory.rstests/bomberman/commands.rstests/bomberman/domain/game_map.rstests/enqueue/aggregate.rstests/enqueue/main.rstests/sourced/aggregate.rstests/sourced/main.rs
💤 Files with no reviewable changes (1)
- docs/postgres-event-store.md
Refs [[tasks/review-power-up-digest-condition]]
Summary
digest/digest_v/digest_empty/set_snapshotthe fallible event-recording API, returningSourcedResult/EventRecordErrorinstead of panicking or splitting intotry_*variants#[digest],#[event], and#[enqueue]infersourced_rust::SourcedResult<()>for plain command methods, including inferred tail?expressionsupcast_eventsthe fallible upcaster API and reject same-version, cyclic, and backward version transitionsEntityEmitter::enqueue_withJSON serialization errors through#[enqueue]and sourced enqueue paths, while preserving replay-mode no-op behaviorBitcodePayloadCodecand unsupported-codec decode errorsVerification
cargo fmt --checkgit diff --checkcargo testcargo test --all-featuresNote:
cargo clippy --all-targets --all-features --locked -- -D warningswas also run and currently fails on existing unrelated workspace lints (module inception, needless lifetimes, type complexity, and similar).Implements [[tasks/harden-event-record-compatibility]]
Includes [[tasks/remove-emitter-serialization-panic]], [[tasks/audit-event-macro-option-fallbacks]], and the completed CodeRabbit review tasks under
tasks/review-*.Summary by CodeRabbit
New Features
Improvements
Documentation
Tests