Skip to content

fix(persistence): multi-shard AOF gate + per-shard AOF foundation (Option B step 1)#129

Open
pilotspacex-byte wants to merge 16 commits into
mainfrom
fix/p0-multishard-aof-gate
Open

fix(persistence): multi-shard AOF gate + per-shard AOF foundation (Option B step 1)#129
pilotspacex-byte wants to merge 16 commits into
mainfrom
fix/p0-multishard-aof-gate

Conversation

@pilotspacex-byte
Copy link
Copy Markdown
Contributor

@pilotspacex-byte pilotspacex-byte commented May 26, 2026

Summary

Three commits on this branch:

  1. e0bb658 — P0 gate (P0-FIX-01a/b). BGREWRITEAOF refusal + startup refusal for --shards >= 2 + --appendonly yes, with --unsafe-multishard-aof escape hatch. Empirical loss matrix below.
  2. 7b61898 — docs/changelog. v0.1.12 launch posture, 3-way Moon/Redis/Valkey comparison, alpha-leak qualifiers, missing v0.1.9/v0.1.10/v0.1.12 CHANGELOG entries.
  3. 3bb4790 — per-shard AOF manifest format (Option B step 1). First step of the 9-step RFC at tmp/rfc-per-shard-aof-v02.md that eventually lifts the gate in step 9.

Why the gate (commit 1)

Empirical re-verification on HEAD 6e49050 (2026-05-26) found the durability bug is in the multi-shard AOF path itself, not the rewrite path the older bug memory blamed:

Configuration Recovered
--shards 1 --appendonly yes --appendfsync always (control) 5000 / 5000
--shards 1 --disk-offload enable --appendonly yes (control) 12714 / 12714
--shards 2 --disk-offload enable --appendonly yes (BGREWRITEAOF + SIGKILL) 7892 / 12662 (-38%)
--shards 2 --disk-offload enable --appendonly yes (plain SIGKILL) 7888 / 12655 (-38%)
--shards 2 --disk-offload enable --appendonly yes --appendfsync always 2474 / 5000 (-50%)
--shards 2 --disk-offload disable --appendonly yes --appendfsync always 2453 / 5000 (-50%)

Root cause investigation in tmp/P0-INVEST-01-multishard-aof-rootcause.md identified two complementary bugs:

  • H2 (structural): src/main.rs:562-563 literally skips multi-part AOF replay for num_shards >= 2. Closed by step 4.
  • H1 (in-flight): try_send(AofMessage::Append) is fire-and-forget; +OK returns before the writer thread fsyncs; channel buffer is lost on SIGKILL. Closed by step 7.

Why step 1 lands here (commit 3)

The decision tree:

  1. Build Option B (per-shard AOF, 16d + 1wk soak) vs make the gate permanent → user chose Option B.
  2. First obstacle is that the current AofManifest has no place to describe per-shard segments. Step 1 introduces that structure additively.

Strictly additive at the file-system level:

  • v1 manifests continue to load as AofLayout::TopLevel single-shard (shard_id=0).
  • No in-place migration triggered. No behavior change for any existing deployment.
  • The --unsafe-multishard-aof gate from commit 1 remains the load-bearing safety net until step 9.

What step 1 adds

  • AofLayout { TopLevel, PerShard } discriminator.
  • ShardManifest { shard_id: u16, max_lsn: u64 }max_lsn semantics deferred to step 3.
  • AofManifest.layout + AofManifest.shards: Vec<ShardManifest>.
  • initialize_multi(dir, num_shards) — v2 PerShard constructor.
  • shard_dir, shard_base_path[_seq], shard_incr_path[_seq] — per-shard path helpers.
  • verify_shard_count(expected) — returns the verbatim RFC § 3 error.
  • is_legacy_top_level_layout(dir) — pure detection (no side effects).
  • migrate_top_level_to_per_shard() — explicit in-place rename for RFC § 5 case 1; idempotent.
  • global_max_lsn() — computed accessor, not stored (avoids drift with the per-shard records).

Manifest text format

v1 (unchanged, full backcompat):

seq <N>
base moon.aof.<N>.base.rdb
incr moon.aof.<N>.incr.aof

v2 (new):

version 2
seq <N>
shards <K>
shard 0 max_lsn <lsn0>
shard 1 max_lsn <lsn1>
...

Paths are derived from shard_id + seq rather than stored explicitly — the layout is canonical, so a stored path could drift from the computed location.

Test plan

  • Unit test test_bgrewriteaof_sharded_refuses_under_unsafe_config covers gate-on + gate-off paths.
  • Live OrbStack boundary tests for the P0 gate:
    • PASS --shards 1 + AOF starts cleanly
    • PASS --shards 2 + AOF + --unsafe-multishard-aof starts
    • PASS --shards 2 + --appendonly no starts
    • REFUSED --shards 2 + AOF without escape hatch (exit code 2)
  • 9 new step-1 unit tests (in aof_manifest.rs tests_v2 module):
    • v1_manifest_loads_as_top_level_single_shard
    • v2_manifest_round_trips
    • verify_shard_count_emits_rfc_error_verbatim
    • migrate_top_level_to_per_shard_moves_files_and_rewrites_manifest
    • global_max_lsn_returns_max_across_shards
    • is_legacy_top_level_layout_detects_v1_files
    • is_legacy_top_level_layout_returns_false_for_v2
    • parse_v2_rejects_shard_count_mismatch_in_file
    • parse_v2_rejects_non_contiguous_shard_ids
  • 21 existing persistence::aof tests still green; library cargo check --no-default-features --features runtime-tokio,jemalloc clean.
  • CI green (Rust 1.94, both feature sets, clippy, audit-unsafe, audit-unwrap).
  • CRASH-01-LITE 7-config matrix as #[ignore] tests (step 8 of the RFC).

Operator impact

  • Existing --shards >= 2 + --appendonly yes deployments fail to start after upgrade. Error message is actionable: pick --shards 1, --appendonly no, or --unsafe-multishard-aof. Runbook walks each option.
  • Single-shard deployments unaffected.
  • --appendonly no (any shard count) unaffected.

Next steps on this branch

Per the RFC implementation table (16 days + 1 wk soak):

Step Effort Blocked by
2 — Per-shard AofWriter task; aof_tx: Vec<Sender> 3d 1
3 — LSN tagging in AofMessage::Append 1d 2 + v0.2 S1.3
4 — Replace Multi-part AOF skipped skip branch (closes H2) 2d 1, 2, 3
5 — Cross-shard ordering merge (TXN + SCRIPT) 2d 4
6 — moon migrate-aof subcommand (RFC § 5 case 2) 2d 1
7 — AppendSync { bytes, ack } rendezvous (closes H1) 3d 2 (parallel with 3-6)
8 — CRASH-01-LITE matrix in tests/crash_matrix.rs 1d 4
9 — Lift --unsafe-multishard-aof gate 1h 8 green

Summary by CodeRabbit

  • New Features

    • Server now refuses unsafe multi-shard + appendonly configs unless explicitly acknowledged via a new CLI flag
    • Added multi-shard AOF manifest/layout support and per-shard AOF writer behavior
  • Documentation

    • Updated README (status badges, production-readiness matrix, benchmarks, quick-start TTL examples)
    • Added runbook describing multi-shard AOF safety and recovery procedures
  • Tests

    • Test harnesses updated to opt out of unsafe multi-shard AOF by default

Review Change Stack

TinDang97 added 2 commits May 26, 2026 22:19
…OF (P0-FIX-01a/b)

Empirical re-verification on HEAD 6e49050 (2026-05-26) found that
`--shards >= 2 + --appendonly yes` silently loses ~50 % of writes on
SIGKILL, independent of `--appendfsync` and `--disk-offload`. The
original 33-day-old bug memory had narrowed the loss to
BGREWRITEAOF + disk-offload; the discriminator matrix below shows the
bug is in the multi-shard AOF durability path itself.

| Configuration                                                                  | Recovered      |
|--------------------------------------------------------------------------------|----------------|
| --shards 1 --appendonly yes --appendfsync always                               | 5000 / 5000    |
| --shards 1 --disk-offload enable --appendonly yes                              | 12714 / 12714  |
| --shards 2 --disk-offload enable --appendonly yes (BGREWRITEAOF + SIGKILL)     | 7892 / 12662   |
| --shards 2 --disk-offload enable --appendonly yes (plain SIGKILL, no rewrite)  | 7888 / 12655   |
| --shards 2 --disk-offload enable --appendonly yes --appendfsync always         | 2474 / 5000    |
| --shards 2 --disk-offload disable --appendonly yes --appendfsync always        | 2453 / 5000    |

Two complementary gates ship in this commit; both lift in v2.0 when
multi-shard AOF replay walks every shard's segment manifest on
recovery (see docs/runbooks/multi-shard-aof-rewrite.md):

P0-FIX-01a (defence-in-depth, command-level)
  bgrewriteaof_start_sharded refuses with a clear ERR when the
  multi-shard + disk-offload + AOF combo is active. Gated by
  MULTI_SHARD_AOF_REWRITE_UNSAFE: AtomicBool, set once in main.rs.
  Unit test test_bgrewriteaof_sharded_refuses_under_unsafe_config
  covers gate-on + gate-off paths and asserts the gate does not
  flip AOF_REWRITE_IN_PROGRESS.

P0-FIX-01b (load-bearing, startup)
  main.rs aborts with exit code 2 if `--shards >= 2 + --appendonly
  yes` without `--unsafe-multishard-aof`. The new flag is the
  explicit escape hatch for cache-only deployments where the loss
  window is acceptable. Boundary tests verified live on OrbStack:
    PASS  --shards 1 + AOF starts cleanly (no false positives)
    PASS  --shards 2 + AOF + --unsafe-multishard-aof starts
    PASS  --shards 2 + --appendonly no starts (cache-only)
    REFUSED  --shards 2 + AOF without escape hatch

Files
  src/command/persistence.rs  + gate + unit test
  src/main.rs                 + startup refusal + BGREWRITEAOF gate set
  src/config.rs               + --unsafe-multishard-aof flag
  docs/runbooks/multi-shard-aof-rewrite.md  + operator runbook

Reproducer scripts live in tmp/ (gitignored): p0-repro.sh,
p0-no-rewrite.sh, p0-always.sh, p0-multishard-no-offload.sh,
p0-shards1-exact.sh. Encoding them as #[ignore] crash-matrix tests
is tracked as CRASH-01-LITE in the ship plan.

Multi-shard masters with AOF are now explicitly cache-only in v1.0.
Root-cause investigation P0-INVEST-01 (1-2 wk) is the prerequisite
to lifting the startup gate in v2.0.

author: Tin Dang
…lpha-leak qualifiers

README
  * Bumps version badge v0.1.10 → v0.1.12 and replaces the
    "experimental" status with "single-node production-grade" plus a
    "cluster v0.2 alpha" badge, mirroring the new ship plan posture.
  * Replaces the blanket experimental warning with a "production-grade
    architecture, pre-1.0 maturity" framing that points at the new
    Production readiness section for the honest GA matrix.
  * Reconciles platform support — macOS is a supported development
    platform per the PRODUCTION-CONTRACT Tier table; production
    deployments target Linux.
  * Adds a Valkey 9.1.0 column to the peak-throughput tables (honest
    "not yet benched" placeholders) and a new Moon vs Redis vs Valkey
    section: a three-way comparison table plus "when to choose"
    guidance, all traced to docs/comparison-valkey.md.
  * Rewrites the trailing roadmap into a Production readiness section
    with what's GA today, what's not, operator gotchas, and a roadmap
    table.

  Alpha-leak qualifiers added so v0.1.12 framing does not implicitly
  promise v0.2.0-alpha features:

  * Quick-start HEXPIRE / HTTL lines annotated "(v0.2.0-alpha; build
    from main)".
  * Hash-field TTL benchmark section retitled "v0.2.0-alpha preview"
    with a callout that the latest tag (v0.1.12) does not include it.
  * "What's already in main" list split into v0.1.12 (latest tag,
    single-node production-grade) and v0.2.0-alpha additions
    (hash-field TTL, PITR, CDC, multi-node cluster soak).
  * Comparison-table row for hash-field TTL qualified as
    "v0.2-alpha".

CHANGELOG
  * Adds v0.1.12 entry covering Phase 189 (DashTable pre-sizing +
    --initial-keyspace-hint, PERF-07/09), Phase 190 (moon_memory_bytes
    Prometheus gauge with 7 subsystem kinds, MEMORY DOCTOR schema,
    resident_bytes trait), Phase 191 (jemalloc narenas:8 cap,
    --memory-arenas-cap, mimalloc-alt feature, OPERATOR-GUIDE Memory
    Accounting), Phase 177 dispatch observability, text-index default
    feature, SDK validate.{py,rs}, Python SDK graph parser fix, CI
    hygiene.
  * Adds v0.1.10 entry (single-shard PSYNC2 wired end-to-end).
  * Adds v0.1.9 Lunaris Retriever Gap Closure entry.
  * Consolidates three orphan Unreleased blocks under v0.1.3.
  * Sharpens v0.2.0-alpha entry with TL;DR headline capabilities
    (hash-field TTL stack, PITR, CDC, multi-node cluster soak).
  * Fixes version ordering so v0.1.12 sits above v0.1.11.

No code changes; this is purely documentation framing aligned to the
v1.0-rc1 single-node ship plan in tmp/SHIP-PLAN-v1.0-rc1-single-node.md.

author: Tin Dang
@qodo-code-review
Copy link
Copy Markdown

Qodo reviews are paused for this user.

Troubleshooting steps vary by plan Learn more →

On a Teams plan?
Reviews resume once this user has a paid seat and their Git account is linked in Qodo.
Link Git account →

Using GitHub Enterprise Server, GitLab Self-Managed, or Bitbucket Data Center?
These require an Enterprise plan - Contact us
Contact us →

@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 26, 2026

Warning

Review limit reached

@TinDang97, we couldn't start this review because you've reached your PR review rate limit.

More reviews will be available in 3 minutes and 1 second. Learn how PR review limits work.

Your organization has run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After more reviews become available, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available.

Please see our Fair Usage Limits Policy for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 84a55a2f-5d0b-4f98-a45f-88fda6865899

📥 Commits

Reviewing files that changed from the base of the PR and between 5004f4e and e46dc4e.

📒 Files selected for processing (7)
  • src/persistence/aof.rs
  • src/replication/state.rs
  • src/server/conn/blocking.rs
  • src/server/conn/handler_monoio/mod.rs
  • src/server/conn/handler_sharded/mod.rs
  • src/server/conn/handler_single.rs
  • src/server/conn/tests.rs
📝 Walkthrough

Walkthrough

This PR enforces a startup/runtime safety gate for a known multi-shard AOF rewrite durability bug (CLI override added), adds v1/v2 AOF manifest layouts with migration, implements an AOF writer-pool and per-shard writer task, migrates connection/handler wiring to the pool, and updates tests, runbook, README, and CHANGELOG.

Changes

Multi-Shard AOF Safety Gates

Layer / File(s) Summary
Release notes and operational runbook
CHANGELOG.md, docs/runbooks/multi-shard-aof-rewrite.md
Reorganizes CHANGELOG into dated releases, adds v0.2.0-alpha/v0.1.12 sections, and adds a runbook documenting startup and BGREWRITEAOF refusal gates, measured data-loss characteristics, recovery steps, telemetry signals, and planned removal.
Configuration flag and startup enforcement
src/config.rs, src/main.rs
Adds --unsafe-multishard-aof to ServerConfig (default false) and implements an early startup refusal when --shards >= 2 + appendonly yes without the override (exits code 2 and prints a data-loss warning).
Runtime BGREWRITEAOF command gating
src/command/persistence.rs, src/main.rs
Introduces MULTI_SHARD_AOF_REWRITE_UNSAFE AtomicBool; maps AOF pool send errors to RESP frames; bgrewriteaof_start_sharded refuses when the gate is set; main sets the gate at runtime for the unsafe config and logs warnings.
Gate validation tests
src/command/persistence.rs
Adds a mutex-protected unit test asserting the unsafe gate returns the documented refusal and keeps AOF_REWRITE_IN_PROGRESS unset, then restores prior global state.
AOF manifest multi-layout and migration
src/persistence/aof_manifest.rs
Adds AofLayout (TopLevel/PerShard) and ShardManifest, extends AofManifest with layout/shards, implements layout-aware path helpers, v1/v2 parse & write, orphan cleanup, in-place TopLevel→PerShard migration with rollback, initialize_multi, verification helpers, and extensive v2 tests.
AOF writer pool & per-shard writer task
src/persistence/aof.rs
Introduces AofWriterPool (top-level/per-shard constructors, routing, try_send_append/try_send_rewrite, broadcast_shutdown) and per_shard_aof_writer_task (PerShard incremental AOF processing with fsync policies and cancellation); includes unit tests for routing, rewrite acceptance/rejection, and shutdown.
Connection context and handler migration
src/server/conn/core.rs, src/server/conn_state.rs, src/server/conn/handler_monoio/mod.rs, src/shard/conn_accept.rs, src/shard/event_loop.rs, src/server/listener.rs, src/server/embedded.rs
Adds optional aof_pool: Option<Arc<AofWriterPool>> to connection context/state, threads a top-level AofWriterPool into shard and connection startup, and migrates handlers/inline-dispatch/write paths to call aof_pool.try_send_append(shard_id, bytes) including cross-shard batching bookkeeping to route remote-write appends to the owning shard.
Handlers, inline dispatch, and tests
src/server/conn/handler_single.rs, src/server/conn/handler_sharded/mod.rs, src/server/conn/blocking.rs, src/server/conn/tests.rs
Updates BGREWRITEAOF/SWAPDB/SUBSCRIBE/graph WAL and inline dispatch to use the pool API and updates tests to construct AofWriterPool::top_level(...) where needed; many integration tests now explicitly set unsafe_multishard_aof: false.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • pilotspace/moon#71: Overlaps inline-dispatch AOF append logic and may conflict with handler append changes.
  • pilotspace/moon#100: Touches BGREWRITEAOF / AOF rewrite concurrency and SWAPDB interactions with rewrite flags.
  • pilotspace/moon#63: Prior work on sharded AOF rewrite paths that this PR extends and guards.

Suggested labels: enhancement

"🐰 I hop where shards and AOF collide,
A careful flag keeps unsafe rewrites aside.
Manifests split and writers find their lane,
Docs, tests, and pools to guide the terrain.
Hooray — safer starts, one careful stride."

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
Check name Status Explanation
Title check ✅ Passed The title clearly summarizes the main change: adding a multi-shard AOF gate and laying the foundation for per-shard AOF architecture (Option B step 1).
Description check ✅ Passed The PR description is comprehensive, covering three commits with detailed explanations of the gate, manifest format changes, test plan, operator impact, and next steps. All sections are well-documented.
Docstring Coverage ✅ Passed Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/p0-multishard-aof-gate

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
README.md (1)

229-233: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Quick-start production flags now conflict with startup safety gate.

This command should fail under the new startup refusal (--shards >= 2 + --appendonly yes without override), so the README is currently instructing an invalid config.

Suggested README correction
 # Or with production flags
 ./target/release/moon \
   --port 6379 \
-  --shards 8 \
-  --appendonly yes --appendfsync everysec \
+  --shards 1 \
+  --appendonly yes --appendfsync everysec \
   --maxmemory 8g --maxmemory-policy allkeys-lfu
+
+# Multi-shard cache-only alternative
+# ./target/release/moon --shards 8 --appendonly no ...
+
+# Unsafe override (not recommended; known durability risk)
+# ./target/release/moon --shards 8 --appendonly yes --unsafe-multishard-aof ...
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@README.md` around lines 229 - 233, The README's quick-start example uses
conflicting flags (--shards 8 together with --appendonly yes) which will trigger
the new startup safety gate and refuse to start; update the example command
under the block that contains the flags (--port, --shards, --appendonly,
--appendfsync, --maxmemory, --maxmemory-policy) to a valid configuration (e.g.,
set --shards 1 or remove/disable --appendonly) or explicitly show the required
override flag and text that allows bypassing the safety gate (add a clear
placeholder like --<startup-override> if an override exists) so the documented
command actually starts successfully.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/runbooks/multi-shard-aof-rewrite.md`:
- Around line 10-16: Three fenced code blocks in
docs/runbooks/multi-shard-aof-rewrite.md are missing language identifiers
(markdownlint MD040). Edit the three blocks shown (the startup refusal block
starting "REFUSING TO START: --shards 2 + --appendonly yes...", the BGREWRITEAOF
interaction block containing "BGREWRITEAOF" and "(error) ERR BGREWRITEAOF...",
and the final explanatory block starting "BGREWRITEAOF gated for this
config...") and add the language tags: use ```text for the two plain-text blocks
and ```redis for the BGREWRITEAOF example so markdownlint MD040 is satisfied.

In `@src/main.rs`:
- Around line 273-289: The --check-config path currently returns before the
multishard-AOF safety gate runs, so add the same refusal logic used at startup
into the check_config branch: detect the condition (num_shards >= 2 &&
config.appendonly == "yes" && !config.unsafe_multishard_aof) inside the
check_config handling and print the identical error message and exit non‑zero
(or return an error) so preflight fails the same way real startup would; use the
same symbols/strings (num_shards, config.appendonly,
config.unsafe_multishard_aof) and the same message text used near the startup
gate to keep behavior consistent.

---

Outside diff comments:
In `@README.md`:
- Around line 229-233: The README's quick-start example uses conflicting flags
(--shards 8 together with --appendonly yes) which will trigger the new startup
safety gate and refuse to start; update the example command under the block that
contains the flags (--port, --shards, --appendonly, --appendfsync, --maxmemory,
--maxmemory-policy) to a valid configuration (e.g., set --shards 1 or
remove/disable --appendonly) or explicitly show the required override flag and
text that allows bypassing the safety gate (add a clear placeholder like
--<startup-override> if an override exists) so the documented command actually
starts successfully.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: c11a2da9-b702-43f0-91ac-59786ae9a841

📥 Commits

Reviewing files that changed from the base of the PR and between 6e49050 and 7b61898.

📒 Files selected for processing (6)
  • CHANGELOG.md
  • README.md
  • docs/runbooks/multi-shard-aof-rewrite.md
  • src/command/persistence.rs
  • src/config.rs
  • src/main.rs

Comment on lines +10 to +16
```
REFUSING TO START: --shards 2 + --appendonly yes has a known data-loss
bug on SIGKILL (~50 % loss verified 2026-05-26). Fix: use --shards 1,
or pass --appendonly no for cache-only deployments, or pass
--unsafe-multishard-aof to acknowledge the risk and start anyway. See
docs/runbooks/multi-shard-aof-rewrite.md.
```
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Add fenced code languages to satisfy markdownlint MD040.

These three fenced blocks are missing language identifiers and will keep markdownlint warnings active.

Suggested doc-only fix
-```
+```text
 REFUSING TO START: --shards 2 + --appendonly yes has a known data-loss
 ...
-```
+```

-```
+```redis
 > BGREWRITEAOF
 (error) ERR BGREWRITEAOF is unsafe with --shards >= 2 + --disk-offload enable
 ...
-```
+```

-```
+```text
 BGREWRITEAOF gated for this config (known data-loss path; see
 docs/runbooks/multi-shard-aof-rewrite.md). Use --shards 1 or
 --disk-offload disable to re-enable rewrite.
-```
+```

Also applies to: 20-26, 88-92

🧰 Tools
🪛 markdownlint-cli2 (0.22.1)

[warning] 10-10: Fenced code blocks should have a language specified

(MD040, fenced-code-language)

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/runbooks/multi-shard-aof-rewrite.md` around lines 10 - 16, Three fenced
code blocks in docs/runbooks/multi-shard-aof-rewrite.md are missing language
identifiers (markdownlint MD040). Edit the three blocks shown (the startup
refusal block starting "REFUSING TO START: --shards 2 + --appendonly yes...",
the BGREWRITEAOF interaction block containing "BGREWRITEAOF" and "(error) ERR
BGREWRITEAOF...", and the final explanatory block starting "BGREWRITEAOF gated
for this config...") and add the language tags: use ```text for the two
plain-text blocks and ```redis for the BGREWRITEAOF example so markdownlint
MD040 is satisfied.

Comment thread src/main.rs
Comment on lines +273 to +289
// P0-FIX-01b: refuse to start under the known durability bug
// (`shards >= 2 + appendonly yes` loses ~50 % of writes on SIGKILL,
// verified 2026-05-26 on HEAD `6e49050`; reproducer in
// `tmp/p0-no-rewrite.sh` and `tmp/p0-always.sh`). The bug is
// independent of `--appendfsync` and `--disk-offload` settings. An
// operator can override via `--unsafe-multishard-aof` if the
// deployment is cache-only and the loss window is acceptable.
if num_shards >= 2 && config.appendonly == "yes" && !config.unsafe_multishard_aof {
eprintln!(
"REFUSING TO START: --shards {num_shards} + --appendonly yes has a known data-loss \
bug on SIGKILL (~50 % loss verified 2026-05-26). Fix: use --shards 1, or pass \
--appendonly no for cache-only deployments, or pass --unsafe-multishard-aof to \
acknowledge the risk and start anyway. See \
docs/runbooks/multi-shard-aof-rewrite.md."
);
std::process::exit(2);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Mirror this refusal in --check-config validation.

Line 143 returns from --check-config before Line 280 runs, so preflight can pass a config that real startup immediately refuses. Please enforce the same gate in the check_config branch.

Suggested patch
@@
     if config.check_config {
+        if config.shards >= 2 && config.appendonly == "yes" && !config.unsafe_multishard_aof {
+            return Err(anyhow::anyhow!(
+                "--shards {} + --appendonly yes is refused unless --unsafe-multishard-aof is set (or use --shards 1 / --appendonly no)",
+                config.shards
+            ));
+        }
         // Validate shard count is reasonable
         if config.shards == 0 {
             return Err(anyhow::anyhow!("--shards must be >= 1"));
         }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/main.rs` around lines 273 - 289, The --check-config path currently
returns before the multishard-AOF safety gate runs, so add the same refusal
logic used at startup into the check_config branch: detect the condition
(num_shards >= 2 && config.appendonly == "yes" && !config.unsafe_multishard_aof)
inside the check_config handling and print the identical error message and exit
non‑zero (or return an error) so preflight fails the same way real startup
would; use the same symbols/strings (num_shards, config.appendonly,
config.unsafe_multishard_aof) and the same message text used near the startup
gate to keep behavior consistent.

First implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Closes Hypothesis 2 of the
P0-INVEST-01 root cause: multi-part AOF replay is currently skipped
for num_shards >= 2 because there is no manifest structure that can
describe per-shard segments. This commit lays the foundation by
introducing a manifest v2 format that carries per-shard metadata; the
writer, replay, and lift-the-gate work follows in steps 2-9.

The change is purely additive at the file-system level — v1 manifests
continue to load as TopLevel single-shard with shard_id=0, no
in-place migration is triggered, and no behavior is altered for any
existing deployment. The escape-hatch gate
(--unsafe-multishard-aof) from commit ce05fa9 remains the load-bearing
safety net until step 9 lands.

New types
  AofLayout { TopLevel, PerShard }
    Discriminates v1 top-level layout from v2 per-shard layout.
    A directory holds one layout exclusively — never a mix.

  ShardManifest { shard_id: u16, max_lsn: u64 }
    Per-shard entry. The max_lsn semantics are deliberately deferred
    to step 3 (LSN tagging); until then it is always 0 and recovery
    does not consult it. This avoids locking in an LSN namespace
    contract before v0.2 S1.3 (REPLCONF ACK / WAIT) lands and
    clarifies what LSN MEANS in the multi-shard AOF context.

AofManifest extensions
  + layout: AofLayout
  + shards: Vec<ShardManifest>      // length == num_shards
  + initialize_multi(dir, num_shards) — v2 PerShard constructor
  + shard_dir / shard_base_path / shard_incr_path (+ _seq variants)
  + global_max_lsn() — computed accessor, not stored (per advisor's
    note: a stored mirror invites drift with the per-shard records)
  + verify_shard_count(expected) — returns the exact RFC § 3 verbatim
    error string ("ERR shard count changed (manifest=N, config=M)…")
    so operator-facing wording is uniform across boot, BGREWRITEAOF,
    and the migration tool.
  + is_legacy_top_level_layout(dir) — pure detection helper for
    callers that want to decide whether to migrate. NOT called from
    load() — side effects belong in explicit migrate_* methods.
  + migrate_top_level_to_per_shard() — in-place rename for RFC § 5
    case 1 (single-shard v0.1.x → v2 single-shard). Idempotent.
    Case 2 (legacy multi-shard with the gate engaged) ships in step
    6 as the `moon migrate-aof` subcommand.

Manifest text format
  v1 (unchanged, preserves backcompat):
    seq <N>
    base moon.aof.<N>.base.rdb
    incr moon.aof.<N>.incr.aof

  v2 (new):
    version 2
    seq <N>
    shards <K>
    shard 0 max_lsn <lsn0>
    shard 1 max_lsn <lsn1>
    ...

  Paths are derived from shard_id + seq rather than stored explicitly.
  The layout is canonical, so a stored path could drift from the
  computed location and silently shadow real files on disk.

Tests (9 new, in src/persistence/aof_manifest.rs tests_v2 module)
  PASS  v1_manifest_loads_as_top_level_single_shard
  PASS  v2_manifest_round_trips
  PASS  verify_shard_count_emits_rfc_error_verbatim
  PASS  migrate_top_level_to_per_shard_moves_files_and_rewrites_manifest
  PASS  global_max_lsn_returns_max_across_shards
  PASS  is_legacy_top_level_layout_detects_v1_files
  PASS  is_legacy_top_level_layout_returns_false_for_v2
  PASS  parse_v2_rejects_shard_count_mismatch_in_file
  PASS  parse_v2_rejects_non_contiguous_shard_ids

All 21 existing persistence::aof tests remain green. cargo check
(runtime-tokio,jemalloc) clean.

What this does NOT do (in scope for later steps)
  Step 2 — per-shard AofWriter task; aof_tx becomes Vec<Sender>
  Step 3 — LSN tagging in AofMessage::Append (after v0.2 S1.3)
  Step 4 — Replace `Multi-part AOF skipped` skip branch (closes H2)
  Step 5 — Cross-shard ordering merge (TXN + SCRIPT)
  Step 6 — `moon migrate-aof` subcommand for case 2 migration
  Step 7 — AppendSync rendezvous for appendfsync=always (closes H1)
  Step 8 — CRASH-01-LITE matrix in tests/crash_matrix.rs
  Step 9 — Lift --unsafe-multishard-aof gate

Refs
  tmp/rfc-per-shard-aof-v02.md (RFC)
  tmp/P0-INVEST-01-multishard-aof-rootcause.md (root cause)
  PR #129 (P0 escape-hatch gate this work lifts)

author: Tin Dang
@pilotspacex-byte pilotspacex-byte changed the title fix(persistence): refuse multi-shard AOF at startup + gate BGREWRITEAOF (P0-FIX-01a/b) fix(persistence): multi-shard AOF gate + per-shard AOF foundation (Option B step 1) May 27, 2026
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/persistence/aof_manifest.rs`:
- Around line 669-775: migrate_top_level_to_per_shard and initialize_multi are
making AofLayout::PerShard visible before the rest of the I/O path
(replay_multi_part, manifest.base_path/incr_path, manifest.incr_path(),
manifest.advance()) understands per-shard locations; this causes subsequent
boots/writes to look in the wrong place. Fix by deferring setting self.layout =
AofLayout::PerShard (and any manifest persisted as PerShard via
write_manifest()) until after you have created/moved the per-shard files and
ensured callers will open the new paths: in migrate_top_level_to_per_shard move
the layout assignment to after the rename/create operations and only call
write_manifest() once layout is set; in initialize_multi avoid persisting a
PerShard manifest or exposing PerShard paths until all shard dirs/files are
created (set layout and call write_manifest() last). Alternatively, make
replay_multi_part, base_path(), incr_path(), manifest.incr_path(), and
manifest.advance() layout-aware so they resolve per-shard paths immediately;
pick one approach and apply it consistently.
- Around line 688-717: After renaming old_base→new_base and optionally
old_incr→new_incr, add a rollback guard so any subsequent error (including
write_manifest() failing) moves the files back and restores self.layout to
AofLayout::TopLevel; implement this by tracking that the base (and possibly
incr) have been moved and on any error attempt std::fs::rename(new_base,
old_base) and, if incr was moved, std::fs::rename(new_incr, old_incr) (or remove
created new_incr if it was created), then set self.layout = AofLayout::TopLevel
before returning the error. Ensure the guard runs for failures after the first
rename but not if everything succeeds (write_manifest() completes), and
reference the existing symbols new_base, new_incr, old_base, old_incr,
self.layout, and write_manifest() to locate where to add the rollback.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5a131377-fab2-4cd3-8f99-12ed9fc7f9ff

📥 Commits

Reviewing files that changed from the base of the PR and between 7b61898 and 3bb4790.

📒 Files selected for processing (1)
  • src/persistence/aof_manifest.rs

Comment thread src/persistence/aof_manifest.rs
Comment thread src/persistence/aof_manifest.rs Outdated
TinDang97 added 6 commits May 27, 2026 10:54
Second implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Step 2 is split into six sub-steps
(2a-2f) to keep the blast radius reviewable; this commit ships 2a.

2a is purely additive — a new public type and tests, zero call-site
changes. The pool's API mirrors the patterns the call sites already
use (try_send append, broadcast Shutdown), so steps 2c-2f reduce to a
mechanical type-plumbing pass.

New type
  AofWriterPool {
      senders: Vec<MpscSender<AofMessage>>,
      layout:  AofLayout,
  }

  Constructors:
    top_level(sender) -> Arc<Self>
      One sender; every shard multiplexes onto it. Used for legacy v1
      deployments and `--shards 1` v2 deployments.

    per_shard(senders) -> Arc<Self>
      One sender per shard. senders[i] MUST be the writer task that
      owns appendonlydir/shard-{i}/. debug_assert rejects a length-1
      vector (use top_level instead).

  Dispatch:
    sender(shard_id) -> &MpscSender<AofMessage>
      TopLevel: ignores shard_id, returns senders[0].
      PerShard: returns senders[shard_id]. debug_assert on out-of-range.

    try_send_append(shard_id, bytes)
      Convenience for the `let _ = tx.try_send(AofMessage::Append(bytes))`
      pattern at 12 call sites today. Fire-and-forget, matches current
      hot-path semantics (H1 fix is step 7's AppendSync rendezvous).

    try_send_rewrite(msg) -> Result<(), AofPoolSendError>
      Only legal for TopLevel pools; PerShard rejects with
      AofPoolSendError::RewriteUnsupportedInPerShard. BGREWRITEAOF in
      the per-shard layout becomes a per-shard operation in step 6 —
      the legacy single-writer rewrite enum variant has no meaning
      once the writer is one-per-shard.

    broadcast_shutdown()
      Sends Shutdown to every writer. Used by orchestrated shutdown
      in main.rs / embedded.rs (wired in step 2f).

New error type
  AofPoolSendError {
      RewriteUnsupportedInPerShard,
      SendFailed,
  }

Tests (5 new, in src/persistence/aof.rs pool_tests module)
  PASS  top_level_pool_routes_all_shards_to_writer_zero
  PASS  per_shard_pool_routes_each_shard_to_its_own_writer
  PASS  per_shard_pool_rejects_rewrite_with_explicit_error
  PASS  top_level_pool_accepts_rewrite
  PASS  broadcast_shutdown_reaches_every_writer

All 21 existing persistence::aof tests + 9 manifest tests from step 1
remain green (26 total in persistence::aof). cargo check + clippy
(runtime-tokio,jemalloc) clean.

What this does NOT do (in scope for later sub-steps)
  Step 2b — per-shard writer task body (reads from
            manifest.shard_incr_path(shard_id) for PerShard,
            manifest.incr_path() for TopLevel)
  Step 2c — type plumbing: aof_tx: Option<MpscSender> →
            aof_pool: Option<Arc<AofWriterPool>> in conn_state.rs
            and conn/core.rs
  Step 2d — handler_monoio call sites use ctx.aof_pool.sender(ctx.shard_id)
  Step 2e — handler_sharded call sites (same pattern)
  Step 2f — spawn sites (main.rs, listener.rs, embedded.rs) build
            the pool via top_level() or per_shard() based on layout

Refs
  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
  PR #129 (P0 escape-hatch gate this work lifts in step 9)
  Commit 3bb4790 (step 1 — manifest v2 format)

author: Tin Dang
Third implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Adds the per-shard writer task body as
an additive function alongside the existing `aof_writer_task`. Zero
call sites changed in this commit — wiring lands in step 2f.

New function
  per_shard_aof_writer_task(rx, base_dir, shard_id, fsync, cancel)

  One instance is spawned per shard in PerShard layout. Each instance
  owns appendonlydir/shard-{shard_id}/moon.aof.{seq}.incr.aof
  exclusively, so there is no per-file locking. Mirrors the production
  monoio path of the existing aof_writer_task (60s bounded wait for
  manifest, hard fail on corrupt manifest, per-fsync-policy cadence).

  Differences from aof_writer_task (TopLevel):
  - Opens manifest.shard_incr_path(shard_id) instead of
    manifest.incr_path(). Defensive `create_dir_all` of the parent
    `shard-{N}/` directory in case a manual deletion or older binary
    left it missing.
  - Rejects Rewrite/RewriteSharded variants with a `warn!` and drops
    the message. The legacy single-writer rewrite enum has no meaning
    when each shard owns its own files; per-shard BGREWRITEAOF will be
    a separate per-shard operation in a later step.
  - Refuses to start if the loaded manifest's layout is TopLevel — the
    spawn site (step 2f) must only invoke this body for PerShard
    layouts. Layout mismatch is a programmer error and logs at error
    level before exiting.
  - Refuses to start if shard_id is out of range for the manifest's
    `shards.len()` (defensive against config drift between manifest
    write and writer spawn).
  - Every log line includes `shard {shard_id}` so operators can map
    log lines to filesystem state without ambiguity.

Both runtimes (runtime-tokio async I/O via tokio::fs + BufWriter +
tokio::select!, runtime-monoio sync I/O via std::fs in a blocking
recv loop) are covered with feature-gated blocks. The shape mirrors
aof_writer_task closely so future fixes to fsync handling or shutdown
flush can be applied uniformly to both functions.

What this does NOT do (in scope for later sub-steps)
  Step 2c — type plumbing: aof_tx: Option<MpscSender> →
            aof_pool: Option<Arc<AofWriterPool>> in conn_state.rs
            and conn/core.rs
  Step 2d — handler_monoio call sites use ctx.aof_pool.sender(ctx.shard_id)
  Step 2e — handler_sharded / handler_single / blocking call sites
  Step 2f — spawn sites (main.rs, listener.rs, embedded.rs) build the
            pool via top_level()/per_shard() and spawn N
            per_shard_aof_writer_task instances for PerShard layouts

Tests
  No new tests in this commit. The function body mirrors the message
  loop in aof_writer_task line-for-line (with the per-shard differences
  above), which already has 21 unit tests covering Append, Rewrite, and
  Shutdown handling. An end-to-end integration test that spawns N
  writers, drives appends through them, kills the process, and verifies
  per-shard files reload cleanly lands as an #[ignore]-by-default test
  in tests/ alongside step 2f.

Verification
  cargo check + cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)
  All 21 existing persistence::aof tests + 5 pool tests from step 2a
  + 9 manifest tests from step 1 remain green (35 in persistence).

Refs
  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
  Commit 3bb4790 (step 1 — manifest v2 format)
  Commit 5a546ff (step 2a — AofWriterPool type)

author: Tin Dang
…ack)

Two reviewer-flagged bugs in the step 1 manifest work (commit 3bb4790):

1. base_path/incr_path/base_path_seq/incr_path_seq were NOT layout-aware
2. migrate_top_level_to_per_shard flipped self.layout = PerShard BEFORE
   any I/O succeeded and had no rollback for failures after the first
   rename

Both verified against current code before fixing. A third reviewer
suggestion (initialize_multi) was reviewed and skipped — see "Note on
initialize_multi" below.

Bug 1 — Layout-aware path helpers (replay/advance routed to wrong dir)
----------------------------------------------------------------------

  Before: base_path(), incr_path(), base_path_seq(), incr_path_seq()
  unconditionally computed TopLevel paths (`appendonlydir/moon.aof.*`).

  After migrate_top_level_to_per_shard flips layout to PerShard,
  replay_multi_part (aof_manifest.rs:871, 895, 916) and advance()
  (lines 796, 821, 836-837) still asked these helpers for paths and
  got TopLevel locations — while the actual files now lived under
  shard-0/.

  Symptom: post-migration boot fails recovery with "AOF base RDB missing";
  BGREWRITEAOF after migration writes new files to TopLevel locations
  the per-shard writer never reads.

  Fix: route PerShard layout through the existing shard_*_path_seq
  helpers, with debug_assert that shards.len() == 1 (these single-file
  helpers are by definition meaningful only for single-shard layouts;
  multi-shard PerShard callers MUST use shard_*_path[_seq] explicitly).
  Release builds fall back to shard-0 paths rather than panicking so
  production stays recoverable on a stale call site.

  No callers need to change — same signatures, layout-correct results.

Bug 2 — Migrate rollback on partial failure
-------------------------------------------

  Before the fix, migrate_top_level_to_per_shard did:
    1. self.layout = PerShard               (line 689; in-memory flip)
    2. create_dir_all(new_dir)              (line 691; may fail)
    3. rename(old_base → new_base)          (line 708; may fail)
    4. rename or create incr                (lines 709-714; may fail)
    5. write_manifest()                     (line 717; may fail)

  Only step 2's `!old_base.exists()` branch (lines 698-707) reset the
  layout flag on error. Any failure at steps 4 or 5 left the base file
  moved with no rollback AND left self.layout out of sync with the
  on-disk manifest (which still claimed v1 if write_manifest had not
  yet run, or claimed v2 with the wrong file locations if it had).

  Fix: defer the layout flip until everything on disk is in the new
  shape; explicit per-step rollback on every failure path:
    - rename(old_base) failure: nothing moved, plain ? return
    - rename(old_incr) or create(new_incr) failure: rename base back,
      return original error (rollback errors logged but do not mask
      the cause)
    - write_manifest() failure: revert layout flag, remove created
      incr or rename incr back, rename base back

  After this fix the migration is atomic from the loader's perspective:
  either everything is in shard-0/ AND the v2 manifest is on disk, or
  everything is at the top level AND the v1 manifest is on disk. No
  intermediate state survives a crash mid-migration.

Note on initialize_multi
------------------------

  The reviewer also flagged initialize_multi (lines 733-776) for the
  same "layout flipped before I/O" pattern. Verified — does NOT apply:
  initialize_multi constructs the struct with `layout: PerShard` in
  local scope only (no manifest on disk yet), creates all dirs/files
  via the shard_* helpers (which don't depend on self.layout), and
  calls write_manifest() LAST. Any failure aborts before any caller
  observes the half-built state. Orphan shard-{N}/ dirs left on disk
  on failure are harmless (next boot's load() returns Ok(None) and
  recovery treats as fresh init). Skipped — no change needed.

Tests (3 new)
  base_incr_paths_route_to_shard_zero_after_migration
    Pre-migration: base_path() and incr_path() return TopLevel paths.
    Post-migration: they route to shard-0/ AND the file exists there.

  migrate_rolls_back_filesystem_when_incr_rename_fails
    Pre-creates shard-0/moon.aof.1.incr.aof as a DIRECTORY (rename onto
    a non-empty dir fails on every supported OS), forcing the rename
    after-base-already-moved path. Verifies: layout reverts to TopLevel,
    base file restored, base contents intact, on-disk manifest still v1.

  migrate_does_not_mutate_on_missing_base
    Pre-flight check path: layout never flips, no rollback needed,
    NotFound error surfaced.

Verification
  379 persistence tests pass on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)
  cargo clippy clean on both. cargo check clean on both.

Refs
  Reviewer comments on aof_manifest.rs:669-775 and :688-717
  Commit 3bb4790 (step 1 introduced the bugs)
  tmp/rfc-per-shard-aof-v02.md (RFC § 5 case 1 migration)

author: Tin Dang
Fourth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Adds `aof_pool: Option<Arc<AofWriterPool>>`
to ConnectionContext as a **compat alias** alongside the existing
`aof_tx: Option<MpscSender<AofMessage>>`. Zero call-site behavior change.

Why compat alias (and not a single big-bang refactor)
-----------------------------------------------------

The aof_tx → aof_pool transition touches 16 call sites across 10 files
(handler_monoio, handler_sharded, handler_single, blocking,
command/persistence, shard/conn_accept, shard/event_loop, main, listener,
embedded), AND one of those sites carries a load-bearing correctness fix
for cross-shard routing (handler_sharded/mod.rs:1651 — owner shard must
be `target`, not `ctx.shard_id`, otherwise per-shard AOF writes land in
the wrong file).

Splitting plumbing from call-site migration:
  - 2c (this commit) adds the field; ConnectionContext::new takes both
    aof_tx and aof_pool; spawn sites build the pool via
    AofWriterPool::top_level(tx). All four ConnectionContext::new call
    sites in shard/conn_accept.rs updated. No behavior change — pool
    just wraps the same single sender.
  - 2d migrates handler_monoio + handler_monoio/dispatch +
    handler_single + blocking.rs call sites (owner = ctx.shard_id /
    shard_id / 0; all uncontroversial).
  - 2e migrates handler_sharded + handler_sharded/dispatch +
    command/persistence call sites. **Includes the cross-shard routing
    fix at mod.rs:1651** (target, not ctx.shard_id) with the audit
    table pasted into its commit body for posterity, plus removal of
    the legacy aof_tx field.

Each commit compiles and tests green. Bisect remains useful because
the type system always has a consistent shape (both fields present
during 2c-2e, only pool present after 2e).

Pre-refactor audit (16 sites mapped to owner shard)
---------------------------------------------------

| Site                                       | Owner shard       |
|--------------------------------------------|-------------------|
| handler_sharded/mod.rs:1175 MOVE           | ctx.shard_id      |
| handler_sharded/mod.rs:1219 COPY           | ctx.shard_id      |
| handler_sharded/mod.rs:1430 local write    | ctx.shard_id      |
| handler_sharded/mod.rs:1651 x-shard reply  | **target**        |
| handler_sharded/dispatch.rs:356 BGREWRITEAOF | (Rewrite — pool rejects) |
| handler_monoio/mod.rs:486,1124,1189,1538,1937 | ctx.shard_id   |
| handler_monoio/dispatch.rs:981 BGREWRITEAOF | (Rewrite — pool rejects) |
| handler_single.rs (5)                      | 0                 |
| blocking.rs:1349 inline SET                | shard_id (param)  |
| command/persistence.rs:233,263 BGREWRITEAOF helpers | (Rewrite) |
| shard/conn_accept.rs + event_loop.rs       | plumbing only     |

Verified by reading the binding scope at each site:
  - mod.rs:1175/1219 inside `if is_local` (line 1125) → home shard.
  - mod.rs:1430 inside `if is_local` + write-path branch → home shard.
  - mod.rs:1651 inside `for (meta, target) in reply_futures` where meta
    was built per-target by remote_groups.entry(target).or_default()
    (line 1610) — every entry's aof_bytes belongs to that target's shard.
  - handler_monoio is shared-nothing per-shard; ctx.shard_id is the
    handler's home shard which also owns the Database being mutated.
  - blocking.rs::try_inline_dispatch takes shard_id as a parameter.

Changes in this commit
----------------------

src/server/conn/core.rs (ConnectionContext)
  + import AofWriterPool
  + aof_pool: Option<Arc<AofWriterPool>>  (with #[allow(dead_code)]
    explaining 2d/2e are the readers)
  + ConnectionContext::new signature gains aof_pool parameter

src/server/conn_state.rs (ConnectionContext — definition-only twin)
  + import AofWriterPool, mirror field for type-system consistency.
    This struct is #[allow(dead_code)] at the struct level (Phase 44
    placeholder, not constructed anywhere); no constructor changes.

src/shard/conn_accept.rs (4 ConnectionContext::new call sites)
  At each site: compute `aof_pool = aof.as_ref().map(|tx|
  AofWriterPool::top_level(tx.clone()))` and pass it into the new
  parameter. Wrapping the same sender means pool.try_send_append(N, b)
  is identical to tx.try_send(AofMessage::Append(b)) for any N — no
  routing change yet.

Verification
  cargo check + cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)
  All 379 persistence tests remain green.

What this does NOT do (in scope for 2d/2e/2f)
  Step 2d — migrate handler_monoio + handler_single + blocking sites
            from ctx.aof_tx to ctx.aof_pool.as_ref().map(|p|
            p.try_send_append(ctx.shard_id, bytes))
  Step 2e — migrate handler_sharded sites INCLUDING the line 1651
            target-routing fix; remove the legacy aof_tx field;
            update command/persistence BGREWRITEAOF helpers to use
            try_send_rewrite (with PerShard rejection)
  Step 2f — spawn sites (main.rs, listener.rs, embedded.rs) detect
            manifest layout and spawn N per_shard_aof_writer_task
            instances wrapped in AofWriterPool::per_shard()

Refs
  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
  Commit 3bb4790 (step 1 — manifest v2 format)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per-shard writer task body)
  Commit cb254ce (review fix — layout-aware paths + migrate rollback)

author: Tin Dang
Fifth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Migrates the 7 `ctx.aof_tx` usages in
`server/conn/handler_monoio/mod.rs` to `ctx.aof_pool`. Includes a
cross-shard routing correctness fix at line 1937 that the compat-alias
plumbing in step 2c made discoverable before it could ship as a silent
data-loss bug.

Routing fix at handler_monoio/mod.rs:1937
-----------------------------------------

The reanalysis triggered by step 2c surfaced that this site is structurally
identical to handler_sharded/mod.rs:1651 — both are the bottom of a
cross-shard reply loop where AOF append must land in the **target**
shard's writer, NOT `ctx.shard_id`.

Before: `let _ = tx.try_send(AofMessage::Append(bytes));` — `tx` is the
single top-level writer, so under TopLevel layout this was correct. Under
PerShard layout (step 2f and beyond) it would have written every
cross-shard write into the connection's home shard AOF, leaving the
target shard's AOF without the record and breaking per-shard recovery.

After: `pool.try_send_append(target, bytes);` where `target` is captured
per-batch when the remote_groups entry is drained.

Plumbing required to expose `target` in scope:

  1. `oneshot_futures` declaration at line 1840 gained a leading
     `usize` element (the target shard) — the type-system anchor
     making the rest of the change mechanical.
  2. The push at line 1884 captures `target` from the drain loop.
  3. The polling loop at line 1892 destructures `(target, meta, reply_rx)`.
  4. The AOF send inside the response-zip at line 1937 uses `target`.

Verified by reading the surrounding scope: `target` is bound in
`for (target, entries) in remote_groups.drain()` at line 1844, where
remote_groups was populated by `remote_groups.entry(target).or_default()`
during command classification — so every entry's aof_bytes belongs to
that target shard's data.

Other migrated sites in this commit
-----------------------------------

| Site                   | Owner shard    | Pattern                          |
|------------------------|----------------|----------------------------------|
| mod.rs:1069 is_write   | n/a            | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1124 MOVE       | ctx.shard_id   | `pool.try_send_append(ctx.shard_id, _)` |
| mod.rs:1189 COPY       | ctx.shard_id   | `pool.try_send_append(ctx.shard_id, _)` |
| mod.rs:1538 local write| ctx.shard_id   | `pool.try_send_append(ctx.shard_id, _)` |
| mod.rs:1771 aof_bytes  | n/a            | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1937 x-shard    | **target**     | `pool.try_send_append(target, _)` ← fix |

All four direct-append sites use `pool.try_send_append(owner, bytes)`
which returns `()` (fire-and-forget — back-pressure is intentional in
the AOF hot path; loss is bounded by the channel capacity already
chosen for the single writer). The `let _ =` wrapper from the tx form
is dropped along with the `AofMessage` import that is no longer
referenced at any call site in this file.

What this does NOT do (deferred to 2e)
--------------------------------------

  handler_monoio/dispatch.rs:981 — BGREWRITEAOF still calls
  `bgrewriteaof_start_sharded(tx, ...)` because the helper itself
  takes `&MpscSender<AofMessage>`. Step 2e migrates the helper to
  `pool.try_send_rewrite(msg)` (with PerShard rejection) and updates
  this call site in the same commit.

  handler_monoio/mod.rs:486 — still passes `&ctx.aof_tx` into
  `try_inline_dispatch_loop` in blocking.rs. Step 2e flips the
  parameter type alongside the body migration in blocking.rs and
  handler_single.rs.

Compat-alias progress
---------------------

After this commit, ctx.aof_pool is the sole AOF interface in
handler_monoio's main dispatch loop. ctx.aof_tx remains as a field
because:
  - dispatch.rs:981 (BGREWRITEAOF) still reads it
  - mod.rs:486 (inline path) still reads it
  - handler_sharded and handler_single haven't migrated yet

Step 2e removes the field entirely after the remaining 11 sites move.

Verification
------------

  cargo check + cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)

  Lib persistence tests:
    tokio: 379 passed
    monoio: 378 passed
  (tokio/monoio diff is feature-gated; matches step 2c baseline.)

  Integration tests (`tests/integration.rs`) fail to compile with
  "missing field unsafe_multishard_aof" on 7 ServerConfig literals —
  this is pre-existing (commit e0bb658 added the field but did not
  update the test file), unrelated to step 2c/2d, and verified on the
  branch tip without these changes via `git stash`.

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 root cause)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per-shard writer task body)
  Commit cb254ce (review fix — layout-aware paths + migrate rollback)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)

author: Tin Dang
… 2e-α)

Sixth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Migrates the 5 direct `ctx.aof_tx` AOF-
append sites and 2 `is_some()` gates in `server/conn/handler_sharded/mod.rs`
to `ctx.aof_pool`. Includes the **canonical** cross-shard routing fix at
line 1651 that was the motivating P0 for this entire RFC.

Routing fix at handler_sharded/mod.rs:1651
------------------------------------------

This is the originally-discovered site (counterpart to the latent fix
shipped in step 2d for handler_monoio:1937). The cross-shard reply loop
already had `target` in scope at line 1646 — the loop variable from
`for (meta, target) in reply_futures` — so the change is mechanical:

  Before: `if let Some(ref tx) = ctx.aof_tx { let _ = tx.try_send(AofMessage::Append(bytes)); }`
  After:  `if let Some(ref pool) = ctx.aof_pool { pool.try_send_append(target, bytes); }`

Why this matters: under TopLevel layout, a single writer absorbs every
append regardless of `target`, so the wrong-owner write was structurally
masked. Under PerShard (step 2f and beyond) each shard owns its own AOF
file, and a write that mutates target shard's data MUST land in target
shard's file — otherwise replay of target's AOF won't contain the
record and post-crash state diverges. This was the H1/H2 root cause in
P0-INVEST-01-multishard-aof-rootcause.md.

Other migrated sites in this commit
-----------------------------------

| Site                   | Owner shard    | Pattern                          |
|------------------------|----------------|----------------------------------|
| mod.rs:1122 is_write   | n/a            | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1123 aof_bytes  | n/a            | `aof_tx.is_some()` → `aof_pool.is_some()` |
| mod.rs:1175 MOVE       | ctx.shard_id   | `pool.try_send_append(ctx.shard_id, _)` |
| mod.rs:1219 COPY       | ctx.shard_id   | `pool.try_send_append(ctx.shard_id, _)` |
| mod.rs:1430 local write| ctx.shard_id   | `pool.try_send_append(ctx.shard_id, _)` |
| mod.rs:1651 x-shard    | **target**     | `pool.try_send_append(target, _)` ← fix |

The `AofMessage` import is no longer referenced at any call site in
this file and is removed.

Scope split (subdivision of step 2e)
------------------------------------

The original 2c plan listed 2e as one big commit. To keep each step
green-on-both-runtimes and bisectable, 2e is split into 4 atomic commits:

  2e-α (this commit) — handler_sharded/mod.rs only (mirrors 2d shape).
  2e-β — command/persistence.rs BGREWRITEAOF helpers swap to
         `&AofWriterPool` (with PerShard rejection translated to a
         user-facing RESP error); both handler_*/dispatch.rs BGREWRITEAOF
         call sites flip together.
  2e-γ — handler_single.rs (6 sites, parameter type swap),
         blocking.rs (2 fn signatures + 1 use), handler_monoio/mod.rs:486
         (call site for the migrated blocking helper), and the 12
         test call sites in server/conn/tests.rs.
  2e-δ — Remove `aof_tx` field from ConnectionContext and conn_state.rs;
         drop the parameter from `ConnectionContext::new`; simplify the
         4 spawn sites in shard/conn_accept.rs.

Each commit compiles + clippy clean + lib persistence tests green on
both `runtime-monoio` and `runtime-tokio,jemalloc`. The compat-alias
field (`ctx.aof_tx` alongside `ctx.aof_pool`) introduced in step 2c
lets each commit flip its slice of call sites without breaking the
other consumers.

Verification
------------

  cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)

  Lib persistence tests:
    tokio: 379 passed
    monoio: 378 passed
  (Diff is feature-gated; matches step 2c/2d baseline.)

  Pre-existing tests/integration.rs breakage on
  `unsafe_multishard_aof` ServerConfig field (commit e0bb658) remains
  unrelated to this commit — verified via `git stash` in step 2d.

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  tmp/P0-INVEST-01-multishard-aof-rootcause.md (H1/H2 — the bug 1651 fixes)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per-shard writer task body)
  Commit cb254ce (review fix — layout-aware paths + migrate rollback)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
  Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)

author: Tin Dang
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/persistence/aof.rs`:
- Around line 858-930: The per-shard writer loop currently blocks on rx.recv()
and never checks cancel, so a cancellation-only shutdown can hang; modify the
loop around rx.recv() (the match handling AofMessage::{Append, Rewrite,
RewriteSharded, Shutdown}) to make cancellation reachable by either using a
non-blocking/timeout receive (e.g., try_recv/recv_timeout) or by selecting
between rx.recv() and checking the cancel flag (atomic or channel) before/after
the recv, breaking out when cancel is set; ensure you still perform the same
final flush/sync logic (the file.flush().and_then(|_| file.sync_data()) block
guarded by write_error) and preserve the metrics/fsync handling and warnings for
Rewrite/RewriteSharded.
- Around line 720-756: The per-shard AOF writer currently ignores errors from
writer.flush().await and writer.get_ref().sync_data().await in the EverySec,
Shutdown, and cancel branches (as well as the Always path already checked),
which can silently drop durability; update the branches handling interval.tick()
when fsync == FsyncPolicy::EverySec, the Ok(AofMessage::Shutdown) / Err(_)
shutdown branch, and the cancel.cancelled() branch to check the Result values
from flush() and sync_data(), log failures including shard_id and the error, and
surface a degraded state (e.g., return or set a tracer/metric) instead of
discarding errors—modify the calls around writer.flush().await and
writer.get_ref().sync_data().await and add error handling/logging similar to the
existing Append path handling.

In `@src/server/conn/handler_monoio/mod.rs`:
- Around line 1536-1541: The AOF is serializing the original client Frame
(`frame`) instead of the possibly workspace-rewritten command arguments
(`cmd_args`), causing persisted writes to use unprefixed keys; change the AOF
serialization for local writes to serialize the dispatched command (the same
representation used by `dispatch_frame`) by passing the rewritten command args
(or the dispatched command object) into `aof::serialize_command` before calling
`pool.try_send_append` so local writes persist the post-rewrite command; update
the block that checks `is_write`/`ctx.aof_pool` to use `cmd_args` (or the
dispatched command) rather than `frame`, keeping the
`pool.try_send_append(ctx.shard_id, ...)` call.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: b9290671-bc3d-4bf2-9a53-f512eecb98c0

📥 Commits

Reviewing files that changed from the base of the PR and between 3bb4790 and a05f3d8.

📒 Files selected for processing (6)
  • src/persistence/aof.rs
  • src/persistence/aof_manifest.rs
  • src/server/conn/core.rs
  • src/server/conn/handler_monoio/mod.rs
  • src/server/conn_state.rs
  • src/shard/conn_accept.rs
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/persistence/aof_manifest.rs

Comment thread src/persistence/aof.rs Outdated
Comment on lines +720 to +756
Ok(AofMessage::Append(data)) => {
if let Err(e) = writer.write_all(&data).await {
error!("AOF write error shard {}: {}", shard_id, e);
continue;
}
if matches!(fsync, FsyncPolicy::Always) {
let _ = writer.flush().await;
let _ = writer.get_ref().sync_data().await;
}
}
Ok(AofMessage::Rewrite(_)) | Ok(AofMessage::RewriteSharded(_)) => {
warn!(
"AOF writer shard {}: received Rewrite/RewriteSharded — \
not supported in PerShard layout, dropped. \
Per-shard BGREWRITEAOF lands in RFC step 6.",
shard_id
);
}
Ok(AofMessage::Shutdown) | Err(_) => {
let _ = writer.flush().await;
let _ = writer.get_ref().sync_data().await;
info!("AOF writer shard {} shutting down", shard_id);
break;
}
}
}
_ = interval.tick(), if fsync == FsyncPolicy::EverySec => {
if last_fsync.elapsed() >= std::time::Duration::from_secs(1) {
let _ = writer.flush().await;
let _ = writer.get_ref().sync_data().await;
last_fsync = Instant::now();
}
}
_ = cancel.cancelled() => {
let _ = writer.flush().await;
let _ = writer.get_ref().sync_data().await;
info!("AOF writer shard {} cancelled", shard_id);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Surface fsync/flush failures in the Tokio per-shard writer.

Lines 726-727, 748-755 ignore flush() / sync_data() errors. That makes appendfsync always and shutdown/cancel paths silently lose durability without logging or tripping degraded state.

Suggested direction
-                            if matches!(fsync, FsyncPolicy::Always) {
-                                let _ = writer.flush().await;
-                                let _ = writer.get_ref().sync_data().await;
+                            if matches!(fsync, FsyncPolicy::Always) {
+                                if let Err(e) = writer.flush().await {
+                                    error!("AOF flush failed shard {}: {}", shard_id, e);
+                                    break;
+                                }
+                                if let Err(e) = writer.get_ref().sync_data().await {
+                                    error!("AOF sync failed shard {}: {}", shard_id, e);
+                                    break;
+                                }
                             }

Apply the same check to the EverySec, Shutdown, and cancel branches too.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/persistence/aof.rs` around lines 720 - 756, The per-shard AOF writer
currently ignores errors from writer.flush().await and
writer.get_ref().sync_data().await in the EverySec, Shutdown, and cancel
branches (as well as the Always path already checked), which can silently drop
durability; update the branches handling interval.tick() when fsync ==
FsyncPolicy::EverySec, the Ok(AofMessage::Shutdown) / Err(_) shutdown branch,
and the cancel.cancelled() branch to check the Result values from flush() and
sync_data(), log failures including shard_id and the error, and surface a
degraded state (e.g., return or set a tracer/metric) instead of discarding
errors—modify the calls around writer.flush().await and
writer.get_ref().sync_data().await and add error handling/logging similar to the
existing Append path handling.

Comment thread src/persistence/aof.rs
Comment on lines +858 to +930
loop {
match rx.recv() {
Ok(AofMessage::Append(data)) => {
if write_error {
continue;
}
if let Err(e) = file.write_all(&data) {
error!(
"AOF write failed shard {} (seq {}): {}. Persistence degraded.",
shard_id, manifest.seq, e
);
write_error = true;
continue;
}
match fsync {
FsyncPolicy::Always => {
let t = Instant::now();
if let Err(e) = file.flush().and_then(|_| file.sync_data()) {
error!(
"AOF sync failed shard {} (seq {}, always): {}",
shard_id, manifest.seq, e
);
write_error = true;
} else {
crate::admin::metrics_setup::record_aof_fsync(
t.elapsed().as_micros() as u64,
);
}
}
FsyncPolicy::EverySec => {
if last_fsync.elapsed() >= std::time::Duration::from_secs(1) {
let t = Instant::now();
if let Err(e) = file.flush().and_then(|_| file.sync_data()) {
error!(
"AOF sync failed shard {} (seq {}, everysec): {}",
shard_id, manifest.seq, e
);
} else {
crate::admin::metrics_setup::record_aof_fsync(
t.elapsed().as_micros() as u64,
);
last_fsync = Instant::now();
}
}
}
FsyncPolicy::No => {}
}
}
Ok(AofMessage::Rewrite(_)) | Ok(AofMessage::RewriteSharded(_)) => {
warn!(
"AOF writer shard {}: received Rewrite/RewriteSharded — \
not supported in PerShard layout, dropped. \
Per-shard BGREWRITEAOF lands in RFC step 6.",
shard_id
);
}
Ok(AofMessage::Shutdown) | Err(_) => {
if !write_error {
if let Err(e) = file.flush().and_then(|_| file.sync_data()) {
error!(
"AOF final sync failed shard {} (seq {}): {}",
shard_id, manifest.seq, e
);
}
}
info!(
"AOF writer shard {} shutting down (monoio, seq {})",
shard_id, manifest.seq
);
break;
}
}
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make cancellation reachable in the Monoio per-shard writer loop.

After manifest load, this branch blocks on rx.recv() and never checks cancel again. A cancellation-only shutdown can therefore hang until another channel message arrives.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/persistence/aof.rs` around lines 858 - 930, The per-shard writer loop
currently blocks on rx.recv() and never checks cancel, so a cancellation-only
shutdown can hang; modify the loop around rx.recv() (the match handling
AofMessage::{Append, Rewrite, RewriteSharded, Shutdown}) to make cancellation
reachable by either using a non-blocking/timeout receive (e.g.,
try_recv/recv_timeout) or by selecting between rx.recv() and checking the cancel
flag (atomic or channel) before/after the recv, breaking out when cancel is set;
ensure you still perform the same final flush/sync logic (the
file.flush().and_then(|_| file.sync_data()) block guarded by write_error) and
preserve the metrics/fsync handling and warnings for Rewrite/RewriteSharded.

Comment on lines 1536 to 1541
// AOF logging for successful local writes
if !matches!(response, Frame::Error(_)) && is_write {
if let Some(ref tx) = ctx.aof_tx {
if let Some(ref pool) = ctx.aof_pool {
let serialized = aof::serialize_command(&frame);
let _ = tx.try_send(AofMessage::Append(serialized));
pool.try_send_append(ctx.shard_id, serialized);
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Serialize the dispatched command, not the client frame, for local AOF writes.

Line 1539 still uses frame, but by this point cmd_args may already be workspace-rewritten. Remote writes handle this correctly with dispatch_frame; local writes will persist the unprefixed command and replay to the wrong key after restart.

Suggested fix
-                        if let Some(ref pool) = ctx.aof_pool {
-                            let serialized = aof::serialize_command(&frame);
+                        if let Some(ref pool) = ctx.aof_pool {
+                            let aof_frame = if rewritten.is_some() {
+                                let mut parts = Vec::with_capacity(1 + cmd_args.len());
+                                parts.push(Frame::BulkString(Bytes::copy_from_slice(cmd)));
+                                parts.extend_from_slice(cmd_args);
+                                Frame::Array(parts.into())
+                            } else {
+                                frame.clone()
+                            };
+                            let serialized = aof::serialize_command(&aof_frame);
                             pool.try_send_append(ctx.shard_id, serialized);
                         }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
// AOF logging for successful local writes
if !matches!(response, Frame::Error(_)) && is_write {
if let Some(ref tx) = ctx.aof_tx {
if let Some(ref pool) = ctx.aof_pool {
let serialized = aof::serialize_command(&frame);
let _ = tx.try_send(AofMessage::Append(serialized));
pool.try_send_append(ctx.shard_id, serialized);
}
// AOF logging for successful local writes
if !matches!(response, Frame::Error(_)) && is_write {
if let Some(ref pool) = ctx.aof_pool {
let aof_frame = if rewritten.is_some() {
let mut parts = Vec::with_capacity(1 + cmd_args.len());
parts.push(Frame::BulkString(Bytes::copy_from_slice(cmd)));
parts.extend_from_slice(cmd_args);
Frame::Array(parts.into())
} else {
frame.clone()
};
let serialized = aof::serialize_command(&aof_frame);
pool.try_send_append(ctx.shard_id, serialized);
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/conn/handler_monoio/mod.rs` around lines 1536 - 1541, The AOF is
serializing the original client Frame (`frame`) instead of the possibly
workspace-rewritten command arguments (`cmd_args`), causing persisted writes to
use unprefixed keys; change the AOF serialization for local writes to serialize
the dispatched command (the same representation used by `dispatch_frame`) by
passing the rewritten command args (or the dispatched command object) into
`aof::serialize_command` before calling `pool.try_send_append` so local writes
persist the post-rewrite command; update the block that checks
`is_write`/`ctx.aof_pool` to use `cmd_args` (or the dispatched command) rather
than `frame`, keeping the `pool.try_send_append(ctx.shard_id, ...)` call.

TinDang97 added 6 commits May 27, 2026 13:45
…tep 2e-β)

Seventh implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Swings `bgrewriteaof_start` and
`bgrewriteaof_start_sharded` over to `&AofWriterPool` and routes
through `pool.try_send_rewrite(...)`, which rejects under PerShard
layout with a stable user-facing RESP error. All three callers flip
together so the helpers stay strictly typed.

Why this matters
----------------

Step 2b shipped `per_shard_aof_writer_task` with PerShard rejection of
Rewrite/RewriteSharded messages (logged at `warn!`). Before this commit,
under PerShard layout BGREWRITEAOF would have:

  1. Sent `AofMessage::RewriteSharded(...)` into shard-0's writer via
     the legacy `tx.try_send(...)` path,
  2. Received `Ok(())` (channel accepted the message),
  3. Returned `+Background append only file rewriting started\r\n` to the
     client,
  4. The per-shard writer would warn and drop the message — no rewrite
     happens.

That is a silent failure: the client thinks a rewrite is in progress
when nothing is actually happening, and the rewrite-in-progress flag is
stuck set. After this commit, `pool.try_send_rewrite(...)` returns
`RewriteUnsupportedInPerShard`, the helper clears the flag, and the
client receives an explicit error:

  -ERR BGREWRITEAOF is not yet supported under per-shard AOF layout;
   per-shard rewrite ships in step 6 of the per-shard AOF migration

(Under TopLevel layout — i.e. today — `try_send_rewrite` is a thin
pass-through, so behaviour is unchanged.)

Changes
-------

command/persistence.rs
  - Both `bgrewriteaof_start` and `bgrewriteaof_start_sharded` now
    take `pool: &AofWriterPool` instead of `&channel::MpscSender<AofMessage>`.
  - New `rewrite_pool_error_frame(err: AofPoolSendError)` translates
    pool failures into RESP errors (PerShard rejection → user-facing
    "not yet supported"; channel send fail → existing "failed to start").
  - `AOF_REWRITE_IN_PROGRESS` is still cleared on any send failure,
    matching prior behaviour.
  - Removed now-unused `crate::runtime::channel` import.
  - Existing gate test `test_bgrewriteaof_sharded_refuses_under_unsafe_config`
    updated to wrap the local sender as a `TopLevel` pool before
    invoking the helper.

server/conn/handler_monoio/dispatch.rs:980
server/conn/handler_sharded/dispatch.rs:355
  - BGREWRITEAOF dispatch path uses `ctx.aof_pool` (the field plumbed
    in step 2c) instead of `ctx.aof_tx`. Behaviour identical under
    TopLevel; gains PerShard rejection in step 2f.

server/conn/handler_single.rs:610
  - Wraps the local `aof_tx` parameter as a transient
    `AofWriterPool::top_level(tx.clone())` before calling the helper.
    handler_single is single-shard mode by definition, so the writer
    is always TopLevel — the wrapper is purely a type adapter.
    BGREWRITEAOF is a manual admin command, not a hot path; the
    transient allocation is acceptable. Step 2e-γ swaps the function's
    `aof_tx` parameter to `aof_pool` and removes this wrapper.

server/conn/core.rs (ConnectionContext.aof_tx)
  - Doc comment expanded to track the staged removal.
  - `#[cfg_attr(not(feature = "runtime-monoio"), allow(dead_code))]`
    silences clippy under tokio (where the only remaining reader is
    `handler_monoio/mod.rs:486`, which is `#[cfg(feature = "runtime-monoio")]`).
    Future regressions on monoio still trip a real dead-code warning.

What this does NOT do (deferred to 2e-γ)
---------------------------------------

  - handler_single's 5 remaining `aof_tx` sites (SWAPDB at 658, AOF
    drain at 881, WAL records at 1513, is_write at 1531, AOF drain at
    2235). All keep using the local `aof_tx` parameter.
  - handler_single function-parameter rename (`aof_tx` → `aof_pool`).
  - blocking.rs `try_inline_dispatch` / `try_inline_dispatch_loop`
    signatures + the AOF send at line 1349.
  - handler_monoio/mod.rs:486 call site for the migrated blocking
    helper.
  - server/conn/tests.rs (12 call sites — straightforward None/Some
    swaps once blocking.rs's signature flips).

What this does NOT do (deferred to 2e-δ)
---------------------------------------

  - Remove the `aof_tx` field from ConnectionContext and conn_state.rs.
  - Drop the parameter from `ConnectionContext::new`.
  - Simplify the 4 spawn sites in shard/conn_accept.rs.

Verification
------------

  cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)

  Lib persistence tests:
    tokio: 379 passed
    monoio: 378 passed
  Including the gate-refusal test that now exercises the pool path.

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  Commit 5a546ff (step 2a — AofWriterPool type + try_send_rewrite)
  Commit 3afe21f (step 2b — per-shard writer task body that rejects
                  Rewrite/RewriteSharded with warn!)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
  Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
  Commit eb90419 (step 2e-α — handler_sharded migration + canonical
                  routing fix at line 1651)

author: Tin Dang
…o aof_pool (Option B step 2e-γ)

Eighth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Drains the remaining `ctx.aof_tx` and
parameter-level `aof_tx` readers from the connection-handler layer:

  - `blocking.rs::try_inline_dispatch` + `try_inline_dispatch_loop`:
    parameter type changes from `&Option<MpscSender<AofMessage>>` to
    `&Option<Arc<AofWriterPool>>`. The L1349 AOF append uses
    `pool.try_send_append(shard_id, frozen)` — under PerShard layout this
    routes to the shard that owns the data, fixing the same latent bug
    class as 2d/2e-α (a TopLevel writer would absorb every shard's
    inline SET regardless of routing).
  - `handler_monoio/mod.rs:486`: flipped to pass `&ctx.aof_pool` into the
    migrated blocking helper. After this commit no consumer reads
    `ctx.aof_tx` under any feature combo.
  - `handler_single.rs`: top of `handle_connection` constructs
    `aof_pool: Option<Arc<AofWriterPool>>` from the inbound `aof_tx`
    parameter via `AofWriterPool::top_level(tx.clone())`. All six
    consumer sites (BGREWRITEAOF wrapper from 2e-β, SWAPDB WAL,
    per-batch AOF drain at 905, per-batch AOF drain at 2260, GRAPH WAL
    records at 1537, is_write/aof_bytes gate at 1556) now read
    `aof_pool` instead of `aof_tx`. The `aof_tx` function parameter
    survives as a placeholder for 2e-δ when listener.rs starts
    constructing the pool itself.
  - `server/conn/tests.rs`: 12 inline-dispatch test fixtures swap
    `aof_tx: Option<MpscSender<AofMessage>>` for
    `aof_pool: Option<Arc<AofWriterPool>>` and pass `&aof_pool` into
    the migrated `try_inline_dispatch[_loop]`. The one Some-form
    fixture (`test_inline_set_with_aof_falls_through_when_writes_disabled`)
    wraps the local sender as a TopLevel pool.

Two send-style choices made deliberately
----------------------------------------

`AofWriterPool` exposes two send paths today: a fire-and-forget
`try_send_append(shard_id, bytes)` (returns `()`) and the lower-level
`sender(shard_id)` which returns the underlying `&MpscSender` for
callers that need the `Result` or want `send_async`. Most migrated
sites use `try_send_append`; the four exceptions are:

  - SWAPDB at handler_single:677 keeps `sender(0).try_send(...).is_ok()`
    because the swap MUST abort cleanly if the WAL enqueue fails (it
    is the only durability hook before the in-memory swap). The
    fire-and-forget helper silently drops; here we need the Result.
  - The three `send_async(AofMessage::Append(...)).await` sites at
    handler_single:909 / 1540 / 2266 keep `sender(0).send_async(...).await`
    because their pre-pool code awaited capacity on a full channel
    (back-pressure on the inbound write path). `try_send_append` would
    drop instead. Preserving the semantics is more important than the
    uniform call shape here — the per-shard pool exposes the same
    sender under PerShard, so the semantics carry over in 2f.

ConnectionContext.aof_tx
------------------------

After this commit the field has no readers under either runtime. The
doc comment is updated to reflect the staged removal, and the
`cfg_attr(not(...))` gate from 2e-β collapses to a plain
`#[allow(dead_code)]` (the field is write-only — populated by the
constructor — until 2e-δ drops both the constructor parameter and
the field itself).

What this does NOT do (deferred to 2e-δ)
---------------------------------------

  - Remove the `aof_tx` field from ConnectionContext + conn_state.rs.
  - Drop the constructor parameter `aof_tx: Option<MpscSender<...>>`
    from `ConnectionContext::new`.
  - Simplify the 4 spawn sites in shard/conn_accept.rs (they currently
    clone `aof` only to pass it as the field; once the field is gone
    the field-assignment can go too).
  - Replace the `aof_tx` parameter on handler_single's
    `handle_connection` with `aof_pool` (and update listener.rs to
    construct the pool itself).

Verification
------------

  cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)

  Lib persistence tests:
    tokio: 379 passed
    monoio: 378 passed

  Inline dispatch tests (server::conn::tests): 11 passed
  (covers GET hit/miss, multi-shard skip, SET inline, SET with AOF
   fall-through, several malformed-input rejects).

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per-shard writer task body)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
  Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
  Commit eb90419 (step 2e-α — handler_sharded migration + canonical routing fix)
  Commit 5735031 (step 2e-β — BGREWRITEAOF helpers via AofWriterPool)

author: Tin Dang
…step 2e-δ)

Ninth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). With handler_monoio (2d), handler_sharded
(2e-α), BGREWRITEAOF helpers (2e-β), and handler_single + blocking +
inline tests (2e-γ) all migrated to `AofWriterPool`, the compat-alias
`aof_tx` field on `ConnectionContext` has no remaining consumers. This
commit removes it, drops the parameter from `ConnectionContext::new`,
and simplifies the 4 spawn sites in `shard/conn_accept.rs` that no
longer need to clone `aof_tx` as an intermediate.

Changes
-------

src/server/conn/core.rs
  - Remove `aof_tx: Option<MpscSender<AofMessage>>` field
    (was `#[allow(dead_code)]` in step 2e-γ after the last reader left).
  - Drop `aof_tx` parameter from `ConnectionContext::new`.
  - Drop `aof_tx` from struct initializer.
  - Doc-comment on `aof_pool` updated to reflect it as the sole AOF
    interface (the "compat alias" framing from step 2c is now history).
  - Remove unused `AofMessage` import.

src/server/conn_state.rs (definition-only placeholder twin)
  - Mirror the same field removal + doc-comment update.
  - Remove unused `AofMessage` import.

src/shard/conn_accept.rs (4 ConnectionContext::new spawn sites)
  - Drop the intermediate `let aof = aof_tx.clone();` — the only
    consumer was the constructor's removed parameter.
  - Build the pool directly: `aof_pool = aof_tx.as_ref().map(...)`.
  - Drop the `aof,` positional argument from each constructor call.
  - Update the "2c compat alias" comment to point forward at the
    layout-aware constructor in step 2f.

What this does NOT do (deferred to 2f)
-------------------------------------

  - handler_single's `aof_tx` parameter on `handle_connection` — needs
    listener.rs (the spawn site) to construct the pool itself first.
  - Spawn-side AOF channel construction in main.rs, listener.rs, and
    embedded.rs — they still build a single `MpscSender<AofMessage>`
    and pass it through `aof_tx` chains. Step 2f introduces the
    layout-aware `AofWriterPool::from_manifest(...)` that emits
    `top_level(tx)` for TopLevel or `per_shard(senders)` for PerShard
    and replaces the per-shard channel fanout in `shard/event_loop.rs`.

Verification
------------

  cargo clippy clean on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc
    (defaults: runtime-monoio,jemalloc,graph,text-index)

  Lib persistence tests:
    tokio: 379 passed
    monoio: 378 passed
  Inline-dispatch tests (server::conn::tests): 11 passed.

End-state of step 2 (handler-layer migration)
---------------------------------------------

After this commit `aof_pool` is the sole AOF interface across:
  - ConnectionContext (struct + constructor)
  - handler_sharded (mod.rs + dispatch.rs)
  - handler_monoio (mod.rs + dispatch.rs)
  - handler_single (all internal sites; parameter still receives
    `aof_tx` but is only used to bootstrap the pool)
  - blocking.rs (try_inline_dispatch + try_inline_dispatch_loop)
  - command/persistence.rs (BGREWRITEAOF helpers, with PerShard
    rejection)
  - server/conn/tests.rs (12 inline-dispatch fixtures)

The remaining `aof_tx` references in the tree:
  - src/main.rs, src/server/embedded.rs, src/server/listener.rs
    (spawn-side channel construction — 2f scope)
  - src/shard/event_loop.rs (passes `aof_tx` through to conn_accept;
    2f flips to per-shard pool construction)
  - src/shard/conn_accept.rs (still receives `aof_tx: &Option<MpscSender>`
    as parameter; 2f changes to `aof_pool: &Option<Arc<AofWriterPool>>`)
  - src/server/conn/handler_single.rs (function parameter only;
    bootstrap site for the local pool — 2f rename)
  - src/persistence/aof.rs (channel type definitions — stable)

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per-shard writer task body)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool compat alias)
  Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
  Commit eb90419 (step 2e-α — handler_sharded migration + canonical routing fix)
  Commit 5735031 (step 2e-β — BGREWRITEAOF helpers via AofWriterPool)
  Commit ceac655 (step 2e-γ — handler_single + blocking + inline tests)

author: Tin Dang
…ig literals

Commit e0bb658 added `unsafe_multishard_aof: bool` to `ServerConfig`
(the P0 gate against multi-shard AOF data loss until per-shard replay
lands) but did not update the 17 `ServerConfig { .. }` literals
scattered across the integration-test suite. The tests have been
failing to compile since then on both feature combinations.

This commit backfills `unsafe_multishard_aof: false,` in all affected
literals — preserving the production default (refuse the unsafe config
at startup unless explicitly overridden). No test semantics change:
the tests that exercise multi-shard configs already use single-shard
storage layouts or `appendonly = "no"`, so the gate doesn't fire for
them.

Files touched (17 literals across 10 files)
-------------------------------------------

  tests/ft_search_multi_shard_as_of.rs
  tests/ft_search_temporal_parity.rs
  tests/integration.rs            (7 sites)
  tests/kill_snapshot.rs
  tests/mq_integration.rs
  tests/replication_test.rs
  tests/txn_ft_search_snapshot.rs
  tests/txn_kv_wiring.rs
  tests/vacuum_commands.rs
  tests/workspace_integration.rs  (2 sites)

Verification
------------

  cargo check --tests
  cargo check --tests --no-default-features --features runtime-tokio,jemalloc

Both clean. Unblocks integration-test runs for the per-shard AOF
migration commits (2a..2e-δ on origin) and any future PRs landing on
this branch.

Refs
----

  Commit e0bb658 (origin of the unbackfilled field)
  Commit 6e49050 (docs noting the multi-shard AOF safety gate)
  tmp/rfc-per-shard-aof-v02.md (per-shard AOF migration scope)

author: Tin Dang
…tep 2f-α)

Tenth implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Closes out the handler-layer migration
sequence by lifting `AofWriterPool` construction to the three spawn
sites (`main.rs`, `server/listener.rs`, `server/embedded.rs`) and
retyping the connection-accept fan-out (`shard/event_loop.rs`,
`shard/conn_accept.rs`, `server/conn/handler_single.rs`) to thread
`Option<Arc<AofWriterPool>>` end-to-end. The compat-alias inline
construction that step 2c–2e-δ relied on (`let aof_pool = aof_tx
.as_ref().map(|tx| AofWriterPool::top_level(tx.clone()))`) is deleted
from every site.

After this commit, `aof_tx` no longer exists anywhere in `src/`. Grep
confirms zero matches under any feature combo.

Scope split: 2f-α vs 2f-β
-------------------------

This commit is strictly **type plumbing** — every writer pool is still
`AofLayout::TopLevel` wrapping a single sender. The layout-aware
constructor that reads `AofManifest` and emits PerShard pools (with
fan-out to N writer threads) lands as a follow-up commit (2f-β). The
RFC's "Step 2f" originally bundled both; separating them keeps the
diff bisectable and preserves the property that today's runtime
behavior is byte-identical to step 2e-δ.

Changes
-------

src/main.rs
  - Import `AofWriterPool` alongside `AofMessage` + `FsyncPolicy`.
  - Replace `let aof_tx: Option<MpscSender<AofMessage>>` with
    `let aof_pool: Option<Arc<AofWriterPool>>`. Wrap the writer
    sender via `AofWriterPool::top_level(tx)`.
  - Rename per-shard clone `shard_aof_tx` → `shard_aof_pool` and the
    matching positional argument in `Shard::run(...)`.
  - Shutdown path: `tx.send(AofMessage::Shutdown)` →
    `pool.broadcast_shutdown()`. Under TopLevel this is one try_send;
    under PerShard (2f-β) it fans to every per-shard writer.

src/server/listener.rs
  - Same pattern. `aof_tx` → `aof_pool: Option<Arc<AofWriterPool>>`,
    wrapped at the construction site.
  - Accept-loop captures `aof_pool_conn = aof_pool.clone()` (Arc
    bump) and passes it as the `aof_pool` parameter of
    `connection::handle_connection` (handler_single).
  - Cancel-path shutdown switches to `pool.broadcast_shutdown()`
    (note: `try_send`-based, not async — listener already drains
    on the same runtime).

src/server/embedded.rs
  - Mirror change: outer tuple now `(Option<Arc<AofWriterPool>>,
    Option<JoinHandle>)`.
  - Shutdown-ordering comment updated to reflect the pool-Drop
    semantics — dropping the last `Arc` drops the pool, which drops
    the underlying `Vec<MpscSender>`, which closes the channel. The
    writer's `recv_async()` returns `Err(_)` and the task drains +
    fsyncs + exits cleanly. This preserves Qodo bug #5's fix:
    shards drop their clones before the outer pool, so the writer
    never terminates while shards still have pending appends.

src/shard/event_loop.rs
  - `Shard::run` signature: `aof_tx: Option<MpscSender<AofMessage>>`
    → `aof_pool: Option<Arc<AofWriterPool>>`.
  - 9 internal pass-through sites (`&aof_tx` → `&aof_pool`) updated.

src/shard/conn_accept.rs
  - 4 function signatures (`spawn_tokio_connection`,
    `spawn_monoio_connection`, `spawn_monoio_tls_connection`,
    `spawn_migrated_monoio_connection`): parameter
    `aof_tx: &Option<MpscSender<AofMessage>>` →
    `aof_pool: &Option<Arc<AofWriterPool>>`.
  - 4 inline pool-construction blocks deleted (the compat-alias
    `let aof_pool = aof_tx.as_ref().map(|tx| top_level(tx.clone()))`
    pattern from step 2c). Replaced by a one-line Arc bump:
    `let pool_for_ctx = aof_pool.as_ref().map(Arc::clone);`
    passed positionally into `ConnectionContext::new(.., pool_for_ctx, ..)`.

src/server/conn/handler_single.rs
  - Parameter `aof_tx: Option<MpscSender<AofMessage>>` →
    `aof_pool: Option<Arc<AofWriterPool>>`.
  - **DELETED** the step-2e-γ bootstrap block that wrapped the
    inbound `aof_tx` as a TopLevel pool. The parameter IS the pool
    now; the bootstrap was always a placeholder for this commit.
  - Doc comment on `handle_connection` updated to reflect the
    pool semantics (single-shard ⇒ always TopLevel).

What this does NOT do (deferred to 2f-β)
----------------------------------------

  - Read `AofManifest` from disk in `main.rs`/`embedded.rs` to choose
    between `top_level(...)` and `per_shard(senders)`.
  - Spawn N writer threads when the on-disk manifest is `AofLayout::PerShard`.
  - Add a manifest mismatch warning (manifest says PerShard but
    constructed as TopLevel, or vice versa).
  - Wire `per_shard_aof_writer_task` (already defined in step 2b)
    into the spawn flow.

Today's runtime behavior is byte-identical to step 2e-δ. The only
observable change is: every site speaks the `AofWriterPool` API
instead of `MpscSender<AofMessage>`, which is a precondition for
2f-β shipping the PerShard fan-out without touching call sites again.

Verification
------------

  cargo check on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc   clean
    (defaults: runtime-monoio,jemalloc,graph,text-index)     clean

  cargo clippy -- -D warnings on both feature combinations: clean.

  Lib persistence tests (full set, including the 5 pool_tests added
  in step 2a):
    tokio: 379 passed (baseline match)
    monoio: 378 passed (baseline match)

  cargo test --lib (full lib suite):
    tokio: 2751 passed
    monoio: pre-existing stack overflow in
      `graph::cypher::parser::tests::test_nesting_depth_exceeded`
      (verified on origin/HEAD without these changes — unrelated to
      AOF migration).

  Integration-test compile: clean on both combos after the parallel
  test-fix commit `4fdd50f` (unsafe_multishard_aof backfill).

Net `aof_tx` references in src/
-------------------------------

  Before this commit: 37 across 6 files.
  After this commit:  0.

The full per-shard AOF refactor (steps 2a–2f-α) is now complete on
the handler + spawn layer. Step 2f-β (layout-aware fan-out) and step
3+ (LSN tagging, per-shard replay, cross-shard ordering, AppendSync,
crash matrix) are unblocked.

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 4 — writer architecture)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per-shard writer task body)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool compat alias)
  Commit a05f3d8 (step 2d — handler_monoio migration + latent routing fix)
  Commit eb90419 (step 2e-α — handler_sharded migration + canonical routing fix)
  Commit 5735031 (step 2e-β — BGREWRITEAOF helpers via AofWriterPool)
  Commit ceac655 (step 2e-γ — handler_single + blocking + inline tests)
  Commit d9a3651 (step 2e-δ — drop ConnectionContext.aof_tx field)
  Commit 4fdd50f (test backfill — unsafe_multishard_aof field)

author: Tin Dang
…step 2f-β)

Eleventh implementation step of the per-shard AOF RFC (Option B in
tmp/rfc-per-shard-aof-v02.md). Replaces the unconditional TopLevel
construction at `main.rs:312` (left in place by step 2f-α) with a
read-only manifest peek + layout-aware spawn. When an on-disk manifest
declares `layout == PerShard` AND `--shards >= 2`, main.rs now spawns
one `per_shard_aof_writer_task` per shard and returns
`AofWriterPool::per_shard(senders)` instead of the single-writer
TopLevel pool.

Scope: main.rs only
-------------------

`embedded.rs` and `server/listener.rs` are deliberately untouched.
Both run the tokio single-file legacy AOF path (`aof_writer_task`
opens `<dir>/<appendfilename>`) and never engage the manifest by
design — see the comment block at `embedded.rs:222-235`. Adding a
PerShard branch in either would risk Qodo bug #3 (incr-only replay
on the next boot silently dropping data).

`listener.rs` is the tokio single-shard path: per-shard fan-out has
no meaning with one shard, so it inherits TopLevel from
`AofWriterPool::top_level(tx)` at the construction site.

The new branching logic
-----------------------

src/main.rs (L308-419)

  1. If `appendonly == "yes"`:
       AofManifest::load(&base_dir)
       - Ok(Some(m))  → continue with existing manifest
       - Ok(None)     → no manifest yet (fresh install)
       - Err(_)       → **fatal exit (2)** with the same "refusing to
                        start to avoid data loss" message used by the
                        replay block at L514-526. Mirroring this is
                        load-bearing: silently falling back to TopLevel
                        on a corrupt manifest would let the next write
                        create a fresh manifest that overwrites the
                        reference to the real base RDB, losing data.

  2. If a manifest was loaded: `verify_shard_count(num_shards as u16)`.
     Mismatch is fatal (exit 2) with the verbatim RFC § 3 error
     ("ERR shard count changed (manifest=N, config=M); refusing to
     start to avoid data loss. See docs/runbooks/shard-count-change.md").

  3. Spawn decision:
       use_per_shard = manifest.is_some()
                       && manifest.layout == PerShard
                       && num_shards >= 2

  4. If `use_per_shard`:
       - for sid in 0..num_shards:
           (tx, rx) = channel::mpsc_bounded::<AofMessage>(10_000)
           thread `aof-writer-{sid}` running
             `per_shard_aof_writer_task(rx, base_dir, sid as u16, fsync, cancel)`
           push tx to senders
       - return `Some(AofWriterPool::per_shard(senders))`

     Else (existing TopLevel path):
       - single `aof-writer` thread running `aof_writer_task`
         against `<dir>/<appendfilename>`
       - return `Some(AofWriterPool::top_level(tx))`

What this does NOT do (deferred)
--------------------------------

  - **Fresh-install PerShard creation.** `AofManifest::initialize()`
    still hardcodes TopLevel; nothing in main.rs constructs a PerShard
    manifest from scratch. The PerShard branch is therefore reachable
    only by:
      a) hand-crafting a v2 manifest (the smoke test below)
      b) future migration logic (RFC step 5/9 territory)
    Until then, runtime behavior under default configurations is
    byte-identical to step 2f-α.

  - **Multi-part AOF replay for multi-shard.** The replay block at
    `main.rs:528` still gates on `num_shards == 1`. Step 4 of the RFC
    closes this. A PerShard manifest with `num_shards >= 2` will spawn
    the writers correctly (smoke verified) and the writers will tail
    the existing incr files, but boot-time replay still warns
    "Multi-part AOF skipped in multi-shard mode".

  - **TopLevel→PerShard auto-migration.** `migrate_top_level_to_per_shard`
    exists in `aof_manifest.rs` (step 1) but is not wired into boot.

  - **AppendSync rendezvous, LSN tagging, cross-shard merge, CRASH-01
    matrix.** Steps 3, 5, 7, 8 of the RFC.

  - **Lifting the `--unsafe-multishard-aof` gate.** Step 9. The L280
    refusal still fires whenever `num_shards >= 2 && appendonly == "yes"`
    unless the operator explicitly opts in.

Manual smoke verification
-------------------------

Built `target/debug/moon` and ran four hand-crafted scenarios from
`/tmp/moon-smoke-*` directories (cleaned up post-run):

  1. **PerShard happy path.** Hand-wrote
       version 2
       seq 1
       shards 2
       shard 0 max_lsn 0
       shard 1 max_lsn 0
     at `appendonlydir/moon.aof.manifest`, created shard-0/ and
     shard-1/ dirs. Started with
       moon --port 16399 --shards 2 --unsafe-multishard-aof
            --appendonly yes --dir <smoke> --appendfsync everysec
     Log output:
       "AOF enabled (PerShard, 2 writers, fsync: EverySec)"
       "AOF writer shard 0: seq 1,
        incr=<smoke>/appendonlydir/shard-0/moon.aof.1.incr.aof"
       "AOF writer shard 1: seq 1,
        incr=<smoke>/appendonlydir/shard-1/moon.aof.1.incr.aof"
     Both per-shard writer tasks reached their per-shard incr files.

  2. **Shard-count mismatch.** Same manifest, started with `--shards 4`.
     Process exited 2 with verbatim:
       "REFUSING TO START: ERR shard count changed (manifest=2,
        config=4); refusing to start to avoid data loss.
        See docs/runbooks/shard-count-change.md"

  3. **Corrupt manifest.** Wrote garbage at the manifest path, started
     with `--shards 1`. Process exited 2 with:
       "REFUSING TO START: AOF manifest at <dir>/appendonlydir/ is
        corrupt: AOF manifest at .../moon.aof.manifest has no valid
        sequence number. Inspect manually before deleting; overwriting
        silently loses data."

  4. **TopLevel regression.** Fresh empty `--dir`, `--shards 1
     --appendonly yes`. Log: "AOF enabled (TopLevel, fsync: EverySec)".
     `initialize()` wrote v1 manifest + seq 1 base/incr. Behavior
     identical to step 2f-α.

Verification
------------

  cargo check on both feature combinations:
    --no-default-features --features runtime-tokio,jemalloc    clean
    (defaults: runtime-monoio,jemalloc,graph,text-index)       clean

  cargo clippy -- -D warnings on both combinations: clean.

  Lib persistence tests:
    tokio:  379 passed (baseline match)
    monoio: 378 passed (baseline match)

Refs
----

  tmp/rfc-per-shard-aof-v02.md (RFC § 3 + § 4)
  Commit 5a546ff (step 2a — AofWriterPool type)
  Commit 3afe21f (step 2b — per_shard_aof_writer_task body)
  Commit 6a758f4 (step 2c — type plumbing aof_tx → aof_pool)
  Commit a05f3d8 (step 2d — handler_monoio migration)
  Commit eb90419 (step 2e-α — handler_sharded migration)
  Commit 5735031 (step 2e-β — BGREWRITEAOF helpers)
  Commit ceac655 (step 2e-γ — handler_single + blocking + inline)
  Commit d9a3651 (step 2e-δ — drop ConnectionContext.aof_tx)
  Commit 4fdd50f (test backfill — unsafe_multishard_aof)
  Commit 8fd769c (step 2f-α — spawn-site type plumbing)

author: Tin Dang
Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/command/persistence.rs (1)

34-47: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Keep the unsafe-gate message aligned with the actual gate.

The new docs/error text still says this refusal only applies when --disk-offload enable is set, but the PR objective describes the startup/runtime gate as --shards >= 2 plus --appendonly yes. If MULTI_SHARD_AOF_REWRITE_UNSAFE is set under that broader condition, this response tells operators to try the wrong workaround (--disk-offload disable).

Also applies to: 284-287

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/command/persistence.rs` around lines 34 - 47, The refusal
message/documentation around MULTI_SHARD_AOF_REWRITE_UNSAFE is misleading:
update the error/runbook text and any related doc comments (including the
instance at the later lines ~284-287) to reflect the actual gate condition
(shards >= 2 AND --appendonly yes) rather than suggesting it only applies when
--disk-offload enable; search for and edit the strings emitted by
bgrewriteaof_start_sharded and the public comment for
MULTI_SHARD_AOF_REWRITE_UNSAFE to mention the correct combination (shards >= 2 +
appendonly yes) and adjust suggested operator workarounds/runbook pointers
accordingly.
src/server/embedded.rs (1)

123-139: ⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Refuse multi-shard AOF in run_embedded too.

Lines 123-139 still start the legacy top-level AOF writer for any appendonly == "yes", but this path never enforces the new shards >= 2 && !unsafe_multishard_aof gate. That leaves embedded deployments on the known write-loss path this PR is supposed to block. Please reuse the same startup validation here before spawning the writer.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/embedded.rs` around lines 123 - 139, The code unconditionally
starts the top-level AOF writer when config.appendonly == "yes" without checking
multi-shard safety; update the block that spawns the AOF writer in run_embedded
to reuse the same startup validation used elsewhere: check the shard count
(shards) and the unsafe_multishard_aof flag (or config.unsafe_multishard_aof)
and refuse/return an error if shards >= 2 and unsafe_multishard_aof is false,
before creating the channel, token, and calling AofWriterPool::top_level or
spawning aof::aof_writer_task; ensure you mirror the exact error message/flow
used in the other validation path so embedded deployments cannot start the
legacy top-level writer in unsafe multi-shard configurations.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/server/conn/handler_sharded/mod.rs`:
- Around line 1122-1123: The current AOF serialization captures aof_bytes from
the original frame (variable frame) before workspace prefix injection, which
records client-visible keys and can replay to wrong shards; change the logic to
compute aof_bytes from the post-rewrite command used for execution—i.e.,
serialize the actual dispatched command (use dispatch_frame or the rewritten
cmd_args that include the {ws_id} injection) when is_write is true and
ctx.aof_pool.is_some(); keep the same is_write calculation
(metadata::is_write(cmd)) but ensure aof::serialize_command is called on the
final command used for dispatch/execution instead of the pre-injection frame so
AOF reflects the physical stored keys.

---

Outside diff comments:
In `@src/command/persistence.rs`:
- Around line 34-47: The refusal message/documentation around
MULTI_SHARD_AOF_REWRITE_UNSAFE is misleading: update the error/runbook text and
any related doc comments (including the instance at the later lines ~284-287) to
reflect the actual gate condition (shards >= 2 AND --appendonly yes) rather than
suggesting it only applies when --disk-offload enable; search for and edit the
strings emitted by bgrewriteaof_start_sharded and the public comment for
MULTI_SHARD_AOF_REWRITE_UNSAFE to mention the correct combination (shards >= 2 +
appendonly yes) and adjust suggested operator workarounds/runbook pointers
accordingly.

In `@src/server/embedded.rs`:
- Around line 123-139: The code unconditionally starts the top-level AOF writer
when config.appendonly == "yes" without checking multi-shard safety; update the
block that spawns the AOF writer in run_embedded to reuse the same startup
validation used elsewhere: check the shard count (shards) and the
unsafe_multishard_aof flag (or config.unsafe_multishard_aof) and refuse/return
an error if shards >= 2 and unsafe_multishard_aof is false, before creating the
channel, token, and calling AofWriterPool::top_level or spawning
aof::aof_writer_task; ensure you mirror the exact error message/flow used in the
other validation path so embedded deployments cannot start the legacy top-level
writer in unsafe multi-shard configurations.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 717300dd-c55e-4336-83a6-22aca7f3b2c6

📥 Commits

Reviewing files that changed from the base of the PR and between a05f3d8 and 5004f4e.

📒 Files selected for processing (25)
  • src/command/persistence.rs
  • src/main.rs
  • src/server/conn/blocking.rs
  • src/server/conn/core.rs
  • src/server/conn/handler_monoio/dispatch.rs
  • src/server/conn/handler_monoio/mod.rs
  • src/server/conn/handler_sharded/dispatch.rs
  • src/server/conn/handler_sharded/mod.rs
  • src/server/conn/handler_single.rs
  • src/server/conn/tests.rs
  • src/server/conn_state.rs
  • src/server/embedded.rs
  • src/server/listener.rs
  • src/shard/conn_accept.rs
  • src/shard/event_loop.rs
  • tests/ft_search_multi_shard_as_of.rs
  • tests/ft_search_temporal_parity.rs
  • tests/integration.rs
  • tests/kill_snapshot.rs
  • tests/mq_integration.rs
  • tests/replication_test.rs
  • tests/txn_ft_search_snapshot.rs
  • tests/txn_kv_wiring.rs
  • tests/vacuum_commands.rs
  • tests/workspace_integration.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/server/conn_state.rs
  • src/main.rs
  • src/server/conn/handler_monoio/mod.rs
  • src/shard/conn_accept.rs

Comment on lines +1122 to +1123
let is_write = if ctx.aof_pool.is_some() || conn.tracking_state.enabled { metadata::is_write(cmd) } else { false };
let aof_bytes = if is_write && ctx.aof_pool.is_some() { Some(aof::serialize_command(&frame)) } else { None };
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical | ⚡ Quick win

Serialize the rewritten command into AOF.

aof_bytes is captured from frame before workspace prefix injection, but both the local path (cmd_args) and the remote path (dispatch_frame) can execute the {ws_id}-prefixed form. In a workspace session this persists the client-visible key instead of the physical stored key, so AOF replay diverges from live state and can even route the write to the wrong shard.

Suggested fix
-                    let is_write = if ctx.aof_pool.is_some() || conn.tracking_state.enabled { metadata::is_write(cmd) } else { false };
-                    let aof_bytes = if is_write && ctx.aof_pool.is_some() { Some(aof::serialize_command(&frame)) } else { None };
+                    let is_write = if ctx.aof_pool.is_some() || conn.tracking_state.enabled {
+                        metadata::is_write(cmd)
+                    } else {
+                        false
+                    };
+                    let aof_bytes = if is_write && ctx.aof_pool.is_some() {
+                        Some(match rewritten.as_deref() {
+                            Some(rewritten_args) => {
+                                let mut parts = Vec::with_capacity(1 + rewritten_args.len());
+                                parts.push(Frame::BulkString(Bytes::copy_from_slice(cmd)));
+                                parts.extend_from_slice(rewritten_args);
+                                aof::serialize_command(&Frame::Array(parts.into()))
+                            }
+                            None => aof::serialize_command(&frame),
+                        })
+                    } else {
+                        None
+                    };
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/server/conn/handler_sharded/mod.rs` around lines 1122 - 1123, The current
AOF serialization captures aof_bytes from the original frame (variable frame)
before workspace prefix injection, which records client-visible keys and can
replay to wrong shards; change the logic to compute aof_bytes from the
post-rewrite command used for execution—i.e., serialize the actual dispatched
command (use dispatch_frame or the rewritten cmd_args that include the {ws_id}
injection) when is_write is true and ctx.aof_pool.is_some(); keep the same
is_write calculation (metadata::is_write(cmd)) but ensure aof::serialize_command
is called on the final command used for dispatch/execution instead of the
pre-injection frame so AOF reflects the physical stored keys.

…tep 3)

Threads a real `lsn: u64` through every AOF append site and prefixes each
PerShard on-disk entry with `[u64 lsn LE][u32 len LE]` ahead of the
RESP-encoded command, matching RFC § 2 Rule 1 wire format. TopLevel
writers continue to emit plain RESP — the framing change is gated on
layout, so legacy single-file deployments and the embedded/listener
tokio paths are unaffected.

LSN sourcing: a new `ReplicationState::issue_lsn(shard_id, delta)`
helper atomically advances both `shard_offsets[shard_id]` and
`master_repl_offset`, returning the master offset *before* the bump.
Existing `increment_shard_offset` delegates through it so call sites
that previously used the legacy helper are unchanged. AOF write sites
go through a new associated function
`AofWriterPool::issue_append_lsn(repl_state, shard_id, delta)` that
issues an LSN when replication state is configured and returns 0
otherwise — keeping standalone (no-replication) and replica startup
paths working without a behavioural change.

Wire-level changes:
- `AofMessage::Append(Bytes)` → `AofMessage::Append { lsn: u64, bytes: Bytes }`
- `AofWriterPool::try_send_append(shard_id, lsn, bytes)` (new lsn arg)
- TopLevel writer (tokio + monoio): destructures `{ bytes, lsn: _ }` —
  ignores LSN, writes plain RESP exactly as before.
- PerShard writer: writes the 12-byte header then bytes; verified on
  disk via `xxd` — shard 0 entries carry monotonically advancing LSNs
  (0 → 0x69), shard 1 carries its own per-shard sequence (0x46).

Call-site fan-out (every place that constructs or dispatches
`AofMessage::Append`):
- `handler_monoio`, `handler_sharded`: 4 sites each, use
  `AofWriterPool::issue_append_lsn`.
- `handler_single`, `blocking::try_inline_dispatch{,_loop}`: now take
  `&Option<Arc<RwLock<ReplicationState>>>` so the inline AOF path can
  source an LSN; 11 test sites updated to pass `&None` (Rust infers the
  Option type from the slot).
- `drain_pending_appends` (rewrite path): keeps the lsn field, threads
  it through the per-message destructure but never reads it because
  rewrite output is the TopLevel base.rdb/incr.aof file.

Tests:
- 4 existing pool tests updated to the new signature.
- New `per_shard_pool_threads_lsn_field_to_each_writer` test verifies
  the LSN survives the channel hop unmodified for each shard.
- Persistence tests: 379 pass under tokio, 379 under monoio (+1 each).
- Replication tests: 31 pass.
- Full lib tests (tokio): 2752 pass.
- Smoke test on a 2-shard server: PerShard manifest spawns 2 writers,
  framed format verified on disk for both shards; TopLevel regression
  smoke confirms plain RESP at offset 0 with no header bytes.

Rule 3 (single LSN issuance point) limitation — call out explicitly:

Step 3 ships the per-entry framing and monotonic per-shard LSN tagging
that step 4 (per-shard replay) requires. Strict Rule 3 alignment —
making the AOF LSN equal the per-shard replication backlog byte
position for the same write — is NOT achieved by this commit.
SPSC-routed writes hit both `master_repl_offset.fetch_add` at
`spsc_handler.rs:3017` (existing) and at the new AOF write site
(`AofWriterPool::issue_append_lsn`), so master advances twice per such
write. Fix is a single-LSN-issuance-point refactor in v0.2 replication
state; out of step 3 scope. Step 4 only depends on per-shard
monotonicity, which this commit provides and the smoke test confirms.

Refs: tmp/rfc-per-shard-aof-v02.md § 2, § 3
author: Tin Dang
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants