feat: multi-part AOF persistence + fast RDB loader #63
Ports the multi-part AOF persistence work from feat/persistence-overhaul (PR #37) as a fresh squash onto post-disk-offload main, dropping the stale branch's obsolete accept-loop / SO_REUSEPORT / cfg-gate changes.

Additive content:
- src/persistence/aof_manifest.rs: appendonlydir/ manifest + multi-part replay
- src/persistence/rdb.rs: save_to_bytes / load_from_bytes + fast bulk loader (count_entries_per_db + Database::reserve + insert_for_load)
- src/persistence/aof.rs: writer rewrite (waits for manifest, handles AofMessage::Rewrite{,Sharded} via do_rewrite_{single,sharded}, detects MOON magic prefix for RDB-preamble replay)
- src/command/persistence.rs: bgrewriteaof_start_sharded
- src/storage/compact_value.rs: heap_string_owned / heap_string_vec_direct
- src/storage/db.rs: insert_for_load / reserve / recalculate_memory
- src/shard/shared_databases.rs: all_shard_dbs()
- src/server/conn/handler_{sharded,monoio}.rs: BGREWRITEAOF dispatch
- src/main.rs: replay_multi_part layered on v2/v3 recovery, manifest initialized after recovery so the writer thread can unblock

Coexistence rule: when the appendonlydir/ manifest is present it is authoritative; the legacy appendonly.aof fallback (handled by v2 recovery inside restore_from_persistence) only fires when no manifest exists, which covers the first upgrade from pre-multi-part moon.

Known limitation: multi-part replay is single-shard only; multi-shard clusters log a warning and fall back to v2/v3 recovery.

Validation (moon-dev OrbStack, Linux aarch64):
cargo fmt --check ok
cargo clippy --release -- -D warnings ok
cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok
cargo test --release --lib 1858 pass
cargo test --no-default-features --features runtime-tokio,jemalloc --lib 1877 pass

Two failures in cargo test --release --lib (test_inline_set, test_inline_set_with_aof) and tests/replication_test.rs reproduce on clean main: pre-existing, not introduced by this change.
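The coexistence rule and the single-shard limitation described in this PR can be read as a three-way decision at startup. The following is a minimal sketch of that decision only; `RecoveryPath` and `choose_recovery_path` are hypothetical names invented for illustration and are not taken from the PR's code.

```rust
/// Which recovery path startup takes. Hypothetical type, for illustration only.
#[derive(Debug, PartialEq, Eq)]
enum RecoveryPath {
    /// Manifest present and a single shard: replay base RDB, then the incr AOF.
    MultiPart,
    /// Manifest present but more than one shard: warn, then use v2/v3 recovery.
    FallbackMultiShard,
    /// No manifest: v2 recovery handles a legacy appendonly.aof if one exists.
    Legacy,
}

/// Apply the coexistence rule: the manifest, when present, is authoritative.
fn choose_recovery_path(manifest_present: bool, num_shards: usize) -> RecoveryPath {
    match (manifest_present, num_shards) {
        (true, 1) => RecoveryPath::MultiPart,
        (true, _) => RecoveryPath::FallbackMultiShard,
        (false, _) => RecoveryPath::Legacy,
    }
}
```

For example, `choose_recovery_path(true, 4)` yields `RecoveryPath::FallbackMultiShard`, which matches the known limitation stated above.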
Review Summary by Qodo
Multi-part AOF persistence with fast RDB loader and BGREWRITEAOF support
Walkthroughs
Description
- Multi-part AOF persistence with RDB base + incremental RESP format
- Fast RDB bulk loader with 2–30× recovery speedup via pre-sizing and zero-copy
- BGREWRITEAOF support for both single-shard and sharded modes
- AOF manifest tracking (appendonlydir/) with atomic sequence advancement
- Optimized string handling eliminating Vec→Bytes→Vec round-trips during load
Diagram
flowchart LR
A["AOF Writer Task"] -->|"Manifest load/wait"| B["AofManifest"]
B -->|"Advance seq"| C["RDB Base + Incr"]
D["BGREWRITEAOF"] -->|"Snapshot dbs"| E["RDB Serialization"]
E -->|"Write base"| C
F["Recovery"] -->|"Load base RDB"| G["Databases"]
G -->|"Replay RESP"| H["Final State"]
I["AOF Preamble"] -->|"Detect MOON magic"| J["RDB Parser"]
J -->|"Tail RESP"| K["Command Replay"]
File Changes
1. src/persistence/aof_manifest.rs
📝 Walkthrough
Adds Redis 7-style multi-part AOF manifest, RDB preamble support, in-memory RDB save/load, sharded BGREWRITEAOF control and writer flows, startup multi-part AOF replay/initialization, and bulk-load DB helpers for sharded rewrites.
Sequence Diagram(s)
sequenceDiagram
participant Main as Main Process
participant Manifest as AofManifest
participant RDB as RDB Module
participant Engine as ReplayEngine
participant ShardDB as ShardDatabases
Main->>Manifest: load(persistence_dir)
Manifest-->>Main: manifest or None
alt manifest exists and num_shards == 1
Main->>Manifest: replay_multi_part(manifest)
Manifest->>RDB: load_from_bytes(base.rdb)
RDB-->>Manifest: (rdb_keys, bytes_consumed)
Manifest->>Engine: replay_incr_resp(incr.aof)
Engine->>ShardDB: apply commands
Manifest-->>Main: total_keys_loaded
else manifest missing
Main->>Manifest: initialize / initialize_with_base(...)
Manifest-->>Main: created manifest
end
sequenceDiagram
participant Client
participant Handler as Connection Handler
participant AofTx as AOF Channel
participant Writer as AOF Writer
participant Manifest as AofManifest
participant ShardDB as ShardDatabases
participant RDB as RDB Module
Client->>Handler: BGREWRITEAOF
Handler->>AofTx: send RewriteSharded(ShardDB)
Handler-->>Client: +OK
AofTx->>Writer: receive RewriteSharded
Writer->>ShardDB: iterate all_shard_dbs() & acquire locks
Writer->>RDB: save_to_bytes(snapshot)
RDB-->>Writer: rdb_bytes
Writer->>Manifest: advance(rdb_bytes)
Manifest-->>Writer: new_seq and create next incr file
Writer->>Writer: reopen incr file for appends
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 inconclusive)
✅ Passed checks (2 passed)
Addresses senior-rust-engineer review of #63. Six fixes across P0/P1:

1. AofManifest::load returns Result<Option<Self>, io::Error>
Previous silent-None-on-corruption caused main.rs to call initialize() and overwrite the corrupt manifest, destroying the reference to the real base RDB and losing all persisted data. Corrupt manifest is now fatal at startup.

2. Orphan cleanup on manifest load
Scans appendonlydir/ for stray moon.aof.{N}.{base.rdb,incr.aof, *.tmp} with N != current seq and deletes them. Previously a crash between advance() phases 1-3 left zombie base RDBs that never got referenced by any manifest, filling disk over repeated failures.

3. replay_incr_resp: fail-hard on parse error
Previous impl did buf.split_to(1) + scan for next '*' byte, silently dropping runs of corrupt commands. '*' can legitimately appear inside bulk-string payloads, so resync was unsound. Recovery of a corrupt incr log is now an error, not silent data loss.

4. Rewrite ordering: drain + lock + drain + snapshot
P0 bug: non-idempotent commands (INCR, LPUSH, SADD, ZADD, HINCRBY, APPEND, etc.) were double-applied on recovery after BGREWRITEAOF. The handler applies the write to the DB synchronously, then sends an AofMessage::Append asynchronously. During rewrite, appends queue in the channel while the writer thread is in do_rewrite_*. After rewrite, queued appends are processed into the NEW incr. If the write happened BEFORE the snapshot captured its shard, the write is both in base AND in new incr → replay double-applies.
Fix: in do_rewrite_single/sharded,
(a) drain the channel to the old incr and fsync,
(b) acquire write locks on all (shard, db) pairs simultaneously,
(c) drain once more to catch appends completed between step a and step b,
(d) snapshot under the locks,
(e) release locks,
(f) write new base + advance manifest + reopen.
Invariant: any write captured in the new base is NOT in the new incr (handlers were blocked), and any write NOT in the new base IS in the new incr (queued after lock release). Creates a brief global write pause during snapshot, an acceptable cost for correctness.

5. AOF writer honors corrupt-manifest error
Writer thread exits with a loud log instead of spinning on load() forever when the manifest is corrupt, so server startup fails fast.

6. Database::reserve debug_assert empty table
Previous impl silently replaced the DashTable regardless of current contents: a caller who misused reserve() on a populated database would lose all data without warning. Debug assertion catches the misuse in tests.

Validation (moon-dev OrbStack):
cargo fmt --check ok
cargo clippy --release -- -D warnings ok
cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok
cargo test --release --lib 1858 pass (test_inline_set* pre-existing failures on main, unrelated)
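The rewrite ordering in fix 4 can be illustrated with a small single-threaded model. This is a sketch under stated assumptions, not the PR's do_rewrite_* code: the database is reduced to one `RwLock<i64>` counter, appends are plain `String`s on a `std::sync::mpsc` channel, fsync and the manifest advance are omitted, and it assumes every append for an already-applied write is in the channel by the time the write lock is held. The names `drain` and `rewrite` are hypothetical.

```rust
use std::sync::mpsc::Receiver;
use std::sync::RwLock;

/// Move every append currently queued in the channel into `incr`.
fn drain(rx: &Receiver<String>, incr: &mut Vec<String>) {
    while let Ok(cmd) = rx.try_recv() {
        incr.push(cmd);
    }
}

/// Run steps (a) to (e) and return the snapshot that becomes the new base.
fn rewrite(rx: &Receiver<String>, db: &RwLock<i64>, old_incr: &mut Vec<String>) -> i64 {
    // (a) Flush appends queued so far into the old incr (fsync omitted here).
    drain(rx, old_incr);
    // (b) Block writers. The PR locks every (shard, db) pair; this model has one.
    let guard = db.write().unwrap();
    // (c) Catch appends that were queued between (a) and (b).
    drain(rx, old_incr);
    // (d) Snapshot while no handler can mutate the database.
    let base = *guard;
    // (e) Release the lock; appends queued from here on belong to the new incr.
    drop(guard);
    base
}
```

In this model, three INCRs applied before the rewrite end up in the base and in the old incr, and an INCR applied afterwards ends up only in the new incr, so replaying base plus new incr counts each write once.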
Actionable comments posted: 5
🧹 Nitpick comments (3)
src/persistence/aof.rs (1)
747-761: `current_time_ms()` called inside inner loop (inefficient).
Line 751 calls `current_time_ms()` for each database in each shard. This syscall overhead is unnecessary; the timestamp for expiry filtering can be computed once before the outer loop, as done correctly in `rewrite_aof_sharded_sync` at line 836.
♻️ Hoist timestamp outside loops
  let db_count = shard_dbs.db_count();
+ let now_ms = current_time_ms();
  let mut merged: Vec<(/* ... */)> = (0..db_count).map(|_| (Vec::new(), 0u32)).collect();
  for shard_locks in shard_dbs.all_shard_dbs() {
      for (db_idx, lock) in shard_locks.iter().enumerate() {
          let guard = lock.read();
          let base_ts = guard.base_timestamp();
-         let now_ms = current_time_ms();
          if merged[db_idx].0.is_empty() {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/aof.rs` around lines 747 - 761, The code calls current_time_ms() inside the inner loop; compute now_ms once before iterating shard_dbs instead and reuse it for all expiry checks; update the block that iterates shard_dbs.all_shard_dbs() so now_ms is defined once above the outer for, keep using guard.base_timestamp() and merged[db_idx].1 = base_ts as before, and call entry.is_expired_at(base_ts, now_ms) with the hoisted now_ms (mirrors the approach in rewrite_aof_sharded_sync).
src/main.rs (1)
272-276: Redundant manifest load.
`AofManifest::load(&base_dir)` is called twice: first at line 251 and again at line 272. Store the result from line 251 and reuse it.
♻️ Suggested refactor
- if AofManifest::load(&base_dir).is_none() {
+ // Manifest was already checked above - initialize only if not present
+ } else {
+     // No manifest existed at line 251 - initialize for writer thread
      if let Err(e) = AofManifest::initialize(&base_dir) {
          tracing::error!("Failed to initialize AOF manifest: {}", e);
      }
  }
Or restructure with an `else` branch on the original `if let Some(manifest)` at line 251.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/main.rs` around lines 272 - 276, You call AofManifest::load(&base_dir) twice; capture its result once (the earlier if let Some(manifest) at line 251) and reuse that Option instead of re-calling load. Replace the second load check with logic that inspects the stored Option (or use an else branch on the original if let Some(manifest)) and call AofManifest::initialize(&base_dir) only when the stored value is None, keeping the existing error logging on Err from AofManifest::initialize.
src/persistence/rdb.rs (1)
523-529: Unused `_shared_buf` parameter: zero-copy optimization incomplete.
The `_shared_buf` parameter is prefixed with underscore indicating it's unused, yet `load_from_bytes` (line 806) allocates a full `Bytes::copy_from_slice(data)` to pass here. The function still uses `read_bytes` (which copies via `Bytes::copy_from_slice`) and `read_bytes_vec` (which allocates a new Vec).
Either:
- Complete the zero-copy implementation by using `shared_buf.slice()` for string values
- Or remove the unused parameter and the allocation at line 806
♻️ Option 1: Remove unused allocation
  fn read_entry_zero_copy(
      cursor: &mut Cursor<&[u8]>,
      type_tag: u8,
-     _shared_buf: &Bytes,
      cached_secs: u32,
  ) -> Result<(Bytes, Entry), MoonError> {
And in `load_from_bytes`:
- let shared_buf = Bytes::copy_from_slice(data);
  // ...
- type_tag => match read_entry_zero_copy(&mut cursor, type_tag, &shared_buf, now_secs) {
+ type_tag => match read_entry_zero_copy(&mut cursor, type_tag, now_secs) {
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/rdb.rs` around lines 523 - 529, The `_shared_buf` parameter on read_entry_zero_copy is unused while load_from_bytes currently allocates a full Bytes to pass in; either implement true zero-copy or remove the dead parameter and allocation. To fix, either (A) complete the zero-copy path inside read_entry_zero_copy (and the helpers it calls) by using the provided shared_buf.slice(...) / shared_buf.slice_ref(...) for string/byte payloads instead of calling read_bytes/read_bytes_vec/Bytes::copy_from_slice, ensuring you handle offsets/lengths from the cursor and return slices into shared_buf; or (B) remove the `_shared_buf` argument from read_entry_zero_copy and stop creating Bytes::copy_from_slice(...) in load_from_bytes, updating load_from_bytes and its call sites to pass no shared buffer and keep the existing copying behavior. Ensure function signatures and all call sites (read_entry_zero_copy, load_from_bytes) are updated consistently.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/main.rs`:
- Around line 243-278: The v2 playback and multi-part manifest replay can both
run and duplicate records; modify the v2 fallback path (the
restore_from_persistence_v2 call) to first check for an existing manifest via
AofManifest::load(&base_dir) and skip the legacy restore if a manifest exists,
or alternatively, if you prefer the current ordering, clear the target databases
before calling moon::persistence::aof_manifest::replay_multi_part; update the
logic around restore_from_persistence_v2 and/or before replay_multi_part to use
AofManifest::load to decide which path runs so records are not replayed twice.
In `@src/persistence/aof_manifest.rs`:
- Around line 302-309: In the Err(_) branch that currently drops bytes searching
for the next b'*' marker (the code using buf.split_to(1),
buf.iter().position(|&b| b == b'*'), etc.), emit a warning that includes how
many bytes were skipped and any parse error context instead of silently
swallowing it; modify that branch to calculate skipped_count (bytes removed
before finding '*' or until break), log via the project's logger (e.g., warn! or
tracing::warn!) with identifying info (skipped_count and that we advanced to the
next '*' marker), then perform the same buf.split_to(...) behaviour so replay
still advances but now records the corruption skip.
- Around line 227-229: The code currently only warns when the referenced base
RDB (base_path) is missing, which allows replaying deltas onto an empty DB;
update the logic at the spot with warn!("AOF base RDB not found: {}",
base_path.display()) so that if the base RDB file does not exist you return an
Err with a clear error (instead of continuing), unless the corresponding
incremental file is present and has zero length; implement this by checking the
incremental file's metadata (e.g., incremental_path or the incremental file
handle/size available in the manifest parsing function) and only proceed when
its size == 0, otherwise return a Manifest/IO error via the function's Result
(use the same error type used elsewhere in this module).
In `@src/persistence/aof.rs`:
- Around line 129-135: The current infinite busy-wait that repeatedly calls
AofManifest::load and sleeps can hang forever; replace it with a bounded wait
that either times out or respects a cancellation signal: record a start Instant,
loop calling AofManifest::load with a short sleep, and if Instant::elapsed
exceeds a configurable timeout (or if a shared shutdown/cancellation
flag/channel is set) return an Err or propagate a failure instead of blocking
indefinitely; update the code around the manifest variable initialization (where
AofManifest::load is polled and the loop sleeps) to use the timeout constant
and/or a Receiver/AtomicBool shutdown check so the AOF writer can fail fast and
allow graceful shutdown.
In `@src/persistence/rdb.rs`:
- Around line 804-806: Unnecessary allocation: remove the
Bytes::copy_from_slice(data) allocation that creates shared_buf and stop
creating/capturing that full-copy; instead pass the original data slice (or
None) into read_entry_zero_copy (or update its call sites) so zero-copy behavior
is preserved — delete the shared_buf binding and adjust any read_entry_zero_copy
invocation to use the original data (&[u8]) or omit the unused _shared_buf
parameter.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 7fae0539-707a-401e-b744-e9d2a3c2d056
📒 Files selected for processing (11)
- src/command/persistence.rs
- src/main.rs
- src/persistence/aof.rs
- src/persistence/aof_manifest.rs
- src/persistence/mod.rs
- src/persistence/rdb.rs
- src/server/conn/handler_monoio.rs
- src/server/conn/handler_sharded.rs
- src/shard/shared_databases.rs
- src/storage/compact_value.rs
- src/storage/db.rs
👮 Files not reviewed due to content moderation or server errors (6)
- src/storage/db.rs
- src/storage/compact_value.rs
- src/persistence/mod.rs
- src/shard/shared_databases.rs
- src/server/conn/handler_monoio.rs
- src/server/conn/handler_sharded.rs
…guard

Addresses four real issues from PR #63 review (qodo + coderabbit). Three were P0/P1 correctness bugs; one is correctness-adjacent.

1. Legacy AOF double-replay on upgrade (qodo #2 / coderabbit critical)
On first upgrade from legacy single-file AOF, restore_from_persistence replays appendonly.aof into the databases, then main.rs used to call initialize() which created an empty manifest (no base). On the NEXT boot the multi-part replay path ran clear() and then had nothing to load → all legacy state lost. Additionally the legacy file remained on disk, so v2 recovery kept replaying it on subsequent boots, double-applying on top of whatever multi-part state existed.
Fix: at first upgrade, if restore_from_persistence loaded any state, serialize it via rdb::save_to_bytes and create the manifest with a real base seq 1 via AofManifest::initialize_with_base(). Rename the legacy appendonly.aof to appendonly.aof.legacy so v2 recovery on the next boot can't find it. Also retire the legacy file after a successful replay_multi_part for the second-boot case.

2. Base RDB not fsynced before manifest publish (qodo #6)
AofManifest::advance used std::fs::write + rename, which renames a file whose contents aren't guaranteed to be on disk. A crash after the manifest write could publish a seq pointing at a base whose contents weren't durable.
Fix: explicit File::create + write_all + sync_data + rename. Same pattern applied to initialize_with_base.

3. Multi-part replay clears databases before loading
Prevents the double-apply of non-idempotent commands from any state that earlier recovery phases (WAL, legacy AOF) may have loaded. The multi-part AOF is the authoritative source.

4. Missing base + non-empty incr is now an error (coderabbit minor)
Previously warned and continued, which would apply deltas on empty state. Now returns an error. Empty-incr case (fresh initialize) is still tolerated.

Already addressed in earlier commits (noted in review):
- coderabbit: infinite busy-wait on missing manifest (ca2ec51)
- coderabbit: silent corruption skip in replay_incr_resp (ca2ec51)
- qodo #3: monoio manifest wait can hang on corrupt (ca2ec51)
- qodo #4: corrupt manifest reinitialized (ca2ec51)

Validation on moon-dev (Linux aarch64):
cargo fmt --check ok
cargo clippy --release -- -D warnings ok
cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok
cargo test --release --lib 1858 pass

Manual smoke tests passed:
1. BGREWRITEAOF with mixed R/W load: seq advanced, manifest correct
2. Crash-recovery kill -9 mid-rewrite: 3000/3000 keys recovered
3. Double-apply regression: 2000 concurrent INCRs during BGREWRITEAOF. Base had snapshot state, incr had remainder, restart counter=2000
4. First-upgrade from legacy appendonly.aof: state captured as base seq 1, legacy file retired, all keys survive next boot without BGREWRITEAOF
5. Corrupt manifest (seq 0): server refuses to start with clear error
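The "File::create + write_all + sync_data + rename" pattern named in fix 2 looks roughly like the following. This is a minimal sketch using only the standard library; `write_durable` and its parameters are hypothetical names, and the real AofManifest::advance may differ in how it names temp files and handles errors.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::Path;

/// Write `bytes` to `tmp`, force them to disk, then rename `tmp` onto `dst`.
/// Hypothetical helper; not the PR's actual function.
fn write_durable(tmp: &Path, dst: &Path, bytes: &[u8]) -> io::Result<()> {
    let mut file = File::create(tmp)?;
    file.write_all(bytes)?;
    // The contents must be durable before the final name becomes visible.
    file.sync_data()?;
    drop(file);
    fs::rename(tmp, dst)
}
```

A stricter variant would also sync the parent directory after the rename so the new directory entry itself is durable; the commit message does not say whether the PR does that.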
Addresses the last two cosmetic PR #63 comments:
- qodo: #[allow(dead_code)] on read_bytes_zero_copy lacked justification
- coderabbit: unnecessary Bytes::copy_from_slice(data) full-buffer copy fed into read_entry_zero_copy's ignored _shared_buf parameter

The "zero-copy" path through read_entry_zero_copy was never actually zero-copy: read_bytes_zero_copy was defined but never called, and read_entry_zero_copy ignored the buffer it was given. This commit:
- Deletes read_bytes_zero_copy (truly dead)
- Removes the unused _shared_buf parameter from read_entry_zero_copy
- Removes the Bytes::copy_from_slice(data) allocation in load_from_bytes that existed solely to feed that parameter
- Updates the two call sites
- Documents the rationale so a future zero-copy revival adds the plumbing as part of a landed change, not as vestigial code

No behavior change; smaller binary, one fewer full-buffer allocation on RDB load_from_bytes.

Validation (moon-dev):
cargo fmt --check ok
cargo clippy --release -- -D warnings ok
cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings ok
…rvival
An RDB file (both standalone .rdb and AOF RDB-preamble) is a full
point-in-time snapshot. Loading it must replace the in-memory state,
not merge into it. Previously both rdb::load() and rdb::load_from_bytes()
called insert_for_load() directly on the live databases, so keys that
existed before the load but were absent from the RDB snapshot silently
survived — producing mixed state.
Fix: both load paths now create temporary Database instances, load
entries into them, then swap the temps into the live slots on success.
This provides:
- Correctness: old keys not in the snapshot are gone after load.
- Atomicity: if the load fails partway, original state is untouched.
- Consistent metadata: recalculate_memory runs on temps before swap,
so used_memory reflects exactly the loaded state.
The swap is safe w.r.t. cold_index/cold_shard_dir because main.rs
initializes those fields after restore_from_persistence completes.
Validation (moon-dev):
cargo fmt --check ok
cargo clippy --release / --runtime-tokio,jemalloc ok
cargo test --release --lib 1858 pass
Manual: 150 keys through BGREWRITEAOF + incr + restart PASS
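The load-into-temporaries-then-swap approach in this commit message can be sketched with a plain `HashMap` standing in for `Database`. Everything here is a simplified assumption for illustration: `Db`, `load_snapshot`, and the pre-parsed `entries` slice are hypothetical, and the real loader parses RDB bytes and also recalculates memory before the swap.

```rust
use std::collections::HashMap;

/// Stand-in for a database; the real type is `Database`.
type Db = HashMap<String, String>;

/// Load `entries` into a temporary map and swap it into `live` only on success.
/// Any `Err` entry aborts the load and leaves `live` untouched.
fn load_snapshot(live: &mut Db, entries: &[Result<(&str, &str), &str>]) -> Result<usize, String> {
    let mut temp = Db::new();
    for entry in entries {
        match entry {
            Ok((key, value)) => {
                temp.insert(key.to_string(), value.to_string());
            }
            // Bail out before the swap so no partial snapshot is committed.
            Err(e) => return Err(e.to_string()),
        }
    }
    let loaded = temp.len();
    // Replace, do not merge: keys absent from the snapshot disappear.
    std::mem::swap(live, &mut temp);
    Ok(loaded)
}
```

With a live map holding a stale key, a failed load keeps that key, while a successful load of two entries leaves exactly those two entries.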
Actionable comments posted: 4
♻️ Duplicate comments (1)
src/persistence/aof.rs (1)
132-148: ⚠️ Potential issue | 🟠 Major
Bound the manifest wait loop.
If startup never creates the manifest, this branch blocks forever before the writer can process shutdown. Please gate it on `cancel` or a timeout and fail fast instead of spinning indefinitely.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/aof.rs` around lines 132 - 148, The loop that waits for AofManifest::load(&base_dir) can spin forever; change the loop inside the writer initialization (the block assigning manifest) to bound waiting by either observing the provided cancellation flag (e.g. a cancel token/Receiver) or a hard timeout/retry limit and then return an error/exit so the writer doesn't block forever. Specifically, wrap the loop that currently calls AofManifest::load and sleeps in a gated wait that checks the cancel signal each iteration and/or tracks elapsed time/retry count, and on timeout or cancel log a clear error and return instead of spinning.
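A bounded wait of the kind requested here can be sketched with the standard library alone. This is an illustration under assumptions, not the PR's writer code: `wait_for` and `WaitError` are hypothetical names, the cancel signal is modeled as an `AtomicBool`, and the loader closure returns `Option<T>` where the real `AofManifest::load` returns `Result<Option<Self>, io::Error>`.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Why the wait gave up. Hypothetical type, for illustration only.
#[derive(Debug, PartialEq, Eq)]
enum WaitError {
    Cancelled,
    TimedOut,
}

/// Poll `try_load` until it yields a value, `cancel` is set, or `timeout` elapses.
fn wait_for<T>(
    mut try_load: impl FnMut() -> Option<T>,
    cancel: &AtomicBool,
    timeout: Duration,
    poll: Duration,
) -> Result<T, WaitError> {
    let start = Instant::now();
    loop {
        if let Some(value) = try_load() {
            return Ok(value);
        }
        // Check shutdown first so a cancelled writer never sleeps again.
        if cancel.load(Ordering::Relaxed) {
            return Err(WaitError::Cancelled);
        }
        if start.elapsed() >= timeout {
            return Err(WaitError::TimedOut);
        }
        thread::sleep(poll);
    }
}
```

The caller decides what a timeout means; for the AOF writer, logging an error and returning would let shutdown proceed.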
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@src/persistence/aof_manifest.rs`:
- Around line 132-161: The load() function currently trusts any parsed seq and
immediately calls cleanup_orphans(); instead, validate base/incr records parsed
from content before running cleanup_orphans(): while iterating content.lines()
(where you parse "seq"), also collect and validate records like "base <n>
<name>" and "incr <n> <name>" (or whatever manifest record format is used) by
checking they parse to a u64 sequence and that the referenced file exists under
dir (use dir.join(filename) and metadata or file existence checks); if no valid
base/incr records are found or the records do not match expected sequences, do
not call cleanup_orphans() and return an error (or skip cleanup) so
corrupted/truncated manifests (e.g., only "seq 2") cannot trigger deletion;
update the code around load(), seq variable, manifest_path, and the
manifest.cleanup_orphans() call to perform this validation first.
In `@src/persistence/rdb.rs`:
- Around line 324-351: In the match handling the result of read_entry_zero_copy,
do not just break on Err(e); instead propagate/return an error from the RDB
loader so we never swap the partially-filled temp_dbs into the live databases.
Concretely, replace the Err(e) arm that calls tracing::warn! and break with a
return Err(...) (or use the ? operator after mapping the error) so the outer
function exits with an error; keep read_entry_zero_copy, temp_dbs, databases,
and the final swap/recalculate_memory lines as the anchors to locate the change.
Apply the same change to the other identical error branch mentioned (around the
857-878 region) so corrupted-but-checksummed RDBs do not commit partial
snapshots.
- Around line 752-776: The current loop recomputes the CRC over data[..=i] for
every EOF_MARKER candidate which is O(n^2); instead, instantiate the CRC Hasher
once before the scan, iterate bytes from 0 upward updating hasher per byte, and
only when i >= 5 and data[i] == EOF_MARKER read the 4 stored CRC bytes and
compare stored to hasher.clone().finalize() (or use the hasher type's
snapshot/clone API) to avoid re-hashing prefixes; on match set rdb_end = Some(i
+ 5) and break. Ensure you reference EOF_MARKER, rdb_end, Hasher and data so the
new single-pass check replaces the inner re-hash of payload.
- Around line 356-881: The file is too large; split loader and helper logic into
submodules: create src/persistence/rdb/load.rs and src/persistence/rdb/save.rs
(or equivalent) and move the functions and helpers responsible for
reading/parsing RDB data (count_entries_per_db, skip_entry, read_u32_raw,
skip_bytes_field, read_entry_zero_copy, and load_from_bytes plus any directly
related stream/entry helpers) into load.rs while keeping public types and shared
utilities in rdb.rs or a new rdb/mod.rs; update module declarations (mod load;
mod save;) and adjust imports/paths where these functions are referenced so
compilation passes (e.g., crate::persistence::rdb::load::read_entry_zero_copy or
re-export with pub use), ensure visibility (pub(crate) / pub) is set
appropriately and run cargo build/tests to verify no missing symbols.
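The single-pass CRC scan suggested for lines 752-776 of src/persistence/rdb.rs can be sketched as follows. Several details are assumptions made for illustration: the `EOF_MARKER` value, little-endian storage of the 4-byte checksum directly after the marker, and a hand-rolled CRC-32 (IEEE) in place of the `Hasher` the real code uses. The point is only that the running state is updated once per byte and compared at each candidate marker, instead of re-hashing the whole prefix.

```rust
/// Assumed marker value; the real constant lives in rdb.rs.
const EOF_MARKER: u8 = 0xFF;

/// Feed one byte into a running CRC-32 (IEEE, reflected) state.
fn crc32_update(mut state: u32, byte: u8) -> u32 {
    state ^= byte as u32;
    for _ in 0..8 {
        let mask = (state & 1).wrapping_neg();
        state = (state >> 1) ^ (0xEDB8_8320 & mask);
    }
    state
}

/// One-shot CRC-32 over a slice, used here only to build test data.
fn crc32(data: &[u8]) -> u32 {
    !data.iter().fold(!0u32, |state, &b| crc32_update(state, b))
}

/// Return the offset just past `EOF_MARKER + crc`, scanning `data` once.
fn find_rdb_end(data: &[u8]) -> Option<usize> {
    let mut state = !0u32;
    for (i, &byte) in data.iter().enumerate() {
        // After this update the state covers data[..=i], marker included.
        state = crc32_update(state, byte);
        if byte == EOF_MARKER && i + 5 <= data.len() {
            let stored =
                u32::from_le_bytes([data[i + 1], data[i + 2], data[i + 3], data[i + 4]]);
            if stored == !state {
                return Some(i + 5);
            }
        }
    }
    None
}
```

A marker byte that appears inside a payload is skipped because its trailing four bytes do not match the running checksum, and the scan continues without restarting the hash.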
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: d00f2c5b-1bc4-4e77-9729-6d58d5cc4ccb
📒 Files selected for processing (5)
- src/main.rs
- src/persistence/aof.rs
- src/persistence/aof_manifest.rs
- src/persistence/rdb.rs
- src/storage/db.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- src/storage/db.rs
- src/main.rs
/// Fast first-pass: count entries per database without parsing values.
/// Scans type tags and skips over entry payloads to count keys per db_idx.
fn count_entries_per_db(cursor: &Cursor<&[u8]>, db_count: usize) -> Vec<usize> {
    let mut counts = vec![0usize; db_count];
    let data = cursor.get_ref();
    let mut pos = cursor.position() as usize;
    let mut current_db = 0usize;

    while pos < data.len() {
        let tag = data[pos];
        pos += 1;

        match tag {
            EOF_MARKER => break,
            DB_SELECTOR => {
                if pos < data.len() {
                    current_db = data[pos] as usize;
                    pos += 1;
                } else {
                    break;
                }
            }
            TYPE_STRING | TYPE_HASH | TYPE_LIST | TYPE_SET | TYPE_SORTED_SET | TYPE_STREAM => {
                if current_db < db_count {
                    counts[current_db] += 1;
                }
                // Skip over the entry payload without parsing
                if let Some(new_pos) = skip_entry(data, pos, tag) {
                    pos = new_pos;
                } else {
                    break;
                }
            }
            _ => break,
        }
    }

    counts
}

/// Skip over an RDB entry's bytes without allocating or parsing values.
/// Returns the new position after the entry, or None if data is truncated.
fn skip_entry(data: &[u8], mut pos: usize, type_tag: u8) -> Option<usize> {
    // Skip key
    pos = skip_bytes_field(data, pos)?;
    // Skip TTL (8 bytes)
    pos = pos.checked_add(8)?;
    if pos > data.len() {
        return None;
    }

    match type_tag {
        TYPE_STRING => {
            pos = skip_bytes_field(data, pos)?;
        }
        TYPE_HASH => {
            let count = read_u32_raw(data, pos)?;
            pos += 4;
            for _ in 0..count {
                pos = skip_bytes_field(data, pos)?; // field
                pos = skip_bytes_field(data, pos)?; // value
            }
        }
        TYPE_LIST | TYPE_SET => {
            let count = read_u32_raw(data, pos)?;
            pos += 4;
            for _ in 0..count {
                pos = skip_bytes_field(data, pos)?;
            }
        }
        TYPE_SORTED_SET => {
            let count = read_u32_raw(data, pos)?;
            pos += 4;
            for _ in 0..count {
                pos = skip_bytes_field(data, pos)?; // member
                pos = pos.checked_add(8)?; // f64 score
                if pos > data.len() {
                    return None;
                }
            }
        }
        TYPE_STREAM => {
            // entry_count(8) + last_id(16)
            pos = pos.checked_add(24)?;
            if pos > data.len() {
                return None;
            }
            let entry_count =
                u64::from_le_bytes(data[pos - 24..pos - 16].try_into().ok()?) as usize;
            for _ in 0..entry_count {
                pos = pos.checked_add(16)?; // StreamId (ms + seq)
                if pos > data.len() {
                    return None;
                }
                let field_count = read_u32_raw(data, pos)?;
                pos += 4;
                for _ in 0..field_count {
                    pos = skip_bytes_field(data, pos)?;
                    pos = skip_bytes_field(data, pos)?;
                }
            }
            // Consumer groups
            let group_count = read_u32_raw(data, pos)?;
            pos += 4;
            for _ in 0..group_count {
                pos = skip_bytes_field(data, pos)?; // group name
                pos = pos.checked_add(16)?; // last_delivered_id
                if pos > data.len() {
                    return None;
                }
                let pel_count = read_u32_raw(data, pos)?;
                pos += 4;
                for _ in 0..pel_count {
                    pos = pos.checked_add(16)?; // StreamId
                    if pos > data.len() {
                        return None;
                    }
                    pos = skip_bytes_field(data, pos)?; // consumer name
                    pos = pos.checked_add(16)?; // delivery_time + delivery_count
                    if pos > data.len() {
                        return None;
                    }
                }
                let consumer_count = read_u32_raw(data, pos)?;
                pos += 4;
                for _ in 0..consumer_count {
                    pos = skip_bytes_field(data, pos)?; // consumer name
                    pos = pos.checked_add(8)?; // seen_time
                    if pos > data.len() {
                        return None;
                    }
                    let pending_count = read_u32_raw(data, pos)?;
                    pos += 4;
                    for _ in 0..pending_count {
                        pos = pos.checked_add(16)?; // StreamId
                        if pos > data.len() {
                            return None;
                        }
                    }
| } | ||
| } | ||
| } | ||
| _ => return None, | ||
| } | ||
|
|
||
| Some(pos) | ||
| } | ||
|
|
||
| /// Read u32 LE from raw bytes without cursor overhead. | ||
| #[inline] | ||
| fn read_u32_raw(data: &[u8], pos: usize) -> Option<usize> { | ||
| if pos + 4 > data.len() { | ||
| return None; | ||
| } | ||
| Some(u32::from_le_bytes(data[pos..pos + 4].try_into().ok()?) as usize) | ||
| } | ||
|
|
||
| /// Skip a length-prefixed bytes field (4-byte LE length + payload). | ||
| #[inline] | ||
| fn skip_bytes_field(data: &[u8], pos: usize) -> Option<usize> { | ||
| let len = read_u32_raw(data, pos)?; | ||
| let new_pos = pos.checked_add(4)?.checked_add(len)?; | ||
| if new_pos > data.len() { | ||
| None | ||
| } else { | ||
| Some(new_pos) | ||
| } | ||
| } | ||
|
|
||
| /// Variant of read_entry using cached timestamps to avoid per-entry syscalls. | ||
| /// | ||
| /// Earlier revisions threaded a `shared_buf: &Bytes` through this path for | ||
| /// zero-copy slicing via `read_bytes_zero_copy`, but that helper was never | ||
| /// wired up — `read_bytes` currently always heap-allocates. The parameter | ||
| /// and the caller-side `Bytes::copy_from_slice(data)` that fed it have been | ||
| /// removed; restoring true zero-copy should add it back as part of a single | ||
| /// landed change, not as vestigial plumbing. | ||
| fn read_entry_zero_copy( | ||
| cursor: &mut Cursor<&[u8]>, | ||
| type_tag: u8, | ||
| cached_secs: u32, | ||
| ) -> Result<(Bytes, Entry), MoonError> { | ||
| let key = read_bytes(cursor)?; | ||
|
|
||
| let mut ttl_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut ttl_buf)?; | ||
| let ttl_ms = i64::from_le_bytes(ttl_buf); | ||
| let expires_at_ms = if ttl_ms > 0 { ttl_ms as u64 } else { 0 }; | ||
|
|
||
| let value = match type_tag { | ||
| TYPE_STRING => { | ||
| // Fast path: build CompactValue directly from Vec, skipping RedisValue intermediate. | ||
| // This avoids: Vec → Bytes → RedisValue::String → from_redis_value → heap_string_vec | ||
| // and instead does: Vec → CompactValue directly (one Box alloc, zero copy). | ||
| let vec = read_bytes_vec(cursor)?; | ||
| let cv = if vec.len() <= 12 { | ||
| crate::storage::compact_value::CompactValue::from_redis_value(RedisValue::String( | ||
| Bytes::from(vec), | ||
| )) | ||
| } else { | ||
| crate::storage::compact_value::CompactValue::heap_string_vec_direct(vec) | ||
| }; | ||
| let mut entry = Entry::new_string(Bytes::new()); | ||
| entry.value = cv; | ||
| if expires_at_ms > 0 { | ||
| entry.set_expires_at_ms(cached_secs, expires_at_ms); | ||
| } | ||
| entry.set_last_access(cached_secs); | ||
| entry.set_access_counter(5); | ||
| return Ok((key, entry)); | ||
| } | ||
| TYPE_HASH => { | ||
| let count = read_u32(cursor)? as usize; | ||
| validate_count(cursor, count, 8, "hash")?; | ||
| let mut map = HashMap::with_capacity(count); | ||
| for _ in 0..count { | ||
| let field = read_bytes(cursor)?; | ||
| let val = read_bytes(cursor)?; | ||
| map.insert(field, val); | ||
| } | ||
| RedisValue::Hash(map) | ||
| } | ||
| TYPE_LIST => { | ||
| let count = read_u32(cursor)? as usize; | ||
| validate_count(cursor, count, 4, "list")?; | ||
| let mut list = VecDeque::with_capacity(count); | ||
| for _ in 0..count { | ||
| list.push_back(read_bytes(cursor)?); | ||
| } | ||
| RedisValue::List(list) | ||
| } | ||
| TYPE_SET => { | ||
| let count = read_u32(cursor)? as usize; | ||
| validate_count(cursor, count, 4, "set")?; | ||
| let mut set = HashSet::with_capacity(count); | ||
| for _ in 0..count { | ||
| set.insert(read_bytes(cursor)?); | ||
| } | ||
| RedisValue::Set(set) | ||
| } | ||
| TYPE_SORTED_SET => { | ||
| let count = read_u32(cursor)? as usize; | ||
| validate_count(cursor, count, 12, "sorted_set")?; | ||
| let mut members = HashMap::with_capacity(count); | ||
| let mut tree = BPTree::new(); | ||
| for _ in 0..count { | ||
| let member = read_bytes(cursor)?; | ||
| let mut score_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut score_buf)?; | ||
| let score = f64::from_le_bytes(score_buf); | ||
| members.insert(member.clone(), score); | ||
| tree.insert(OrderedFloat(score), member); | ||
| } | ||
| RedisValue::SortedSetBPTree { tree, members } | ||
| } | ||
| TYPE_STREAM => { | ||
| // Stream parsing: reuse read_bytes (not zero-copy for this rare type) | ||
| let mut entry_count_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut entry_count_buf)?; | ||
| let entry_count = u64::from_le_bytes(entry_count_buf) as usize; | ||
| let mut last_id_ms_buf = [0u8; 8]; | ||
| let mut last_id_seq_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut last_id_ms_buf)?; | ||
| cursor.read_exact(&mut last_id_seq_buf)?; | ||
| let last_id = StreamId { | ||
| ms: u64::from_le_bytes(last_id_ms_buf), | ||
| seq: u64::from_le_bytes(last_id_seq_buf), | ||
| }; | ||
| let mut stream = StreamData::new(); | ||
| stream.last_id = last_id; | ||
| validate_count(cursor, entry_count, 20, "stream_entries")?; | ||
| for _ in 0..entry_count { | ||
| let mut ms_buf = [0u8; 8]; | ||
| let mut seq_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut ms_buf)?; | ||
| cursor.read_exact(&mut seq_buf)?; | ||
| let id = StreamId { | ||
| ms: u64::from_le_bytes(ms_buf), | ||
| seq: u64::from_le_bytes(seq_buf), | ||
| }; | ||
| let field_count = read_u32(cursor)? as usize; | ||
| validate_count(cursor, field_count, 8, "stream_fields")?; | ||
| let mut fields = Vec::with_capacity(field_count); | ||
| for _ in 0..field_count { | ||
| fields.push((read_bytes(cursor)?, read_bytes(cursor)?)); | ||
| } | ||
| stream.entries.insert(id, fields); | ||
| stream.length += 1; | ||
| } | ||
| let group_count = read_u32(cursor)? as usize; | ||
| for _ in 0..group_count { | ||
| let group_name = read_bytes(cursor)?; | ||
| let mut gld_ms = [0u8; 8]; | ||
| let mut gld_seq = [0u8; 8]; | ||
| cursor.read_exact(&mut gld_ms)?; | ||
| cursor.read_exact(&mut gld_seq)?; | ||
| let last_delivered_id = StreamId { | ||
| ms: u64::from_le_bytes(gld_ms), | ||
| seq: u64::from_le_bytes(gld_seq), | ||
| }; | ||
| let pel_count = read_u32(cursor)? as usize; | ||
| let mut pel = BTreeMap::new(); | ||
| for _ in 0..pel_count { | ||
| let mut pid_ms = [0u8; 8]; | ||
| let mut pid_seq = [0u8; 8]; | ||
| cursor.read_exact(&mut pid_ms)?; | ||
| cursor.read_exact(&mut pid_seq)?; | ||
| let pid = StreamId { | ||
| ms: u64::from_le_bytes(pid_ms), | ||
| seq: u64::from_le_bytes(pid_seq), | ||
| }; | ||
| let consumer_name = read_bytes(cursor)?; | ||
| let mut dt_buf = [0u8; 8]; | ||
| let mut dc_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut dt_buf)?; | ||
| cursor.read_exact(&mut dc_buf)?; | ||
| pel.insert( | ||
| pid, | ||
| crate::storage::stream::PendingEntry { | ||
| consumer: consumer_name, | ||
| delivery_time: u64::from_le_bytes(dt_buf), | ||
| delivery_count: u64::from_le_bytes(dc_buf), | ||
| }, | ||
| ); | ||
| } | ||
| let consumer_count = read_u32(cursor)? as usize; | ||
| let mut consumers = HashMap::new(); | ||
| for _ in 0..consumer_count { | ||
| let cname = read_bytes(cursor)?; | ||
| let mut st_buf = [0u8; 8]; | ||
| cursor.read_exact(&mut st_buf)?; | ||
| let seen_time = u64::from_le_bytes(st_buf); | ||
| let pending_count = read_u32(cursor)? as usize; | ||
| let mut pending = BTreeMap::new(); | ||
| for _ in 0..pending_count { | ||
| let mut cid_ms = [0u8; 8]; | ||
| let mut cid_seq = [0u8; 8]; | ||
| cursor.read_exact(&mut cid_ms)?; | ||
| cursor.read_exact(&mut cid_seq)?; | ||
| pending.insert( | ||
| StreamId { | ||
| ms: u64::from_le_bytes(cid_ms), | ||
| seq: u64::from_le_bytes(cid_seq), | ||
| }, | ||
| (), | ||
| ); | ||
| } | ||
| consumers.insert( | ||
| cname.clone(), | ||
| crate::storage::stream::Consumer { | ||
| name: cname, | ||
| pending, | ||
| seen_time, | ||
| }, | ||
| ); | ||
| } | ||
| stream.groups.insert( | ||
| group_name, | ||
| crate::storage::stream::ConsumerGroup { | ||
| last_delivered_id, | ||
| pel, | ||
| consumers, | ||
| }, | ||
| ); | ||
| } | ||
| RedisValue::Stream(Box::new(stream)) | ||
| } | ||
| _ => return Err(RdbError::UnsupportedType { type_tag }.into()), | ||
| }; | ||
|
|
||
| let mut entry = Entry::new_string(Bytes::new()); | ||
| entry.value = crate::storage::compact_value::CompactValue::from_redis_value(value); | ||
| if expires_at_ms > 0 { | ||
| entry.set_expires_at_ms(cached_secs, expires_at_ms); | ||
| } | ||
| entry.set_last_access(cached_secs); | ||
| entry.set_access_counter(5); | ||
|
|
||
| Ok((key, entry)) | ||
| } | ||
|
|
||
| /// Load an RDB snapshot from a byte slice (for AOF RDB-preamble format). | ||
| /// | ||
| /// Returns `(keys_loaded, bytes_consumed)`. The caller can use `bytes_consumed` | ||
| /// to find the start of any RESP commands appended after the RDB preamble. | ||
| pub fn load_from_bytes( | ||
| databases: &mut [Database], | ||
| data: &[u8], | ||
| ) -> Result<(usize, usize), MoonError> { | ||
| if data.len() < RDB_MAGIC.len() + 1 + 1 + 4 { | ||
| return Err(RdbError::Corrupted { | ||
| detail: "RDB preamble too small".into(), | ||
| } | ||
| .into()); | ||
| } | ||
|
|
||
| // Find EOF_MARKER to determine RDB section length. | ||
| // The RDB section is: header + entries + EOF_MARKER(1) + CRC32(4). | ||
| // We scan for EOF_MARKER (0xFF) — the first one after the header that's | ||
| // immediately followed by a valid CRC32 of the preceding bytes. | ||
| let mut rdb_end = None; | ||
| // Start scanning after header (MOON + version = 5 bytes) | ||
| for i in 5..data.len().saturating_sub(4) { | ||
| if data[i] == EOF_MARKER { | ||
| let payload = &data[..=i]; // everything up to and including EOF_MARKER | ||
| if let Some(checksum_bytes) = data.get(i + 1..i + 5) { | ||
| let stored = u32::from_le_bytes([ | ||
| checksum_bytes[0], | ||
| checksum_bytes[1], | ||
| checksum_bytes[2], | ||
| checksum_bytes[3], | ||
| ]); | ||
| let mut hasher = Hasher::new(); | ||
| hasher.update(payload); | ||
| if hasher.finalize() == stored { | ||
| rdb_end = Some(i + 5); // past CRC32 | ||
| break; | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| let rdb_len = rdb_end.ok_or_else(|| { | ||
| MoonError::from(RdbError::Corrupted { | ||
| detail: "RDB preamble: no valid EOF+CRC found".into(), | ||
| }) | ||
| })?; | ||
|
|
||
| // Load using the same logic as `load`, but from the byte slice | ||
| let payload = &data[..rdb_len - 4]; // exclude CRC32 | ||
| let mut cursor = Cursor::new(payload); | ||
|
|
||
| // Skip magic + version | ||
| let mut magic = [0u8; 4]; | ||
| cursor.read_exact(&mut magic).map_err(|e| RdbError::Io { | ||
| path: std::path::PathBuf::from("<aof-preamble>"), | ||
| source: e, | ||
| })?; | ||
| if &magic != RDB_MAGIC { | ||
| return Err(RdbError::Corrupted { | ||
| detail: "invalid RDB magic in AOF preamble".into(), | ||
| } | ||
| .into()); | ||
| } | ||
| let mut version = [0u8; 1]; | ||
| cursor.read_exact(&mut version).map_err(|e| RdbError::Io { | ||
| path: std::path::PathBuf::from("<aof-preamble>"), | ||
| source: e, | ||
| })?; | ||
| if version[0] != RDB_VERSION { | ||
| return Err(RdbError::UnsupportedVersion { | ||
| version: version[0] as u32, | ||
| } | ||
| .into()); | ||
| } | ||
|
|
||
| let now_ms = current_time_ms(); | ||
| let now_secs = (now_ms / 1000) as u32; | ||
| let mut total_keys = 0usize; | ||
| let mut current_db: usize = 0; | ||
|
|
||
| // Load into temporary databases so that: | ||
| // (a) If the load fails partway, original state is untouched. | ||
| // (b) Old keys not present in the RDB snapshot don't survive — an RDB | ||
| // preamble is a full point-in-time snapshot and must replace state, | ||
| // not merge into it. | ||
| let db_count = databases.len(); | ||
| let mut temp_dbs: Vec<Database> = (0..db_count).map(|_| Database::new()).collect(); | ||
|
|
||
| // Pre-size DashTables on the temporary databases | ||
| let entry_counts = count_entries_per_db(&cursor, db_count); | ||
| for (db_idx, &count) in entry_counts.iter().enumerate() { | ||
| if count > 0 && db_idx < db_count { | ||
| temp_dbs[db_idx].reserve(count); | ||
| } | ||
| } | ||
|
|
||
| loop { | ||
| let mut tag = [0u8; 1]; | ||
| if cursor.read_exact(&mut tag).is_err() { | ||
| break; | ||
| } | ||
| match tag[0] { | ||
| EOF_MARKER => break, | ||
| DB_SELECTOR => { | ||
| let mut db_idx = [0u8; 1]; | ||
| cursor.read_exact(&mut db_idx).map_err(|e| RdbError::Io { | ||
| path: std::path::PathBuf::from("<aof-preamble>"), | ||
| source: e, | ||
| })?; | ||
| current_db = db_idx[0] as usize; | ||
| if current_db >= db_count { | ||
| return Err(RdbError::Corrupted { | ||
| detail: format!( | ||
| "RDB preamble references database {} but only {} configured", | ||
| current_db, db_count | ||
| ), | ||
| } | ||
| .into()); | ||
| } | ||
| } | ||
| type_tag => match read_entry_zero_copy(&mut cursor, type_tag, now_secs) { | ||
| Ok((key, entry)) => { | ||
| if entry.has_expiry() && entry.is_expired_at(now_secs, now_ms) { | ||
| continue; | ||
| } | ||
| if current_db < db_count { | ||
| temp_dbs[current_db].insert_for_load(key, entry); | ||
| total_keys += 1; | ||
| } | ||
| } | ||
| Err(_) => break, | ||
| }, | ||
| } | ||
| } | ||
|
|
||
| // Recalculate memory on temp databases, then swap into the live ones. | ||
| // This replaces the entire in-memory state for each database — old keys | ||
| // that were not in the RDB snapshot are discarded. | ||
| for (live, mut temp) in databases.iter_mut().zip(temp_dbs.into_iter()) { | ||
| temp.recalculate_memory(); | ||
| *live = temp; | ||
| } | ||
|
|
||
| Ok((total_keys, rdb_len)) | ||
| } |
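The skip helpers in this diff (skip_bytes_field, read_u32_raw) walk length-prefixed fields without allocating. A minimal standalone sketch of that technique follows, assuming the same framing (4-byte little-endian length, then payload). The names read_len, skip_field and count_fields are hypothetical and are not part of the PR.

```rust
/// Read a little-endian u32 length at `pos`, or None if fewer than 4 bytes remain.
fn read_len(data: &[u8], pos: usize) -> Option<usize> {
    let end = pos.checked_add(4)?;
    let raw: [u8; 4] = data.get(pos..end)?.try_into().ok()?;
    Some(u32::from_le_bytes(raw) as usize)
}

/// Skip one length-prefixed field and return the offset just past it.
/// Returns None on truncation or on arithmetic overflow of the offset.
fn skip_field(data: &[u8], pos: usize) -> Option<usize> {
    let len = read_len(data, pos)?;
    let end = pos.checked_add(4)?.checked_add(len)?;
    (end <= data.len()).then_some(end)
}

/// Count how many complete length-prefixed fields sit back to back in `data`.
fn count_fields(data: &[u8]) -> usize {
    let mut pos = 0;
    let mut n = 0;
    while pos < data.len() {
        match skip_field(data, pos) {
            Some(next) => {
                pos = next;
                n += 1;
            }
            // Stop at the first truncated field instead of panicking.
            None => break,
        }
    }
    n
}

fn main() {
    let mut buf = Vec::new();
    for field in [b"key".as_slice(), b"value".as_slice()] {
        buf.extend_from_slice(&(field.len() as u32).to_le_bytes());
        buf.extend_from_slice(field);
    }
    println!("{} complete fields", count_fields(&buf));
}
```

Using slice `get` plus `checked_add` keeps every bounds decision in one place, which also avoids the unchecked `pos + 4` in read_u32_raw.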
🛠️ Refactor suggestion | 🟠 Major
Split this module before adding more loader logic here.
src/persistence/rdb.rs is now well past the repo's 1500-line limit. The new save/load entrypoints and the scan/skip helpers are good candidates for rdb/save.rs and rdb/load.rs.
As per coding guidelines, "No single .rs file should exceed 1500 lines. Split into submodules if approaching this limit."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/persistence/rdb.rs` around lines 356 - 881, The file is too large; split
loader and helper logic into submodules: create src/persistence/rdb/load.rs and
src/persistence/rdb/save.rs (or equivalent) and move the functions and helpers
responsible for reading/parsing RDB data (count_entries_per_db, skip_entry,
read_u32_raw, skip_bytes_field, read_entry_zero_copy, and load_from_bytes plus
any directly related stream/entry helpers) into load.rs while keeping public
types and shared utilities in rdb.rs or a new rdb/mod.rs; update module
declarations (mod load; mod save;) and adjust imports/paths where these
functions are referenced so compilation passes (e.g.,
crate::persistence::rdb::load::read_entry_zero_copy or re-export with pub use),
ensure visibility (pub(crate) / pub) is set appropriately and run cargo
build/tests to verify no missing symbols.
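One possible shape for the suggested split, sketched as inline modules so that it compiles as a single file. In the repository each `mod` body would live in its own file under src/persistence/rdb/. The function bodies are placeholders and the simplified signatures are assumptions, not the PR's real API.

```rust
// Stand-in for src/persistence/rdb/mod.rs.
mod rdb {
    // Would become rdb/load.rs: parsing, scan and skip helpers.
    mod load {
        /// Placeholder loader: reports (keys_loaded, bytes_consumed).
        pub fn load_from_bytes(data: &[u8]) -> Result<(usize, usize), String> {
            if data.len() < 4 {
                return Err("RDB preamble too small".to_string());
            }
            Ok((0, data.len()))
        }
    }

    // Would become rdb/save.rs: serialization entry points.
    mod save {
        /// Placeholder saver: emits only the magic prefix.
        pub fn save_to_bytes() -> Vec<u8> {
            b"MOON".to_vec()
        }
    }

    // Re-export so existing callers keep using `rdb::load_from_bytes`.
    pub use load::load_from_bytes;
    pub use save::save_to_bytes;
}

fn main() {
    let bytes = rdb::save_to_bytes();
    println!("{:?}", rdb::load_from_bytes(&bytes));
}
```

Re-exporting from the parent module keeps the public paths stable, so the split can land without touching call sites.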
…riter timeout

Four fixes from code review:

1. aof_manifest::load: validate base/incr records before orphan cleanup
   A truncated manifest containing only "seq 2" (no base/incr lines) would pass the seq > 0 check, then cleanup_orphans would delete files matching the PREVIOUS valid seq, destroying the actual recovery data. Now load() requires all three records (seq, base, incr) to be present. Truncated manifests return Err, which callers already treat as fatal.

2. rdb::load + rdb::load_from_bytes: return Err on corrupted entries
   Both load paths now load into temp_dbs, but on Err from read_entry_zero_copy they used to break and fall through to the swap, committing partially-loaded temp databases into live state. Now both paths return Err immediately, leaving live databases untouched. The error includes byte offset and key count for diagnosis.

3. rdb::load_from_bytes: single-pass CRC scan (O(n) vs O(n²))
   The EOF_MARKER search was re-hashing data[0..=i] from scratch for each candidate byte. Now maintains a running crc32fast::Hasher updated byte-by-byte, cloning at each candidate position. On a 10MB RDB preamble with k candidates, this reduces from O(n*k) hash bytes to O(n) total.

4. aof::aof_writer_task: bound manifest wait with cancel + timeout
   The monoio writer's manifest wait loop now checks the CancellationToken each iteration and enforces a 60s hard timeout. Previously if main.rs failed to create the manifest (disk full, permission error), the writer would spin forever, blocking graceful shutdown. Now exits cleanly with a diagnostic log.

Skipped: rdb.rs submodule split (1726 lines, above 1500 limit but high-risk churn for a correctness PR; tracked for follow-up).

Validation (moon-dev):
cargo fmt --check ok
cargo clippy --release / --runtime-tokio,jemalloc ok
cargo test --release --lib 1858 pass
Smoke: 700 keys + INCR counter through BGREWRITEAOF + restart PASS
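The single-pass CRC scan from fix 3 can be sketched as below. To stay std-only, the sketch uses a small bitwise CRC-32 (the hypothetical Crc32 type) as a stand-in for crc32fast::Hasher. The header length and EOF_MARKER value follow the comments in load_from_bytes; find_rdb_end is a hypothetical name.

```rust
/// Minimal bitwise CRC-32 (IEEE), a std-only stand-in for a real hasher.
#[derive(Clone)]
struct Crc32 {
    state: u32,
}

impl Crc32 {
    fn new() -> Self {
        Crc32 { state: 0xFFFF_FFFF }
    }

    fn update(&mut self, bytes: &[u8]) {
        for &b in bytes {
            self.state ^= u32::from(b);
            for _ in 0..8 {
                // All-ones mask when the low bit is set, zero otherwise.
                let mask = (self.state & 1).wrapping_neg();
                self.state = (self.state >> 1) ^ (0xEDB8_8320 & mask);
            }
        }
    }

    fn finalize(self) -> u32 {
        !self.state
    }
}

const EOF_MARKER: u8 = 0xFF;
const HEADER_LEN: usize = 5; // "MOON" + one version byte

/// Return the offset just past EOF_MARKER + CRC32, hashing each byte once.
fn find_rdb_end(data: &[u8]) -> Option<usize> {
    if data.len() < HEADER_LEN {
        return None;
    }
    let mut running = Crc32::new();
    running.update(&data[..HEADER_LEN]);
    for i in HEADER_LEN..data.len().saturating_sub(4) {
        running.update(&data[i..=i]);
        if data[i] != EOF_MARKER {
            continue;
        }
        let stored = u32::from_le_bytes(data[i + 1..i + 5].try_into().ok()?);
        // Clone so the running state survives a rejected candidate.
        if running.clone().finalize() == stored {
            return Some(i + 5);
        }
    }
    None
}

fn main() {
    let mut snapshot = b"MOON\x01".to_vec();
    // The 0xFF in this payload is data, not the EOF marker.
    snapshot.extend_from_slice(&[0x00, 0xFF, 0x01, 0x02, 0x03, 0x04, 0x05]);
    snapshot.push(EOF_MARKER);
    let mut h = Crc32::new();
    h.update(&snapshot);
    snapshot.extend_from_slice(&h.finalize().to_le_bytes());
    let rdb_len = snapshot.len();
    snapshot.extend_from_slice(b"*1\r\n$4\r\nPING\r\n");
    println!("found {:?}, built {}", find_rdb_end(&snapshot), rdb_len);
}
```

Each candidate costs one clone of a few bytes of hasher state rather than a re-hash of the whole prefix, which is where the O(n) total comes from.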
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/persistence/rdb.rs (2)
75-76: ⚠️ Potential issue | 🟠 Major
Reject database indices above u8::MAX instead of wrapping them.
db_idx as u8 silently truncates, so database 256 is serialized as database 0 and will round-trip into the wrong slot. Please convert with u8::try_from(...) and return an error here; ideally share that check with save_from_snapshot() too.
💡 Suggested fix
    - buf.write_all(&[DB_SELECTOR])?;
    - buf.write_all(&[db_idx as u8])?;
    + let db_idx_u8 = u8::try_from(db_idx).map_err(|_| RdbError::Corrupted {
    +     detail: format!("RDB format supports at most 256 databases, got index {}", db_idx),
    + })?;
    + buf.write_all(&[DB_SELECTOR, db_idx_u8])?;
Also applies to: 190-191
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/rdb.rs` around lines 75 - 76, Replace the silent truncation of database indices when serializing (the buf.write_all(&[db_idx as u8]) calls) with a checked conversion using u8::try_from(...) and return a clear error on conversion failure; factor this validation into a small helper so both the serialization site (where DB_SELECTOR is written) and save_from_snapshot() reuse the same check and error handling instead of allowing indices > u8::MAX to wrap.
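A minimal sketch of the shared helper this prompt asks for, assuming a simplified error type. DbIndexOutOfRange and write_db_selector are hypothetical names, and the DB_SELECTOR value is a placeholder because the real constant is not shown in this diff.

```rust
use std::fmt;

/// Hypothetical error for indices that do not fit the one-byte selector.
#[derive(Debug, PartialEq)]
struct DbIndexOutOfRange(usize);

impl fmt::Display for DbIndexOutOfRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(
            f,
            "RDB format supports at most 256 databases, got index {}",
            self.0
        )
    }
}

// Placeholder value; the real constant lives in rdb.rs.
const DB_SELECTOR: u8 = 0xFE;

/// Append a database selector, rejecting indices above u8::MAX
/// instead of letting `as u8` wrap them.
fn write_db_selector(buf: &mut Vec<u8>, db_idx: usize) -> Result<(), DbIndexOutOfRange> {
    let idx = u8::try_from(db_idx).map_err(|_| DbIndexOutOfRange(db_idx))?;
    buf.extend_from_slice(&[DB_SELECTOR, idx]);
    Ok(())
}

fn main() {
    let mut buf = Vec::new();
    for db_idx in [0usize, 255, 256] {
        match write_db_selector(&mut buf, db_idx) {
            Ok(()) => println!("db {} written", db_idx),
            Err(e) => println!("db {} rejected: {}", db_idx, e),
        }
    }
}
```

Because the conversion happens before anything is appended, a rejected index leaves the buffer unchanged.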
295-306: ⚠️ Potential issue | 🔴 Critical
Fail hard if parsing reaches the end before a top-level EOF.
Line 297 and Line 852 still treat read_exact exhaustion as success. A checksum-valid but structurally invalid RDB whose EOF_MARKER is missing or consumed by a bad length field can therefore commit whatever keys were parsed before the short read.
💡 Suggested fix
    - if cursor.read_exact(&mut tag).is_err() {
    -     tracing::warn!(
    -         "RDB load: truncated tail after {} keys (no EOF marker)",
    -         total_keys
    -     );
    -     break;
    - }
    + if cursor.read_exact(&mut tag).is_err() {
    +     return Err(RdbError::Corrupted {
    +         detail: format!("RDB load: missing EOF marker after {} keys", total_keys),
    +     }
    +     .into());
    + }
Apply the same pattern in load_from_bytes() with the preamble-specific error text.
Also applies to: 850-856
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/rdb.rs` around lines 295 - 306, The loop that reads tags currently treats cursor.read_exact(&mut tag).is_err() as a truncated tail warning and continues; change this to fail hard by propagating an error (return Err) when read_exact fails instead of logging and breaking, using the same preamble-specific error message pattern already used elsewhere; update both the top-level tag loop (the block referencing total_keys, EOF_MARKER and cursor.read_exact) and the analogous code in load_from_bytes() so any short read before encountering EOF_MARKER produces an immediate Err with a clear message about unexpected EOF/truncated RDB rather than silently committing parsed keys.
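The fail-hard behaviour can be illustrated with a toy tag loop. This is a sketch, not the PR's loader: entries here are a tag byte plus one body byte, errors are plain strings, and count_entries_until_eof is a hypothetical name.

```rust
use std::io::{Cursor, Read};

const EOF_MARKER: u8 = 0xFF;

/// Walk a toy stream of (tag, one-byte body) entries terminated by EOF_MARKER.
/// Running out of input before the marker is an error, never a clean stop.
fn count_entries_until_eof(payload: &[u8]) -> Result<usize, String> {
    let mut cursor = Cursor::new(payload);
    let mut entries = 0usize;
    loop {
        let mut tag = [0u8; 1];
        if cursor.read_exact(&mut tag).is_err() {
            return Err(format!("missing EOF marker after {} entries", entries));
        }
        if tag[0] == EOF_MARKER {
            return Ok(entries);
        }
        let mut body = [0u8; 1];
        cursor
            .read_exact(&mut body)
            .map_err(|e| format!("truncated entry at {}: {}", cursor.position(), e))?;
        entries += 1;
    }
}

fn main() {
    println!("{:?}", count_entries_until_eof(&[0x01, 0xAA, EOF_MARKER]));
    println!("{:?}", count_entries_until_eof(&[0x01, 0xAA]));
}
```

Only the explicit marker returns Ok, so a caller that swaps temporary databases into live state on Ok can never commit a partial parse.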
♻️ Duplicate comments (1)
src/persistence/rdb.rs (1)
361-907: 🛠️ Refactor suggestion | 🟠 Major
This module is still over the repo's 1500-line cap.
The new save/load entrypoints and helpers keep src/persistence/rdb.rs at roughly 1.7k lines, so persistence fixes now need to be maintained in a file that's already beyond the repository limit. Please follow through on the earlier split into rdb/save.rs and rdb/load.rs before adding more behavior here. As per coding guidelines, "No single .rs file should exceed 1500 lines. Split into submodules if approaching this limit."
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/persistence/rdb.rs` around lines 361 - 907, The file exceeds the 1500-line limit—split persistence logic into rdb/load.rs and rdb/save.rs and move load-related functions into rdb/load.rs: count_entries_per_db, skip_entry, read_u32_raw, skip_bytes_field, read_entry_zero_copy, and load_from_bytes (and any small helpers they call); leave any save/write-specific helpers in rdb/save.rs. Create a rdb/mod.rs that declares pub mod load; pub mod save; and re-export necessary symbols (e.g., pub use load::load_from_bytes;) and update use paths in callers to reference crate::persistence::rdb::load::read_entry_zero_copy or the re-exports. Ensure visibility (pub(crate)/pub) is adjusted so tests and callers compile and update Cargo module imports accordingly.
📒 Files selected for processing (3)
- src/persistence/aof.rs
- src/persistence/aof_manifest.rs
- src/persistence/rdb.rs
✅ Files skipped from review due to trivial changes (1)
- src/persistence/aof.rs
🚧 Files skipped from review as they are similar to previous changes (1)
- src/persistence/aof_manifest.rs
Summary
Resurrects PR #37 as a fresh squash onto post-disk-offload main, then fixes all correctness issues found in post-merge review and automated PR comments.
- appendonlydir/ manifest + RDB base + RESP incr)
- BGREWRITEAOF wired for both handler_sharded and handler_monoio paths
- Database::{reserve, insert_for_load, recalculate_memory}): 2–30× faster recovery
- CompactValue::heap_string_{owned,vec_direct} eliminates Vec→Bytes→Vec round-trip during load
Commits
- b975c46: Port additive content from PR #37 (feat: multi-part AOF persistence, BGREWRITEAOF, optimized RDB loader) on a fresh branch off post-disk-offload main. Drops obsolete accept-loop / SO_REUSEPORT / cfg-gate changes that disk-offload (#43, feat: disk offload) superseded.
- ca2ec51: Fix six correctness issues from senior-rust-engineer review (P0: double-apply, parse-error resync; P1: manifest corruption, orphan cleanup, writer hang, reserve misuse).
- a4f51e2: Fix four issues from qodo + coderabbit review (P0: legacy double-replay on upgrade; P1: base RDB fsync, multi-part clears databases before load; P2: missing-base + non-empty-incr guard).
- a92a006: Remove dead zero-copy plumbing (dead read_bytes_zero_copy, unused _shared_buf parameter, unnecessary Bytes::copy_from_slice allocation).
All review comments addressed
- ca2ec51
- replay_incr_resp silently resynced past parse errors via *-byte scanning (unsound: * inside bulk strings) | ca2ec51
- restore_from_persistence replays appendonly.aof, then multi-part replays on top, double-applying on subsequent boots | initialize_with_base captures legacy state as base seq 1; retire appendonly.aof → .legacy; clear() databases before multi-part replay | a4f51e2
- AofManifest::load returned None on corruption → initialize() overwrote corrupt manifest → data loss | Result<Option<Self>, io::Error>; startup aborts with Context | ca2ec51
- advance() phases 1-3 | cleanup_orphans() sweeps stray files on manifest load | ca2ec51
- Err | ca2ec51
- File::create + write_all + sync_data + rename | a4f51e2
- replay_aof on MOON magic inserts into existing entries | clear() first; authoritative source | a4f51e2
- Database::reserve() silently wiped populated tables | debug_assert!(self.data.is_empty()) | ca2ec51
- a4f51e2
- #[allow(dead_code)] on read_bytes_zero_copy without justification | a92a006
- Bytes::copy_from_slice(data) allocation in load_from_bytes | _shared_buf parameter also removed from read_entry_zero_copy | a92a006
Rewrite ordering (fix 1) in detail
Invariant: any write captured in the new base is NOT in the new incr (handlers blocked between drain and snapshot), and any write NOT in the new base IS in the new incr (queued after lock release). Creates a brief global write pause during snapshot — acceptable cost for correctness.
First-upgrade path
Dropped from PR #37 (obsolete post-disk-offload)
- src/shard/event_loop.rs monoio::spawn accept task: main has io_uring multishot accept
- src/server/listener.rs SO_REUSEPORT fix: disk-offload disabled per-shard listener differently
- src/shard/dispatch.rs + tests/*.rs cfg gates: already in main
Known limitations
Validation (moon-dev OrbStack, Linux aarch64)
- cargo fmt --check
- cargo clippy --release -- -D warnings
- cargo clippy --no-default-features --features runtime-tokio,jemalloc -- -D warnings
- cargo test --release --lib: 1858 pass
- cargo test --no-default-features --features runtime-tokio,jemalloc --lib: 1877 pass
Two release-lib failures (test_inline_set, test_inline_set_with_aof) and tests/replication_test.rs build errors reproduce on clean main; they are pre-existing, not introduced here.
Manual smoke tests (all pass)
- kill -9 mid-rewrite)
- INCR during BGREWRITEAOF
- appendonly.aof.legacy, all keys survive next boot without BGREWRITEAOF
- seq 0)
Summary by CodeRabbit
New Features
Improvements