
feat: sharded pub/sub + multi-shard scaling optimizations#18

Merged
pilotspacex-byte merged 23 commits into main from feat/pubsub-sharded
Mar 30, 2026

Conversation

@TinDang97
Collaborator

@TinDang97 TinDang97 commented Mar 29, 2026

Summary

  • Sharded pub/sub (Phase 56): Enable SUBSCRIBE/PSUBSCRIBE/UNSUBSCRIBE/PUNSUBSCRIBE in sharded mode (was returning ERR). PUBLISH returns accurate cross-shard subscriber count. PUBSUB CHANNELS/NUMSUB/NUMPAT with cross-shard aggregation. Targeted fanout via RemoteSubscriberMap (only fans out to shards with actual subscribers). Monoio handler mirrored.

  • Pub/sub performance (Phase 57): PubSubResponseSlot (AtomicI64 + AtomicWaker) replaces oneshot channels for PUBLISH counting. Arc<RwLock> enables zero-SPSC PUBSUB introspection via direct shared-read. PubSubPublishBatch batches multiple PUBLISHes per target shard into one SPSC push. Arc<RwLock> eliminates PubSubSubscribe/PubSubUnsubscribe SPSC metadata propagation. Pub/sub-aware AffinityTracker routes subscriber connections to preferred shard.

  • Shard-improve foundation merge (Phase 58): Arc with parking_lot::RwLock replaces Rc<RefCell<Vec>> — cross-shard reads bypass SPSC entirely (~88us→56ns). dispatch_read() + read/write split in handlers. ResponseSlotPool + ExecuteSlotted/PipelineBatchSlotted for zero-alloc cross-shard write dispatch. AtomicWaker upgrade for PubSubResponseSlot.

  • Monoio safety (Quick fix): Replace custom AtomicU8 oneshot with flume::bounded(1) for cross-thread safety on monoio's !Send executor. pending_wakers relay pattern for monoio cross-thread wakeup. jemalloc malloc_conf production tuning.

  • Code review fixes: snapshot_metadata TOCTOU (single-pass read), coordinator error propagation, remote_groups selected_db capture, channel.rs Future clone race fix, cargo fmt.
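The PubSubResponseSlot described in the Phase 57 bullet can be sketched with stdlib atomics. This is a minimal illustration under assumptions, not the PR's code: the names follow the description above, but the real slot also carries an AtomicWaker (from the futures ecosystem) so the publisher's future is woken when the last shard reports; here readiness is exposed as a plain is_ready() check instead.

```rust
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

/// Minimal sketch of a cross-shard PUBLISH response slot.
/// Each remote shard calls `add(count)` once; the publisher polls
/// `is_ready()` and then reads the aggregated subscriber count.
pub struct PubSubResponseSlot {
    total: AtomicI64,
    pending: AtomicU32,
}

impl PubSubResponseSlot {
    pub fn new(pending: u32) -> Self {
        Self {
            total: AtomicI64::new(0),
            pending: AtomicU32::new(pending),
        }
    }

    /// Called by a remote shard after delivering to its local subscribers.
    /// The real implementation would also wake an AtomicWaker here when
    /// `pending` reaches zero (assumption based on the PR description).
    pub fn add(&self, count: i64) {
        self.total.fetch_add(count, Ordering::Release);
        self.pending.fetch_sub(1, Ordering::AcqRel);
    }

    /// True once every targeted shard has reported its count.
    pub fn is_ready(&self) -> bool {
        self.pending.load(Ordering::Acquire) == 0
    }

    pub fn total(&self) -> i64 {
        self.total.load(Ordering::Acquire)
    }
}
```

Compared with one oneshot channel per target shard, a single shared slot means one allocation per PUBLISH and no per-shard receiver to await.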

Benchmark Results (PUBLISH, 4 shards)

Config       Moon (before)   Moon (after)   Redis 8.6.1   Moon/Redis
50c,  p=1    N/A (ERR)           126,582        105,263        1.20x
50c,  p=16   N/A (ERR)         1,234,568        709,220        1.74x
200c, p=1    N/A (ERR)           143,885        143,062        1.01x
200c, p=16   N/A (ERR)         2,066,116        904,159        2.28x

Key Metrics

  • 26 commits, 28 files changed, +3260/-790 lines
  • 1255 tests pass (1144 unit + 106 integration + 5 replication), 0 failures
  • cargo fmt clean, clippy clean (both feature profiles)
  • GETRANGE/SETRANGE from main preserved

Test plan

  • cargo test --no-default-features --features runtime-tokio,jemalloc — 1255 pass
  • cargo clippy -- -D warnings — zero warnings
  • cargo fmt --check — clean
  • redis-cli SUBSCRIBE/PUBLISH/PSUBSCRIBE/UNSUBSCRIBE functional testing (4 shards)
  • redis-benchmark PUBLISH throughput comparison vs Redis 8.6.1
  • PUBSUB CHANNELS/NUMSUB/NUMPAT cross-shard aggregation
  • Subscriber mode command restriction (SET blocked, PING allowed)
  • Disconnect cleanup (subscription count drops to 0)
  • Monoio runtime testing on Linux (CI uses tokio with MOON_NO_URING=1)

Summary by CodeRabbit

  • New Features

    • PUBSUB CHANNELS, PUBSUB NUMSUB and PUBSUB NUMPAT for pub/sub introspection
    • Targeted cross-shard pub/sub with batched remote delivery and accurate remote counts
    • Client affinity routing to prefer the same shard for repeat IPs
    • Subscriber-mode connection path with proper subscribe/unsubscribe semantics and cleanup
  • Bug Fixes

    • Improved error handling for cross-shard multi-delete/exists operations
  • Tests

    • Expanded sharded pub/sub unit and integration tests (channels, counts, patterns)

@coderabbitai

coderabbitai bot commented Mar 29, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 4c2b776c-4482-4d1b-9934-8f4facc0fdc0

📥 Commits

Reviewing files that changed from the base of the PR and between 1ccf59e and 9d4c608.

📒 Files selected for processing (1)
  • src/command/mod.rs

📝 Walkthrough

Walkthrough

Pre-allocates and shares per-shard PubSubRegistry and RemoteSubscriberMap plus a shared AffinityTracker; threads them into shard event loops, listeners, and connection handlers. Implements cross-shard pub/sub propagation, batched PUBLISH with atomic response aggregation, affinity-based plain-TCP routing, and new pub/sub introspection APIs.

Changes

Cohort / File(s) Summary
Main & Shard Startup
src/main.rs, src/shard/event_loop.rs, tests/integration.rs
Pre-allocate per-shard PubSubRegistry and RemoteSubscriberMap (Arc<RwLock<...>>) and a shared AffinityTracker; clone and pass them into shard.run and listener bootstrap; tests updated for shared state.
Affinity Tracker
src/shard/affinity.rs, src/shard/mod.rs
New AffinityTracker type with register, lookup, remove, bounded eviction, and unit tests; exported from shard module.
Remote Subscriber Map
src/shard/remote_subscriber_map.rs
New RemoteSubscriberMap tracking per-channel and per-pattern shard subscriber sets; APIs: add, remove, shards_for_channel, shards_for_patterns, target_shards; unit tests added.
Dispatch & Response Slot
src/shard/dispatch.rs, src/shard/spsc_handler.rs
Add PubSubResponseSlot (atomics + AtomicWaker) and PubSubResponseFuture; replace PubSubFanOut with PubSubPublish/PubSubPublishBatch; handlers record per-shard counts into slots.
Connection Handlers (Tokio & Monoio)
src/server/conn/handler_sharded.rs, src/server/conn/handler_monoio.rs
Switch pubsub state from Rc<RefCell<_>> to Arc<parking_lot::RwLock<_>>; add parameters: remote_subscriber_map, all_pubsub_registries, all_remote_sub_maps, pubsub_affinity; implement subscriber-mode loop, propagate/unpropagate subscription across shards, batched cross-shard PUBLISH with response aggregation, and disconnect cleanup.
Listener & Accept Paths
src/server/listener.rs, src/shard/conn_accept.rs
run_sharded signatures extended to accept affinity_tracker; plain-TCP routing now consults affinity and falls back to round-robin; connection spawn paths forward shared pubsub/remote/affinity handles; StdRwLock alias adjustments.
PubSub Registry API & Tests
src/pubsub/mod.rs
Added PubSubRegistry::active_channels, PubSubRegistry::numsub, and PubSubRegistry::numpat with unit tests for listing, counts, and pattern filtering.
Server Conn Utilities & Re-exports
src/server/conn/util.rs, src/server/conn/mod.rs
Added propagate_subscription and unpropagate_subscription helpers to update cross-shard RemoteSubscriberMaps and re-exported them from server::conn.
Coordinator & Misc
src/shard/coordinator.rs, src/shard/mod.rs
coordinate_multi_del_or_exists now returns immediately on Frame::Error(_); shard tests updated to use new publish variants and slots.
Tests
tests/integration.rs
Added multi-shard Pub/Sub integration tests verifying subscribe delivery, pattern subscribe, PUBLISH counts, PUBSUB CHANNELS, NUMSUB, NUMPAT, and unsubscribe cleanup.
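The RemoteSubscriberMap API listed in the walkthrough (add, remove, target_shards) might look roughly like the stdlib-only sketch below. This is an illustration, not the PR's code: the actual type also tracks per-pattern shard sets (shards_for_patterns) and lives behind Arc<RwLock<...>>, both omitted here for brevity.

```rust
use std::collections::{HashMap, HashSet};

/// Sketch of a per-shard map recording which *other* shards currently
/// have subscribers for each channel, so PUBLISH fans out only where
/// there is actually someone listening.
#[derive(Default)]
pub struct RemoteSubscriberMap {
    channels: HashMap<String, HashSet<usize>>,
}

impl RemoteSubscriberMap {
    /// Record that `shard` has at least one subscriber on `channel`.
    pub fn add(&mut self, channel: &str, shard: usize) {
        self.channels
            .entry(channel.to_string())
            .or_default()
            .insert(shard);
    }

    /// Remove `shard`'s presence on `channel`, dropping the entry when empty.
    pub fn remove(&mut self, channel: &str, shard: usize) {
        if let Some(set) = self.channels.get_mut(channel) {
            set.remove(&shard);
            if set.is_empty() {
                self.channels.remove(channel);
            }
        }
    }

    /// Shards that must receive a PUBLISH for `channel`.
    /// Empty when no remote subscribers exist (the common case).
    pub fn target_shards(&self, channel: &str) -> Vec<usize> {
        self.channels
            .get(channel)
            .map(|s| s.iter().copied().collect())
            .unwrap_or_default()
    }
}
```

The key property is that an empty result means zero SPSC messages are sent for that PUBLISH, which is where the targeted-fanout win comes from.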

Sequence Diagram(s)

sequenceDiagram
    participant Client
    participant Listener
    participant AT as AffinityTracker
    participant Shard as SelectedShard

    Client->>Listener: TCP connect (peer_ip)
    Listener->>AT: lookup(peer_ip)
    alt affinity found & valid
        AT-->>Listener: preferred_shard
        Listener->>Shard: route to preferred_shard
    else missing or invalid
        AT-->>Listener: None
        Listener->>Shard: route via round-robin (next_shard)
        Listener->>Listener: advance next_shard
    end
    Shard->>Client: complete handshake
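The affinity-routing flow in the diagram above can be sketched in stdlib Rust. Everything here is hypothetical shape, not the PR's code: the eviction policy (drop an arbitrary entry when full) is a placeholder, since the summary only says eviction is bounded, and the `route` helper condenses the lookup-then-round-robin branch into one function.

```rust
use std::collections::HashMap;
use std::net::IpAddr;

/// Sketch of an IP -> preferred-shard table with a size bound.
pub struct AffinityTracker {
    map: HashMap<IpAddr, usize>,
    capacity: usize,
}

impl AffinityTracker {
    pub fn new(capacity: usize) -> Self {
        Self { map: HashMap::new(), capacity }
    }

    /// Remember that `ip`'s subscriber connection landed on `shard`.
    /// When full, evict an arbitrary entry (real policy unspecified here).
    pub fn register(&mut self, ip: IpAddr, shard: usize) {
        if self.map.len() >= self.capacity && !self.map.contains_key(&ip) {
            if let Some(k) = self.map.keys().next().copied() {
                self.map.remove(&k);
            }
        }
        self.map.insert(ip, shard);
    }

    pub fn lookup(&self, ip: &IpAddr) -> Option<usize> {
        self.map.get(ip).copied()
    }

    pub fn remove(&mut self, ip: &IpAddr) {
        self.map.remove(ip);
    }
}

/// Prefer the affined shard when valid; otherwise fall back to
/// round-robin and advance the counter, as in the diagram.
pub fn route(
    tracker: &AffinityTracker,
    ip: IpAddr,
    next_rr: &mut usize,
    num_shards: usize,
) -> usize {
    match tracker.lookup(&ip) {
        Some(shard) if shard < num_shards => shard,
        _ => {
            let s = *next_rr;
            *next_rr = (*next_rr + 1) % num_shards;
            s
        }
    }
}
```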
sequenceDiagram
    participant Client
    participant Handler
    participant RSM as RemoteSubscriberMap
    participant OS1 as OtherShard1
    participant OS2 as OtherShard2
    participant Slot as PubSubResponseSlot

    Client->>Handler: SUBSCRIBE channel_x
    Handler->>Handler: register locally in PubSubRegistry
    Handler->>RSM: propagate_subscription(all_remote_sub_maps, channel_x)
    Handler->>Handler: register affinity(client_ip, shard_id)
    Handler-->>Client: subscribe confirmation

    Client->>Handler: PUBLISH channel_x "msg"
    Handler->>Handler: publish locally -> local_count
    Handler->>RSM: target_shards(channel_x)
    RSM-->>Handler: [shard1, shard2]
    Handler->>Slot: create slot(num_pending=2)
    par deliver to remote shards
        Handler->>OS1: PubSubPublishBatch {pairs, slot}
        Handler->>OS2: PubSubPublishBatch {pairs, slot}
    and remote shards process
        OS1->>OS1: publish pairs -> count1
        OS1->>Slot: add(count1)
        OS2->>OS2: publish pairs -> count2
        OS2->>Slot: add(count2)
    end
    Handler->>Slot: await PubSubResponseFuture
    Slot-->>Handler: aggregated_remote_total
    Handler->>Client: Integer(local_count + aggregated_remote_total)
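The "deliver to remote shards" step in the diagram batches all (channel, payload) pairs destined for one shard into a single SPSC push (PubSubPublishBatch). A minimal grouping sketch, where the `targets_for` closure stands in for a RemoteSubscriberMap::target_shards lookup and `Entry`-level details are omitted:

```rust
use std::collections::HashMap;

/// Group pipelined PUBLISHes by target shard so each shard receives
/// one batched message instead of one SPSC push per command.
pub fn group_publishes<F>(
    publishes: &[(String, String)], // (channel, payload)
    targets_for: F,                 // which shards have subscribers
) -> HashMap<usize, Vec<(String, String)>>
where
    F: Fn(&str) -> Vec<usize>,
{
    let mut batches: HashMap<usize, Vec<(String, String)>> = HashMap::new();
    for (chan, msg) in publishes {
        for shard in targets_for(chan) {
            batches
                .entry(shard)
                .or_default()
                .push((chan.clone(), msg.clone()));
        }
    }
    batches
}
```

After grouping, the publisher would create one response slot with pending = number of batches, push each batch, and only then await aggregation, matching the `par` block in the diagram.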

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Possibly related PRs

Suggested labels

enhancement

Poem

🐰
I stitched the shards so messages hop,
Affinity nudges each eager stop,
Batched publishes race and play,
Counts come back from far away,
A happy rabbit hops away!

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 inconclusive)

Description check — ❓ Inconclusive
Explanation: The PR description includes a detailed summary of changes across multiple phases, benchmark results, key metrics, and a comprehensive test plan. However, the description template requires specific sections (Checklist with cargo checks, Performance Impact, Notes) that are either missing or incomplete.
Resolution: Complete the description template sections: add checkmarks or status for the cargo fmt/clippy/test commands, provide an explicit Performance Impact statement (even if it is "See Benchmark Results" with a cross-reference), and expand the Notes section with design decisions or follow-up work.
✅ Passed checks (2 passed)
Title check — ✅ Passed: The title 'feat: sharded pub/sub + multi-shard scaling optimizations' accurately describes the main changes: implementation of sharded pub/sub functionality and related performance optimizations across multiple files.
Docstring Coverage — ✅ Passed: Docstring coverage is 100.00%, which is sufficient. The required threshold is 80.00%.


…riberMap

- Add active_channels(), numsub(), numpat() methods to PubSubRegistry
- Create RemoteSubscriberMap for tracking cross-shard subscriber presence
- Register remote_subscriber_map module in shard/mod.rs
- Add unit tests for all new functionality
- Add PubSubSubscribe, PubSubUnsubscribe, PubSubPublish, PubSubIntrospect variants to ShardMessage
- Add PubSubQuery and PubSubQueryResult enums for cross-shard introspection
- Extend drain_spsc_shared with RemoteSubscriberMap parameter
- Add handler arms for all 4 new variants in handle_shard_message_shared
- Create and pass RemoteSubscriberMap in event_loop.rs
- Update existing tests for new function signature
- PUBLISH uses PubSubPublish for cross-shard subscriber counting
- SUBSCRIBE/PSUBSCRIBE propagate metadata to other shards
- Subscriber mode propagates sub/unsub metadata on all operations
- PUBSUB CHANNELS/NUMSUB/NUMPAT with cross-shard aggregation
- Disconnect cleanup propagates unsubscribe for all active subs
…UB introspection in sharded handler

- Add subscriber mode select! loop with SUBSCRIBE/PSUBSCRIBE/UNSUBSCRIBE/PUNSUBSCRIBE
- Fix PUBLISH to return cross-shard total via PubSubPublish oneshot replies
- Add PUBSUB CHANNELS/NUMSUB/NUMPAT with cross-shard aggregation
- Propagate subscription metadata to other shards via PubSubSubscribe/PubSubUnsubscribe
- Clean up subscriptions on disconnect with cross-shard propagation
- Support PING, QUIT, RESET in subscriber mode
- test_sharded_pubsub_subscribe: basic subscribe + receive message
- test_sharded_pubsub_psubscribe: pattern subscribe + receive
- test_sharded_pubsub_publish_count: cross-shard subscriber counting
- test_sharded_pubsub_channels: PUBSUB CHANNELS aggregation
- test_sharded_pubsub_numsub: PUBSUB NUMSUB cross-shard aggregation
- test_sharded_pubsub_numpat: PUBSUB NUMPAT cross-shard aggregation
- test_sharded_pubsub_unsubscribe_cleanup: unsubscribe removes subscribers
Three optimizations that eliminate the PUBLISH performance gap:

1. Targeted fanout via RemoteSubscriberMap — PUBLISH now only fans out
   PubSubPublish to shards that actually have subscribers for the channel,
   instead of broadcasting to all (N-1) shards. When no remote subscribers
   exist (common case), zero SPSC messages are sent.

2. Deferred await for pipelined PUBLISH — Instead of awaiting each
   oneshot reply sequentially within the batch loop, all SPSC pushes
   happen first, then all replies are collected after the batch. This
   allows the target shard to drain multiple publishes in one cycle.

3. RemoteSubscriberMap threaded to connection handlers via Rc<RefCell<>>
   so handlers can read subscription topology without SPSC round-trips.

Results (50c, p=16): 184K → 1.23M ops/s (6.7x improvement, 1.74x Redis)
Results (200c, p=16): 1.19M → 2.06M ops/s (1.73x improvement, 2.28x Redis)
- Add lock-free PubSubResponseSlot struct with AtomicI64/AtomicU32 for
  cross-shard PUBLISH subscriber count accumulation
- Replace oneshot reply_tx in ShardMessage::PubSubPublish with Arc<PubSubResponseSlot>
…ISH path

- Update SPSC handler to call slot.add(count) instead of reply_tx.send(count)
- Replace Vec<OneshotReceiver<i64>> with Arc<PubSubResponseSlot> in handler_sharded.rs
- Update deferred resolution to spin-yield on slot.is_ready() instead of awaiting receivers
- Mirror PubSubResponseSlot pattern in handler_monoio.rs with monoio::time::sleep yield
- Handle push failures by calling slot.add(0) to maintain correct pending count

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 17

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (8)
src/server/listener.rs (1)

278-321: ⚠️ Potential issue | 🟠 Major

TLS accepts still bypass shard affinity.

Both TLS loops keep round-robining connections and never consult affinity_tracker, so TLS pub/sub clients won't get the same-shard reconnect behavior this PR adds for plain TCP.

Also applies to: 474-497

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/listener.rs` around lines 278 - 321, The TLS accept loop currently
load-balances with tls_next_shard/tls_txs and never consults affinity_tracker,
so update the TLS branches that accept connections (the async task using
tls_listener, tls_next_shard, tls_num_shards and tls_txs) to use the same
affinity selection logic as the plain TCP listener: call
affinity_tracker.lookup_or_assign_shard(addr or the same identifying key used
for TCP) to pick the shard (falling back to round-robin if affinity lookup
fails), send the (stream, true) to the selected tx via tx.send_async, and only
fall back to incrementing tls_next_shard when affinity is not available;
replicate the same changes for the other TLS loop occurrence mentioned (lines
~474-497) so TLS clients get the same-shard reconnect behavior.
src/shard/event_loop.rs (2)

506-509: 🛠️ Refactor suggestion | 🟠 Major

Add required // SAFETY: comment for the unsafe block.

Per coding guidelines, every unsafe block MUST have a // SAFETY: comment explaining the invariant.

         #[cfg(all(target_os = "linux", feature = "runtime-tokio"))]
         if let Some(lfd) = uring_listener_fd {
+            // SAFETY: `lfd` is a valid file descriptor obtained from `create_reuseport_listener`
+            // and has not been closed elsewhere. We own the fd and are shutting down this shard.
             unsafe {
                 libc::close(lfd);
             }
         }

As per coding guidelines: "every unsafe block MUST have a // SAFETY: comment explaining the invariant".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/event_loop.rs` around lines 506 - 509, The unsafe block calling
libc::close(lfd) lacks the required safety justification; add a `// SAFETY:`
comment immediately above the unsafe block in event_loop.rs explaining the
invariant that makes calling libc::close safe (e.g., that `lfd` is a valid, open
file descriptor owned by this thread, will not be used after close, and no
concurrent close occurs), referencing the `lfd` variable and the `libc::close`
call so reviewers can verify the guarantee.

8-8: 🛠️ Refactor suggestion | 🟠 Major

Use parking_lot::RwLock instead of std::sync::RwLock.

The coding guidelines require using parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks. This also eliminates the need for .unwrap() calls since parking_lot doesn't poison.

-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+use parking_lot::RwLock;

As per coding guidelines: "Use parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks" and "Replace .read().unwrap() / .write().unwrap() with .read() / .write() (parking_lot doesn't poison)".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/event_loop.rs` at line 8, Replace the std::sync RwLock with
parking_lot::RwLock: change the import to keep Arc but import RwLock from
parking_lot (e.g., use std::sync::Arc; use parking_lot::RwLock) and then remove
any .read().unwrap() / .write().unwrap() call sites in this module
(src/shard/event_loop.rs) replacing them with .read() / .write() since
parking_lot locks do not poison; ensure types using RwLock (variables, function
signatures, and usages in functions like the event loop handlers) still compile
with the new import.
src/server/conn/handler_monoio.rs (2)

137-137: ⚠️ Potential issue | 🟠 Major

Avoid allocation inside subscriber mode hot loop.

vec![0u8; 8192] allocates a new buffer on every subscriber loop iteration. This should reuse a pre-allocated buffer like tmp_buf or a dedicated subscriber buffer.

+    // Pre-allocate subscriber read buffer outside the loop
+    let mut sub_tmp_buf = vec![0u8; 8192];
+
     loop {
         // Subscriber mode: bidirectional select on client commands + published messages
         if subscription_count > 0 {
             let rx = pubsub_rx.as_ref().unwrap();
-            let sub_tmp_buf = vec![0u8; 8192];
+            // Reuse pre-allocated buffer; restore length if needed
+            if sub_tmp_buf.len() < 8192 {
+                sub_tmp_buf.resize(8192, 0);
+            }
             monoio::select! {

Based on learnings: "No Box::new(), Vec::new(), String::new(), Arc::new(), clone(), format!(), or to_string() allocations on hot paths".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio.rs` at line 137, The subscriber loop currently
allocates a new buffer each iteration with let sub_tmp_buf = vec![0u8; 8192]; —
move this allocation out of the hot loop and reuse a pre-allocated buffer (e.g.,
the existing tmp_buf or a dedicated Vec<u8> created before entering the
subscriber loop). Replace per-iteration vec! allocation with reusing a &mut [u8]
or a mutable Vec<u8> (e.g., sub_tmp_buf declared once outside the loop and
cleared/filled as needed) so no heap allocation happens inside the hot path in
handler_monoio.rs.

14-14: 🛠️ Refactor suggestion | 🟠 Major

Use parking_lot::RwLock instead of std::sync::RwLock.

This file has numerous .write().unwrap() calls that would be simplified to .write() after switching to parking_lot.

-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+use parking_lot::RwLock;

As per coding guidelines: "Use parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio.rs` at line 14, Replace the std::sync RwLock
import and usages with parking_lot's RwLock: change the import line using
std::sync::{Arc, RwLock} so Arc remains from std::sync::Arc and RwLock comes
from parking_lot::RwLock, then update all lock calls in handler_monoio.rs (e.g.,
occurrences of .write().unwrap(), .read().unwrap()) to use parking_lot's API
(.write(), .read()) without unwraps; ensure you keep Arc where used and run
cargo check to verify no other std::sync locks remain.
src/shard/conn_accept.rs (1)

8-8: 🛠️ Refactor suggestion | 🟠 Major

Use parking_lot::RwLock instead of std::sync::RwLock.

-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+use parking_lot::RwLock;

As per coding guidelines: "Use parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/conn_accept.rs` at line 8, Replace the standard library RwLock
import with parking_lot's implementation: change the use declaration from
std::sync::{Arc, RwLock} to import Arc from std::sync and RwLock from
parking_lot (i.e., keep Arc but swap RwLock to parking_lot::RwLock), and update
any type usages such as Arc<RwLock<...>> in this file (e.g. in functions or
structs referencing RwLock) to use the parking_lot RwLock.
src/shard/spsc_handler.rs (1)

8-8: 🛠️ Refactor suggestion | 🟠 Major

Use parking_lot::RwLock instead of std::sync::RwLock.

-use std::sync::{Arc, RwLock};
+use std::sync::Arc;
+use parking_lot::RwLock;

As per coding guidelines: "Use parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/spsc_handler.rs` at line 8, Replace the standard library RwLock
import with parking_lot's implementation: change the current import that brings
in std::sync::RwLock (the line using Arc and RwLock) to import Arc from
std::sync and RwLock from parking_lot (e.g., use std::sync::Arc; use
parking_lot::RwLock;), then ensure all usages of RwLock in this module (symbols
named RwLock and Arc) continue to compile without API changes.
src/server/conn/handler_sharded.rs (1)

57-82: ⚠️ Potential issue | 🟠 Major

Replace all std::sync::RwLock with parking_lot::RwLock and remove .unwrap() calls.

The handler uses std::sync::RwLock for shared state (lines 57-82, 130-155 parameters) with extensive .read().unwrap() and .write().unwrap() call sites throughout the handler. The coding guidelines require parking_lot::RwLock (no poisoning semantics) with .read() / .write() (no unwrap). This affects pubsub_registry, acl_table, affinity_tracker, remote_subscriber_map, runtime_config, cluster_state, and repl_state. Import parking_lot and refactor all lock calls accordingly.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 57 - 82, Replace the uses of
std::sync::RwLock with parking_lot::RwLock for all affected fields
(pubsub_registry, acl_table, affinity_tracker, remote_subscriber_map,
runtime_config, cluster_state, repl_state) in handler_sharded.rs and add the
parking_lot import; then remove any .read().unwrap() and .write().unwrap() call
sites throughout the handler and replace them with the parking_lot equivalents
(.read() / .write()) so code uses the non-poisoning locks (e.g., change calls
around PubSubRegistry, AclTable, AffinityTracker, RemoteSubscriberMap,
RuntimeConfig, ClusterState, ReplicationState accessors to use .read()/.write()
without unwrap). Ensure types in function signatures (ShardHandler struct fields
and any parameter types) are updated to Arc<RwLock<...>> where RwLock is
parking_lot::RwLock and update all call sites accordingly.
🧹 Nitpick comments (4)
src/main.rs (1)

176-197: Replace std::sync::RwLock with parking_lot::RwLock and remove .unwrap() calls.

The coding guidelines specify using parking_lot::RwLock instead of std::sync locks for better performance and to avoid poisoning handling. Update all three shared structures:

  • all_pubsub_registries: Used with .read().unwrap() in pubsub introspection (handler_sharded.rs:1071–1099, handler_monoio.rs:1112–1144) and initialization (event_loop.rs:164)
  • all_remote_sub_maps: Used with .write().unwrap() for subscription propagation (handler_sharded.rs:246, 284, 1034)
  • affinity_tracker: Used with .read().unwrap() / .write().unwrap() for connection routing (listener.rs:343, 448, 525; handler_sharded.rs:240, 278, 1028, 1442; handler_monoio.rs:188, 286, 1832)

Lock scopes are short and not held across .await points, so migration is straightforward: change std::sync::Arc<std::sync::RwLock<T>> to std::sync::Arc<parking_lot::RwLock<T>> and replace .read().unwrap() / .write().unwrap() with .read() / .write().

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/main.rs` around lines 176 - 197, Replace std::sync::RwLock with
parking_lot::RwLock for the shared structures all_pubsub_registries,
all_remote_sub_maps, and affinity_tracker: change their types from
std::sync::Arc<std::sync::RwLock<T>> to std::sync::Arc<parking_lot::RwLock<T>>
and remove any subsequent .read().unwrap() / .write().unwrap() usages by calling
.read() / .write() instead (update callers referenced in handler_sharded.rs,
handler_monoio.rs, listener.rs and event_loop.rs accordingly); ensure you import
parking_lot::RwLock and keep the Arc wrappers unchanged.
src/command/string.rs (1)

518-680: Please move the range-string handlers into a submodule.

string.rs is already well past the repo threshold for a single command file, and adding GETRANGE/SETRANGE plus tests here keeps pushing more unrelated paths into one compilation unit.

As per coding guidelines No single .rs file should exceed 1500 lines; split into submodules if approaching this limit and Command implementations for a single Redis command group can be larger, but split read/write operations into separate files when exceeding 1000 lines.

Also applies to: 1589-1787

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/command/string.rs` around lines 518 - 680, The file is too large and the
GETRANGE/SETRANGE handlers should live in a submodule: extract getrange,
getrange_readonly, getrange_slice and setrange (and any helper imports they
need) into a new submodule (e.g., string::range), add pub(crate) (or pub)
visibility as needed, add mod range; in src/command/string.rs replace the moved
definitions with use crate::command::string::range::{getrange,
getrange_readonly, setrange} (and getrange_slice if needed) and update any call
sites/tests that referred to the originals; ensure you copy over required
imports (Database, Frame, Entry, Bytes, parse_i64, extract_bytes,
err_wrong_args, etc.), preserve logic exactly, and run tests to confirm no
missing references.
tests/integration.rs (1)

4808-5072: Please split Phase 56 into a dedicated test module.

tests/integration.rs is already far beyond the repo’s size limit, and adding another phase here makes failures harder to isolate and maintain.

As per coding guidelines No single .rs file should exceed 1500 lines; split into submodules if approaching this limit.

src/server/conn/blocking.rs (1)

770-776: Consider using itoa to avoid allocation in integer formatting.

format_blocking_score uses format!() which allocates. While this is only called on successful blocking command completion (not per-poll), using itoa would be consistent with other hot-path code in the codebase.

♻️ Optional refactor using itoa
 /// Format a float score the same way Redis does (integer if whole, otherwise full precision).
-pub(crate) fn format_blocking_score(score: f64) -> String {
+pub(crate) fn format_blocking_score(score: f64, buf: &mut itoa::Buffer) -> &str {
     if score == score.floor() && score.abs() < i64::MAX as f64 {
-        format!("{}", score as i64)
+        buf.format(score as i64)
     } else {
-        format!("{}", score)
+        // For non-integer scores, we still need to use format!
+        // This path is less common
+        // Consider using ryu crate for zero-alloc float formatting
+        buf.format(score as i64) // placeholder - needs proper float handling
     }
 }

Based on learnings: "No Box::new(), Vec::new(), String::new(), Arc::new(), clone(), format!(), or to_string() allocations on hot paths".
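The optional refactor above leaves its float branch as an acknowledged placeholder (a real fix would pair the itoa crate with ryu for floats). For the integer branch, the zero-heap-allocation idea that itoa implements can be shown with the stdlib alone: digits are written into a caller-provided stack buffer and returned as a borrowed &str. This is an illustrative sketch of the technique, not the repo's code.

```rust
/// Format an i64 into a caller-provided stack buffer with no heap
/// allocation, returning a &str borrowed from that buffer.
/// 24 bytes is enough for any i64 including the sign.
pub fn format_i64<'a>(mut n: i64, buf: &'a mut [u8; 24]) -> &'a str {
    let mut i = buf.len();
    let neg = n < 0;
    if n == 0 {
        i -= 1;
        buf[i] = b'0';
    }
    // Emit digits least-significant first, walking backwards through buf.
    // `unsigned_abs` avoids overflow on i64::MIN and handles the negative
    // remainders Rust's `%` produces for negative operands.
    while n != 0 {
        i -= 1;
        buf[i] = b'0' + (n % 10).unsigned_abs() as u8;
        n /= 10;
    }
    if neg {
        i -= 1;
        buf[i] = b'-';
    }
    // Safe: the slice contains only ASCII digits and an optional '-'.
    std::str::from_utf8(&buf[i..]).unwrap()
}
```

In the actual codebase, reaching for the itoa crate (and ryu for the non-integer scores) would be the consistent choice; this sketch only shows why the pattern avoids the format!() allocation.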

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/blocking.rs` around lines 770 - 776, Replace the
allocation-heavy format!() in format_blocking_score with itoa for the integer
branch: import itoa::Buffer, create a Buffer, call buffer.format(score as i64)
to get the &str, and return that owned (e.g. .to_owned()) instead of using
format!("{}", score as i64); leave the existing float branch as-is. Update the
function format_blocking_score to use the itoa Buffer for the integer path to
avoid the format! allocation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/command/mod.rs`:
- Around line 31-73: is_dispatch_read_supported is missing the GETRANGE entry so
GETRANGE and SUBSTR get routed to the write path; update
is_dispatch_read_supported to include the (8, b'g') match arm for GETRANGE and
annotate or ensure the existing (6, b's') arm covers SUBSTR (add a comment like
// SUBSTR) so the function stays in sync with dispatch_read(); reference
function is_dispatch_read_supported and dispatch_read and the key symbols
GETRANGE, SUBSTR, (8, b'g'), (6, b's').

In `@src/main.rs`:
- Around line 9-12: The exported static MALLOC_CONF uses #[unsafe(export_name =
"malloc_conf")] but lacks the required // SAFETY: comment; add a short SAFETY
comment directly above the static explaining why this unsafe export is sound
(e.g., that the byte string is a null-terminated static C-compatible
configuration string, it lives for the program lifetime, and exposing it under
the "malloc_conf" symbol matches jemalloc's expected ABI and does not violate
aliasing or mutability guarantees), leaving the attribute and static initializer
unchanged so MALLOC_CONF and its export remain intact.

In `@src/runtime/channel.rs`:
- Around line 95-103: The poll implementation for the Future uses
recv_fut.as_mut().unwrap(), which can panic if the state machine drifts; replace
the unwrap with a pattern match (e.g., match self.recv_fut.as_mut()) and if it's
Some(fut) poll that future, otherwise return Poll::Ready(Err(RecvError)) to
preserve the existing error path. Update the poll method (function name: poll)
to handle recv_fut being None without panicking, referencing the fields
recv_fut, rx and the RecvError return value.

In `@src/server/conn/handler_sharded.rs`:
- Around line 376-382: Create a centralized cleanup helper (or RAII guard) that
accepts subscriber_id and the shared registries (pubsub_registry,
all_remote_sub_maps, affinity_tracker) and performs unsubscribe_all,
punsubscribe_all, and any propagation/removal logic; replace the inline calls in
the RESET branch (currently calling
pubsub_registry.write().unwrap().unsubscribe_all / punsubscribe_all and
discarding results) and call this helper instead, and ensure every early-return
path (including the returns at the spots noted around lines with direct return
397 and 1043 and the block 1403-1443) invokes the same helper so remote
subscription maps and affinity_tracker are updated consistently before the
socket handler exits.
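One way to guarantee the cleanup runs on every exit path is the RAII-guard shape suggested above. A minimal sketch with a hypothetical stripped-down registry (the real guard would also touch the remote subscriber maps and the affinity tracker):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

// Hypothetical minimal registry; the real PubSubRegistry tracks channels
// and patterns per subscriber.
#[derive(Default)]
struct PubSubRegistry {
    subscribers: HashSet<u64>,
}

impl PubSubRegistry {
    fn unsubscribe_all(&mut self, id: u64) {
        self.subscribers.remove(&id);
    }
}

// RAII guard: Drop runs on every exit path, early returns included,
// so the cleanup can never be skipped.
struct SubscriberCleanup {
    subscriber_id: u64,
    registry: Arc<Mutex<PubSubRegistry>>,
}

impl Drop for SubscriberCleanup {
    fn drop(&mut self) {
        let mut reg = self.registry.lock().unwrap();
        reg.unsubscribe_all(self.subscriber_id);
        // The real guard would also update all_remote_sub_maps and
        // affinity_tracker here.
    }
}
```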
- Around line 1283-1300: remote_groups is currently keyed only by target and
later collapses all entries to the first entry's db (entries.first().map(...) ->
batch_db), causing commands queued with different selected_db values to be sent
with the wrong DB; change the grouping so keys are (target, db_index) instead of
just target (or split each target's entries by db before Phase 2), preserving
the original enqueue order when pushing into remote_groups, then use that
(target, db) key to compute slot_ptr via response_pool.slot_ptr(target) and
construct ShardMessage::PipelineBatchSlotted with the correct batch_db for each
group so each batch is sent to the proper database.
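The regrouping can be sketched as a map keyed by `(target, db_index)`; names here are illustrative and the payload is reduced to a command string, but the order-preserving `entry(...).or_default().push(...)` shape is the point:

```rust
use std::collections::HashMap;

// Group queued remote commands by (target_shard, db_index) instead of shard
// alone, preserving enqueue order within each group so each batch carries the
// db it was queued under.
fn group_remote(
    cmds: Vec<(usize, usize, &'static str)>, // (target, selected_db, cmd)
) -> HashMap<(usize, usize), Vec<&'static str>> {
    let mut groups: HashMap<(usize, usize), Vec<&'static str>> = HashMap::new();
    for (target, db, cmd) in cmds {
        groups.entry((target, db)).or_default().push(cmd);
    }
    groups
}
```

A mid-pipeline SELECT now just starts a second group for the same target instead of silently inheriting the first entry's db.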
- Around line 312-325: The unsubscribe loop currently decrements
subscription_count and removes remote entries unconditionally; change it to only
decrement and remove when the unsubscribe actually removed a subscription by
checking the result of pubsub_registry.write().unwrap().unsubscribe (and/or
re-evaluating total_subscription_count(subscriber_id)) before calling
subscription_count = subscription_count.saturating_sub(1) and before calling
all_remote_sub_maps[target].write().unwrap().remove(...). Specifically, in the
block that iterates cmd_args (and the similar blocks at the other noted ranges),
capture the unsubscribe call's return (or call
total_subscription_count(subscriber_id) after attempting unsubscribe) and only
serialize/send unsubscribe_response and propagate remote removals when the
registry reported an actual removal; this prevents decrementing to zero or
leaving subscriber mode incorrectly.
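The gating can be sketched with a stand-in registry whose `unsubscribe` returns whether anything was actually removed (the real registry API may differ; names here are hypothetical):

```rust
use std::collections::HashSet;

// Minimal stand-in: unsubscribe reports whether a subscription was removed.
#[derive(Default)]
struct Registry {
    channels: HashSet<Vec<u8>>,
}

impl Registry {
    fn unsubscribe(&mut self, ch: &[u8]) -> bool {
        self.channels.remove(ch)
    }
}

// Only decrement when the registry reports an actual removal, so an unknown
// channel or duplicate argument cannot drive the counter to zero while live
// subscriptions remain.
fn apply_unsubscribe(reg: &mut Registry, ch: &[u8], count: &mut usize) -> bool {
    let removed = reg.unsubscribe(ch);
    if removed {
        *count = count.saturating_sub(1);
        // Remote-map removal would also be gated on `removed` here.
    }
    removed
}
```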
- Around line 203-205: When switching between normal and subscriber modes you
must preserve any already-parsed-but-unprocessed frames and any remaining bytes
instead of dropping them; change the mode-switch paths that currently
break/continue (the branches using subscription_count, pubsub_rx, and the
tokio::select blocks) to capture the unconsumed tail from batch and the
remaining bytes from read_buf (e.g., create a pending_tail/pending_bytes
variable or reinsert the leftover frames/bytes back into the parser) and hand
that off to the new mode loop so those commands (PING after
SUBSCRIBE/UNSUBSCRIBE, etc.) are not lost; locate and update the logic around
variables batch, read_buf, subscription_count and the tokio::select/handler
blocks referenced in the comment so the code reuses or re-queues leftover frames
instead of dropping them.
- Around line 1332-1352: The retry handling for failed shard queue pushes is
missing: when producers[idx].try_push(batch_msg) fails you must not call
slot.add(0) and drop the batch; instead implement the same retry loop used
earlier (drop any mutable borrow, yield to allow the queue to drain, then retry
try_push until success), so move creation of batch_msg and slot/counts as you
have, then on push failure release locks/borrows, tokio::task::yield_now().await
(or equivalent used in this file), and retry pushing; once try_push succeeds
notify spsc_notifiers[target].notify_one(), and only call slot.add(0) when you
have a definitive failure condition after exhausting retries (if any) consistent
with the pattern around ChannelMesh::target_index and
ShardMessage::PubSubPublishBatch.
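A synchronous sketch of the retry shape (the bounded queue and `drain` closure stand in for the SPSC producer and the consumer draining it; in the async handler the yield would be `tokio::task::yield_now().await`):

```rust
use std::collections::VecDeque;

// Bounded queue stand-in for the SPSC producer side.
struct BoundedQueue<T> {
    buf: VecDeque<T>,
    cap: usize,
}

impl<T> BoundedQueue<T> {
    fn try_push(&mut self, item: T) -> Result<(), T> {
        if self.buf.len() >= self.cap {
            Err(item) // queue full: hand the message back to the caller
        } else {
            self.buf.push_back(item);
            Ok(())
        }
    }
}

// Retry until the push lands instead of dropping the batch: yield so the
// consumer can drain, then try again. Notification of the consumer would
// follow the successful push.
fn push_with_retry<T>(q: &mut BoundedQueue<T>, mut msg: T, drain: impl Fn(&mut BoundedQueue<T>)) {
    loop {
        match q.try_push(msg) {
            Ok(()) => return,
            Err(back) => {
                msg = back;
                std::thread::yield_now(); // async code: yield_now().await
                drain(q); // stand-in for the consumer making progress
            }
        }
    }
}
```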

In `@src/server/listener.rs`:
- Around line 254-255: The affinity_tracker is currently an
Arc<std::sync::RwLock<crate::shard::affinity::AffinityTracker>> and the code
calls .read().unwrap(), which risks panic on poisoned locks and violates
no-unwrap rules; change the type to
Arc<parking_lot::RwLock<crate::shard::affinity::AffinityTracker>> (add
parking_lot to Cargo.toml if missing) and replace all .read().unwrap() /
.write().unwrap() uses (e.g., sites referencing affinity_tracker,
AffinityTracker access in the accept/handler logic) with parking_lot's .read() /
.write() which return guards without Result, eliminating unwraps and poisoning
behavior; ensure imports (use parking_lot::RwLock) are added and remove any
unwrap/error handling around poisoned lock assumptions.

In `@src/server/response_slot.rs`:
- Around line 38-45: The race comes from unsynchronized writes to the
UnsafeCell: make the state transitions actually synchronize data writes by (1)
changing fill() to read the state with Acquire (e.g.,
state.load(Ordering::Acquire)) and only proceed to write self.data after that
Acquire check, and (2) changing reset() so it writes into self.data first and
then stores EMPTY with Release (i.e., move the state.store(EMPTY, Release) to
after the write). Update any debug-asserts that assumed the old order (e.g.,
assertions in fill() and poll_take()) to use the Acquire/Release checks; this
ensures the Release store in reset() pairs with the Acquire load in fill() and
prevents the data race on ResponseSlot::data.
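A single-value sketch of that Acquire/Release pairing (the real slot holds a response frame and more states; this stand-in keeps only the ordering-relevant shape):

```rust
use std::cell::UnsafeCell;
use std::sync::atomic::{AtomicU8, Ordering};

const EMPTY: u8 = 0;
const FILLED: u8 = 1;

struct ResponseSlot {
    state: AtomicU8,
    data: UnsafeCell<i64>,
}

// SAFETY (sketch): access to `data` is serialized by the state machine below.
unsafe impl Sync for ResponseSlot {}

impl ResponseSlot {
    fn new() -> Self {
        Self { state: AtomicU8::new(EMPTY), data: UnsafeCell::new(0) }
    }

    fn fill(&self, value: i64) {
        // Acquire pairs with the Release store in reset(): the data write
        // below cannot be reordered before we observe the slot as EMPTY.
        debug_assert_eq!(self.state.load(Ordering::Acquire), EMPTY);
        // SAFETY: the Acquire load above established exclusive access.
        unsafe { *self.data.get() = value };
        self.state.store(FILLED, Ordering::Release);
    }

    fn reset(&self) {
        // Write data first, then publish EMPTY with Release so the next
        // fill() (which loads with Acquire) sees a fully reset slot.
        // SAFETY: the caller is the sole owner while the slot is FILLED.
        unsafe { *self.data.get() = 0 };
        self.state.store(EMPTY, Ordering::Release);
    }

    fn take(&self) -> Option<i64> {
        if self.state.load(Ordering::Acquire) == FILLED {
            // SAFETY: FILLED observed with Acquire, so fill()'s write is visible.
            Some(unsafe { *self.data.get() })
        } else {
            None
        }
    }
}
```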
- Around line 133-149: The raw pointer to ResponseSlot can outlive the owning
pool; wrap the pool in Arc<ResponseSlotPool> and thread that Arc into any
dispatched messages/futures so the pool lives until all queued shard messages
are processed: change the owner creation (in the connection handler that
currently creates a local ResponseSlotPool) to create an Arc, update slot_ptr()
(and whatever builds ShardMessage) to carry/clone an Arc<ResponseSlotPool> (or
an Arc-wrapped slot reference) instead of a bare pointer, store that Arc inside
ResponseSlotFuture (so ResponseSlotFuture holds an Arc<ResponseSlotPool> plus
the slot index/pointer) and remove the incorrect lifetime SAFETY claim, and
update the unsafe impl Send for ResponseSlotFuture (and ShardMessage) to
document that the Arc ensures the pool outlives dispatched messages;
alternatively, if you prefer the other approach, drain dispatch_tx before
dropping the local pool in handle_connection_sharded_inner.

In `@src/shard/affinity.rs`:
- Around line 18-25: AffinityTracker currently maps IpAddr -> (shard_id,
last_activity_counter) so multiple concurrent connections from the same IP
overwrite each other and remove() clears affinity for all connections; change
the entries storage to keep per-connection records keyed under the IP (e.g.,
entries: HashMap<IpAddr, HashMap<ConnectionId, (usize, u64)>> or HashMap<IpAddr,
Vec<(ConnectionId, usize, u64)>>), update methods that touch entries (the
insertion/update logic that sets entries[ip], the remove() method, and any LRU
eviction logic) to operate on the specific ConnectionId instead of the whole IP
bucket, and ensure remove() only deletes the specific connection entry and only
removes the outer IP key when its per-connection map becomes empty; refer to
AffinityTracker, entries, remove(), and the eviction/counter handling when
making these changes.
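The nested-map shape suggested above can be sketched like this (method names and the `(shard_id, counter)` payload follow the comment; eviction is omitted):

```rust
use std::collections::HashMap;
use std::net::IpAddr;

type ConnectionId = u64;

// Per-connection records under each IP: concurrent connections from one
// address no longer overwrite each other, and remove() drops only the one
// connection's entry.
#[derive(Default)]
struct AffinityTracker {
    entries: HashMap<IpAddr, HashMap<ConnectionId, (usize, u64)>>,
}

impl AffinityTracker {
    fn register(&mut self, ip: IpAddr, conn: ConnectionId, shard: usize, counter: u64) {
        self.entries.entry(ip).or_default().insert(conn, (shard, counter));
    }

    fn remove(&mut self, ip: IpAddr, conn: ConnectionId) {
        if let Some(conns) = self.entries.get_mut(&ip) {
            conns.remove(&conn);
            // Drop the outer key only once no connections remain for the IP.
            if conns.is_empty() {
                self.entries.remove(&ip);
            }
        }
    }

    fn lookup(&self, ip: IpAddr) -> Option<usize> {
        self.entries
            .get(&ip)
            .and_then(|c| c.values().next())
            .map(|&(shard, _)| shard)
    }
}
```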

In `@src/shard/event_loop.rs`:
- Around line 163-166: The code uses std RwLock `.write().unwrap()` calls (e.g.,
around pubsub_arc and accesses to self.pubsub_registry) which must be replaced
after switching to parking_lot; remove the `.unwrap()` and call `.write()` (or
`.read()`) directly so you get the parking_lot guard type (e.g., replace
`pubsub_arc.write().unwrap()` with `pubsub_arc.write()` and similarly for all
other occurrences such as the accesses that touch `pubsub_registry`, the pubsub
Arc writes, and the read locks at the other noted sites); ensure the module
imports use `parking_lot::RwLock` so the returned guard does not require
unwrapping and update any code that assumed a Result accordingly.

In `@src/shard/remote_subscriber_map.rs`:
- Around line 15-18: Replace the per-channel/pattern HashSet with a per-shard
reference-count map: change the types channels and patterns from HashMap<Bytes,
HashSet<usize>> to HashMap<Bytes, HashMap<usize, u32>>; in add() increment the
counter for the shard (insert 1 if absent, otherwise +=1); in
remove()/unsubscribe_all() decrement the counter for that shard and only remove
the shard entry when the count reaches zero, and only remove the channel/pattern
key when its inner map becomes empty; also update any iteration over
channels/patterns (e.g., when computing target shards for PUBLISH) to iterate
the keys of the inner HashMap instead of a HashSet.
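The refcounted shape, sketched for channels only (patterns are symmetric; `Vec<u8>` stands in for `Bytes`):

```rust
use std::collections::HashMap;

type ShardId = usize;

// Reference-counted per-shard subscriptions: two subscribers on the same
// shard keep the shard entry alive until both have unsubscribed.
#[derive(Default)]
struct RemoteSubscriberMap {
    channels: HashMap<Vec<u8>, HashMap<ShardId, u32>>,
}

impl RemoteSubscriberMap {
    fn add(&mut self, channel: &[u8], shard: ShardId) {
        *self
            .channels
            .entry(channel.to_vec())
            .or_default()
            .entry(shard)
            .or_insert(0) += 1;
    }

    fn remove(&mut self, channel: &[u8], shard: ShardId) {
        let now_empty = if let Some(shards) = self.channels.get_mut(channel) {
            if let Some(count) = shards.get_mut(&shard) {
                *count -= 1;
                // Drop the shard entry only when its refcount hits zero.
                if *count == 0 {
                    shards.remove(&shard);
                }
            }
            shards.is_empty()
        } else {
            false
        };
        // Drop the channel key only when no shard has subscribers left.
        if now_empty {
            self.channels.remove(channel);
        }
    }

    // PUBLISH fanout targets: the keys of the inner map.
    fn target_shards(&self, channel: &[u8]) -> Vec<ShardId> {
        self.channels
            .get(channel)
            .map(|s| s.keys().copied().collect())
            .unwrap_or_default()
    }
}
```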

In `@src/shard/shared_databases.rs`:
- Around line 20-31: The constructor ShardDatabases::new currently takes
db_count from the first shard and can hide inconsistent shard lengths; update
new to validate that every Vec<Database> in shard_databases has the same length
as the first (db_count) and reject any mismatch early (panic or return an error) with
a clear message if any shard differs so read_db() and snapshot_metadata() cannot
panic or drop entries; locate the new function and add a check that iterates
shard_databases (or use all(|v| v.len() == db_count)) before building shards and
only proceed when all shard lengths match, mentioning ShardDatabases::new,
read_db(), and snapshot_metadata() in the error message for clarity.

In `@tests/integration.rs`:
- Around line 4995-5013: The NUMSUB assertions currently use lax `if let
redis::Value::Int(...)` checks on `result[1]`, `result[3]`, `result[5]` which
silently do nothing if the reply shape or types are wrong; change these to an
explicit structural match of `result` (e.g., match or assert_matches) to require
the exact pair layout [BulkString(_), Int(a), BulkString(_), Int(b),
BulkString(_), Int(c)] and panic otherwise, then assert `a == 2`, `b == 1`, `c
== 0`; update the code that references `result` and `redis::Value::Int` so a
malformed reply fails the test instead of being ignored.
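The fail-closed shape is a slice-pattern match; `Value` here is a local stand-in for `redis::Value` with just the two variants the assertion needs:

```rust
// Local stand-in for redis::Value, enough to show the fail-closed match.
#[derive(Debug)]
enum Value {
    Int(i64),
    BulkString(Vec<u8>),
}

// Anything other than the exact pair layout panics, so a malformed reply
// fails the test instead of being silently ignored.
fn numsub_counts(result: &[Value]) -> (i64, i64, i64) {
    match result {
        [Value::BulkString(_), Value::Int(a),
         Value::BulkString(_), Value::Int(b),
         Value::BulkString(_), Value::Int(c)] => (*a, *b, *c),
        other => panic!("unexpected NUMSUB reply shape: {:?}", other),
    }
}
```

The test then asserts `numsub_counts(&result) == (2, 1, 0)` instead of three independent `if let`s.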
- Around line 4826-4836: The test currently asserts receivers >= 1 after calling
redis::cmd("PUBLISH") into "test-channel" which hides double-counting or stale
sharded bookkeeping; change the assertion to assert_eq!(receivers, 1, "PUBLISH
should return exactly 1 subscriber, got {}", receivers) for this publish call
(the variable receivers from the redis::cmd("PUBLISH").query_async(...) result)
and make the same replacement for the other analogous block around lines
4867-4873 so both one-subscriber cases assert exactly 1.

---

Outside diff comments:
In `@src/server/conn/handler_monoio.rs`:
- Line 137: The subscriber loop currently allocates a new buffer each iteration
with let sub_tmp_buf = vec![0u8; 8192]; — move this allocation out of the hot
loop and reuse a pre-allocated buffer (e.g., the existing tmp_buf or a dedicated
Vec<u8> created before entering the subscriber loop). Replace per-iteration vec!
allocation with reusing a &mut [u8] or a mutable Vec<u8> (e.g., sub_tmp_buf
declared once outside the loop and cleared/filled as needed) so no heap
allocation happens inside the hot path in handler_monoio.rs.
- Line 14: Replace the std::sync RwLock import and usages with parking_lot's
RwLock: change the import line using std::sync::{Arc, RwLock} so Arc remains
from std::sync::Arc and RwLock comes from parking_lot::RwLock, then update all
lock calls in handler_monoio.rs (e.g., occurrences of .write().unwrap(),
.read().unwrap()) to use parking_lot's API (.write(), .read()) without unwraps;
ensure you keep Arc where used and run cargo check to verify no other std::sync
locks remain.

In `@src/server/conn/handler_sharded.rs`:
- Around line 57-82: Replace the uses of std::sync::RwLock with
parking_lot::RwLock for all affected fields (pubsub_registry, acl_table,
affinity_tracker, remote_subscriber_map, runtime_config, cluster_state,
repl_state) in handler_sharded.rs and add the parking_lot import; then remove
any .read().unwrap() and .write().unwrap() call sites throughout the handler and
replace them with the parking_lot equivalents (.read() / .write()) so code uses
the non-poisoning locks (e.g., change calls around PubSubRegistry, AclTable,
AffinityTracker, RemoteSubscriberMap, RuntimeConfig, ClusterState,
ReplicationState accessors to use .read()/.write() without unwrap). Ensure types
in function signatures (ShardHandler struct fields and any parameter types) are
updated to Arc<RwLock<...>> where RwLock is parking_lot::RwLock and update all
call sites accordingly.

In `@src/server/listener.rs`:
- Around line 278-321: The TLS accept loop currently load-balances with
tls_next_shard/tls_txs and never consults affinity_tracker, so update the TLS
branches that accept connections (the async task using tls_listener,
tls_next_shard, tls_num_shards and tls_txs) to use the same affinity selection
logic as the plain TCP listener: call
affinity_tracker.lookup_or_assign_shard(addr or the same identifying key used
for TCP) to pick the shard (falling back to round-robin if affinity lookup
fails), send the (stream, true) to the selected tx via tx.send_async, and only
fall back to incrementing tls_next_shard when affinity is not available;
replicate the same changes for the other TLS loop occurrence mentioned (lines
~474-497) so TLS clients get the same-shard reconnect behavior.

In `@src/shard/conn_accept.rs`:
- Line 8: Replace the standard library RwLock import with parking_lot's
implementation: change the use declaration from std::sync::{Arc, RwLock} to
import Arc from std::sync and RwLock from parking_lot (i.e., keep Arc but swap
RwLock to parking_lot::RwLock), and update any type usages such as
Arc<RwLock<...>> in this file (e.g. in functions or structs referencing RwLock)
to use the parking_lot RwLock.

In `@src/shard/event_loop.rs`:
- Around line 506-509: The unsafe block calling libc::close(lfd) lacks the
required safety justification; add a `// SAFETY:` comment immediately above the
unsafe block in event_loop.rs explaining the invariant that makes calling
libc::close safe (e.g., that `lfd` is a valid, open file descriptor owned by
this thread, will not be used after close, and no concurrent close occurs),
referencing the `lfd` variable and the `libc::close` call so reviewers can
verify the guarantee.
- Line 8: Replace the std::sync RwLock with parking_lot::RwLock: change the
import to keep Arc but import RwLock from parking_lot (e.g., use std::sync::Arc;
use parking_lot::RwLock) and then remove any .read().unwrap() /
.write().unwrap() call sites in this module (src/shard/event_loop.rs) replacing
them with .read() / .write() since parking_lot locks do not poison; ensure types
using RwLock (variables, function signatures, and usages in functions like the
event loop handlers) still compile with the new import.

In `@src/shard/spsc_handler.rs`:
- Line 8: Replace the standard library RwLock import with parking_lot's
implementation: change the current import that brings in std::sync::RwLock (the
line using Arc and RwLock) to import Arc from std::sync and RwLock from
parking_lot (e.g., use std::sync::Arc; use parking_lot::RwLock;), then ensure
all usages of RwLock in this module (symbols named RwLock and Arc) continue to
compile without API changes.

---

Nitpick comments:
In `@src/command/string.rs`:
- Around line 518-680: The file is too large and the GETRANGE/SETRANGE handlers
should live in a submodule: extract getrange, getrange_readonly, getrange_slice
and setrange (and any helper imports they need) into a new submodule (e.g.,
string::range), add pub(crate) (or pub) visibility as needed, add mod range; in
src/command/string.rs replace the moved definitions with use
crate::command::string::range::{getrange, getrange_readonly, setrange} (and
getrange_slice if needed) and update any call sites/tests that referred to the
originals; ensure you copy over required imports (Database, Frame, Entry, Bytes,
parse_i64, extract_bytes, err_wrong_args, etc.), preserve logic exactly, and run
tests to confirm no missing references.

In `@src/main.rs`:
- Around line 176-197: Replace std::sync::RwLock with parking_lot::RwLock for
the shared structures all_pubsub_registries, all_remote_sub_maps, and
affinity_tracker: change their types from std::sync::Arc<std::sync::RwLock<T>>
to std::sync::Arc<parking_lot::RwLock<T>> and remove any subsequent
.read().unwrap() / .write().unwrap() usages by calling .read() / .write()
instead (update callers referenced in handler_sharded.rs, handler_monoio.rs,
listener.rs and event_loop.rs accordingly); ensure you import
parking_lot::RwLock and keep the Arc wrappers unchanged.

In `@src/server/conn/blocking.rs`:
- Around line 770-776: Replace the allocation-heavy format!() in
format_blocking_score with itoa for the integer branch: import itoa::Buffer,
create a Buffer, call buffer.format(score as i64) to get the &str, and return
that owned (e.g. .to_owned()) instead of using format!("{}", score as i64);
leave the existing float branch as-is. Update the function format_blocking_score
to use the itoa Buffer for the integer path to avoid the format! allocation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 247c3c47-820a-4dde-8099-b0850b06f8e2

📥 Commits

Reviewing files that changed from the base of the PR and between 6548fa3 and b3a88e0.

📒 Files selected for processing (30)
  • src/cluster/migration.rs
  • src/command/metadata.rs
  • src/command/mod.rs
  • src/command/string.rs
  • src/main.rs
  • src/persistence/snapshot.rs
  • src/pubsub/mod.rs
  • src/runtime/channel.rs
  • src/server/conn/blocking.rs
  • src/server/conn/handler_monoio.rs
  • src/server/conn/handler_sharded.rs
  • src/server/conn/shared.rs
  • src/server/conn/tests.rs
  • src/server/conn_state.rs
  • src/server/listener.rs
  • src/server/mod.rs
  • src/server/response_slot.rs
  • src/shard/affinity.rs
  • src/shard/conn_accept.rs
  • src/shard/coordinator.rs
  • src/shard/dispatch.rs
  • src/shard/event_loop.rs
  • src/shard/mod.rs
  • src/shard/persistence_tick.rs
  • src/shard/remote_subscriber_map.rs
  • src/shard/shared_databases.rs
  • src/shard/spsc_handler.rs
  • src/shard/timers.rs
  • src/shard/uring_handler.rs
  • tests/integration.rs

src/main.rs Outdated
Comment on lines +9 to +12
#[cfg(feature = "jemalloc")]
#[unsafe(export_name = "malloc_conf")]
static MALLOC_CONF: &[u8] =
b"background_thread:true,metadata_thp:always,dirty_decay_ms:3000,muzzy_decay_ms:5000\0";

⚠️ Potential issue | 🟡 Minor

Missing // SAFETY: comment for unsafe export.

Per coding guidelines, every unsafe block must have a // SAFETY: comment explaining the invariant. This static export for jemalloc configuration needs documentation.

📝 Proposed fix
 #[cfg(feature = "jemalloc")]
+// SAFETY: This static exports a null-terminated configuration string for jemalloc.
+// The string is valid UTF-8, null-terminated, and lives for the entire program duration.
 #[unsafe(export_name = "malloc_conf")]
 static MALLOC_CONF: &[u8] =
     b"background_thread:true,metadata_thp:always,dirty_decay_ms:3000,muzzy_decay_ms:5000\0";
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#[cfg(feature = "jemalloc")]
#[unsafe(export_name = "malloc_conf")]
static MALLOC_CONF: &[u8] =
b"background_thread:true,metadata_thp:always,dirty_decay_ms:3000,muzzy_decay_ms:5000\0";
#[cfg(feature = "jemalloc")]
// SAFETY: This static exports a null-terminated configuration string for jemalloc.
// The string is valid UTF-8, null-terminated, and lives for the entire program duration.
#[unsafe(export_name = "malloc_conf")]
static MALLOC_CONF: &[u8] =
b"background_thread:true,metadata_thp:always,dirty_decay_ms:3000,muzzy_decay_ms:5000\0";

Comment on lines +312 to +325
} else {
for arg in cmd_args {
if let Some(ch) = extract_bytes(arg) {
{ pubsub_registry.write().unwrap().unsubscribe(ch.as_ref(), subscriber_id); }
subscription_count = subscription_count.saturating_sub(1);
for target in 0..num_shards {
if target == shard_id { continue; }
all_remote_sub_maps[target].write().unwrap().remove(&ch, shard_id, false);
}
write_buf.clear();
crate::protocol::serialize(&pubsub::unsubscribe_response(&ch, subscription_count), &mut write_buf);
if stream.write_all(&write_buf).await.is_err() { sub_break = true; break; }
}
}

⚠️ Potential issue | 🔴 Critical

Only decrement the subscription counter when an unsubscribe actually removed something.

These branches call saturating_sub(1) and propagate a remote removal unconditionally. UNSUBSCRIBE / PUNSUBSCRIBE for an unknown channel or duplicate arg can drive subscription_count to zero while the registry still has live subscriptions, and Line 401 then exits subscriber mode so the client stops receiving messages.

Please gate the decrement/removal on the registry result, or recompute total_subscription_count(subscriber_id) before deciding to leave subscriber mode.

Also applies to: 347-359, 394-401


Comment on lines +20 to +31
pub fn new(shard_databases: Vec<Vec<Database>>) -> Arc<Self> {
let num_shards = shard_databases.len();
let db_count = shard_databases.first().map_or(0, |v| v.len());
let shards = shard_databases
.into_iter()
.map(|dbs| dbs.into_iter().map(RwLock::new).collect())
.collect();
Arc::new(Self {
shards,
num_shards,
db_count,
})

⚠️ Potential issue | 🟡 Minor

Reject uneven DB layouts when building ShardDatabases.

db_count is taken from the first shard only. A shorter later shard will panic in read_db() / snapshot_metadata(), while a longer one will have trailing databases silently ignored. Validate that every shard vector has the same length before caching db_count.

🛡️ Fail fast on inconsistent shard shapes
     pub fn new(shard_databases: Vec<Vec<Database>>) -> Arc<Self> {
         let num_shards = shard_databases.len();
         let db_count = shard_databases.first().map_or(0, |v| v.len());
+        assert!(
+            shard_databases.iter().all(|dbs| dbs.len() == db_count),
+            "all shards must expose the same database count"
+        );
         let shards = shard_databases
📝 Committable suggestion


Suggested change
pub fn new(shard_databases: Vec<Vec<Database>>) -> Arc<Self> {
let num_shards = shard_databases.len();
let db_count = shard_databases.first().map_or(0, |v| v.len());
let shards = shard_databases
.into_iter()
.map(|dbs| dbs.into_iter().map(RwLock::new).collect())
.collect();
Arc::new(Self {
shards,
num_shards,
db_count,
})
pub fn new(shard_databases: Vec<Vec<Database>>) -> Arc<Self> {
let num_shards = shard_databases.len();
let db_count = shard_databases.first().map_or(0, |v| v.len());
assert!(
shard_databases.iter().all(|dbs| dbs.len() == db_count),
"all shards must expose the same database count"
);
let shards = shard_databases
.into_iter()
.map(|dbs| dbs.into_iter().map(RwLock::new).collect())
.collect();
Arc::new(Self {
shards,
num_shards,
db_count,
})

Comment on lines +4826 to +4836
let receivers: i64 = redis::cmd("PUBLISH")
.arg("test-channel")
.arg("hello-sharded")
.query_async(&mut pub_conn)
.await
.unwrap();
assert!(
receivers >= 1,
"PUBLISH should return at least 1 subscriber, got {}",
receivers
);

⚠️ Potential issue | 🟡 Minor

Assert exact PUBLISH counts in the one-subscriber cases.

>= 1 will still pass if the sharded bookkeeping double-counts a subscriber or leaves stale remote state behind, which is exactly what these tests should catch.

Also applies to: 4867-4873


Comment on lines +4995 to +5013
// Parse pairs: result is [BulkString("ch1"), Int(2), BulkString("ch2"), Int(1), BulkString("ch3"), Int(0)]
assert_eq!(
result.len(),
6,
"Expected 6 elements (3 channels * 2), got {:?}",
result
);
// ch1 should have 2 subscribers
if let redis::Value::Int(count) = &result[1] {
assert_eq!(*count, 2, "ch1 should have 2 subscribers");
}
// ch2 should have 1 subscriber
if let redis::Value::Int(count) = &result[3] {
assert_eq!(*count, 1, "ch2 should have 1 subscriber");
}
// ch3 should have 0 subscribers
if let redis::Value::Int(count) = &result[5] {
assert_eq!(*count, 0, "ch3 should have 0 subscribers");
}

⚠️ Potential issue | 🟡 Minor

Make the NUMSUB assertions fail closed.

Those if let redis::Value::Int(...) checks become no-ops when the server returns the wrong shape or type, so a malformed reply can still pass. Match the pair layout explicitly and panic on anything unexpected.


…haring infrastructure

- Create Vec<Arc<RwLock<PubSubRegistry>>> in main.rs, one per shard
- Pass all_pubsub_registries through event_loop -> conn_accept -> handlers
- Replace pubsub_rc (Rc<RefCell<>>) with pubsub_arc (Arc<RwLock<>>)
- Update all .borrow()/.borrow_mut() to .read().unwrap()/.write().unwrap()
- Both tokio and monoio handlers updated
…ed-read

- PUBSUB CHANNELS/NUMSUB/NUMPAT now read all registries via Arc<RwLock<>>
- Zero SPSC dispatch for introspection — eliminates N-1 oneshot round-trips
- Remove PubSubIntrospect variant from ShardMessage
- Remove PubSubQuery and PubSubQueryResult enums from dispatch.rs
- Remove PubSubIntrospect handler arm from spsc_handler.rs
- Remove unused PubSubQuery/PubSubQueryResult imports from handler_monoio.rs
- Update integration tests with all_pubsub_registries parameter
- Add PubSubPublishBatch to ShardMessage with Vec<(Bytes,Bytes)> pairs, shared ResponseSlot, and per-pair AtomicI64 counts
- Add handler arm in spsc_handler that iterates pairs, publishes each, stores per-pair counts, and accumulates batch total into slot
…C subscription propagation

- Replace Rc<RefCell<RemoteSubscriberMap>> with Arc<RwLock<RemoteSubscriberMap>> shared across all connections
- Add all_remote_sub_maps Vec passed through main.rs -> event_loop -> conn_accept -> handlers
- Replace all PubSubSubscribe/PubSubUnsubscribe SPSC propagation with direct writes to all_remote_sub_maps
- Update PUBLISH read path from .borrow() to .read().unwrap()
- Mirror all changes in handler_monoio.rs
- Update integration tests with all_remote_sub_maps wiring
- Remove PubSubSubscribe and PubSubUnsubscribe variants from ShardMessage enum
- Remove corresponding handler arms from spsc_handler handle_shard_message_shared
- Remove remote_subscriber_map parameter from drain_spsc_shared and handle_shard_message_shared
- Update all call sites in event_loop.rs and unit tests in shard/mod.rs
- Zero-SPSC subscription metadata propagation complete
- Bounded HashMap with LRU-style eviction (max 16384 entries)
- register/lookup/remove API for client IP -> shard affinity
- 4 unit tests covering register, lookup, remove, overwrite
… handlers

- Listener uses affinity lookup for shard selection before round-robin fallback
- Register affinity on first SUBSCRIBE (subscription_count == 1)
- Remove affinity on disconnect when subscriber had active subscriptions
- Both tokio and monoio handlers updated
- Integration tests updated with AffinityTracker plumbing
…eFuture

- Add AtomicWaker field to PubSubResponseSlot for proper async notification
- Wake connection handler from add() when last shard responds (remaining hits 0)
- Add poll_ready() method with race-safe waker registration
- Create PubSubResponseFuture implementing std::future::Future
- Add unit tests for single-shard, multi-shard, and already-ready cases
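The slot and future described above might look like this sketch. All names are assumptions, and a Mutex<Option<Waker>> stands in for futures::task::AtomicWaker to keep the example dependency-free:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};

// Sketch of the slot; a Mutex<Option<Waker>> stands in for
// futures::task::AtomicWaker to stay dependency-free.
struct PubSubResponseSlot {
    count: AtomicI64,     // aggregated subscriber count
    remaining: AtomicI64, // shards yet to respond
    waker: Mutex<Option<Waker>>,
}

impl PubSubResponseSlot {
    fn new(shards: i64) -> Arc<Self> {
        Arc::new(Self {
            count: AtomicI64::new(0),
            remaining: AtomicI64::new(shards),
            waker: Mutex::new(None),
        })
    }

    // Called by each responding shard; wakes the handler on the last response.
    fn add(&self, n: i64) {
        self.count.fetch_add(n, Ordering::AcqRel);
        if self.remaining.fetch_sub(1, Ordering::AcqRel) == 1 {
            if let Ok(mut slot) = self.waker.lock() {
                if let Some(w) = slot.take() {
                    w.wake();
                }
            }
        }
    }

    // Race-safe: register the waker first, then re-check readiness so a
    // response landing in between cannot be missed.
    fn poll_ready(&self, cx: &Context<'_>) -> Poll<i64> {
        if self.remaining.load(Ordering::Acquire) == 0 {
            return Poll::Ready(self.count.load(Ordering::Acquire));
        }
        if let Ok(mut slot) = self.waker.lock() {
            *slot = Some(cx.waker().clone());
        }
        if self.remaining.load(Ordering::Acquire) == 0 {
            Poll::Ready(self.count.load(Ordering::Acquire))
        } else {
            Poll::Pending
        }
    }
}

struct PubSubResponseFuture(Arc<PubSubResponseSlot>);

impl Future for PubSubResponseFuture {
    type Output = i64;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<i64> {
        self.0.poll_ready(cx)
    }
}

fn main() {
    let slot = PubSubResponseSlot::new(2);
    slot.add(2);
    slot.add(3); // last shard responds: would wake the registered handler
    assert_eq!(slot.remaining.load(Ordering::Acquire), 0);
    assert_eq!(slot.count.load(Ordering::Acquire), 5);
}
```

A connection handler then awaits `PubSubResponseFuture(slot.clone())` instead of spin-polling `is_ready()`.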
…dlers

- Replace while !slot.is_ready() { yield_now().await } with PubSubResponseFuture
- Replace while !slot.is_ready() { sleep(1us).await } in monoio handler
- Both tokio and monoio handlers now properly await waker notification
- Eliminates CPU waste from spin-polling in cross-shard PUBLISH
1. snapshot_metadata TOCTOU: single-pass read_db per DB index to
   extract both segment_count and base_timestamp under the same
   RwLock guard, preventing inconsistency between passes.

2. coordinator error propagation: coordinate_multi_del_or_exists now
   propagates Frame::Error and channel-closed errors instead of
   silently returning 0 for failed shards.

3. handler_sharded remote_groups selected_db: capture selected_db
   per-command when queueing to remote_groups, use the captured
   value when constructing PipelineBatchSlotted (prevents stale
   db_index after mid-pipeline SELECT).

4. channel.rs Future clone race: OneshotReceiver.rx is now
   Option<flume::Receiver<T>>, taken (not cloned) on first poll.
   try_recv returns Disconnected after rx is taken by Future.
   Split Send-only methods into separate impl block.

5. cargo fmt: fix formatting in handler_monoio.rs, main.rs,
   response_slot.rs, event_loop.rs, coordinator.rs.
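Fix 4's take-not-clone pattern can be sketched with std::sync::mpsc standing in for flume (an assumption; the real type wraps a flume::Receiver):

```rust
use std::sync::mpsc;

// Sketch of fix 4. Taking, not cloning, the receiver on first poll
// means no second consumer can race the Future for the single value.
struct OneshotReceiver<T> {
    rx: Option<mpsc::Receiver<T>>,
}

#[derive(Debug)]
enum TryRecvError {
    Empty,
    Disconnected,
}

impl<T> OneshotReceiver<T> {
    // Future path: the first poll takes the receiver exactly once.
    fn take_rx(&mut self) -> Option<mpsc::Receiver<T>> {
        self.rx.take()
    }

    // Non-future path: once the Future has taken rx, report Disconnected
    // rather than handing out another handle to the same channel.
    fn try_recv(&mut self) -> Result<T, TryRecvError> {
        match &self.rx {
            None => Err(TryRecvError::Disconnected),
            Some(rx) => match rx.try_recv() {
                Ok(v) => Ok(v),
                Err(mpsc::TryRecvError::Empty) => Err(TryRecvError::Empty),
                Err(mpsc::TryRecvError::Disconnected) => Err(TryRecvError::Disconnected),
            },
        }
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<u32>();
    let mut one = OneshotReceiver { rx: Some(rx) };
    tx.send(7).unwrap();
    assert!(matches!(one.try_recv(), Ok(7)));
    let _taken = one.take_rx(); // the Future takes ownership here
    assert!(matches!(one.try_recv(), Err(TryRecvError::Disconnected)));
}
```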

Findings verified NOT applicable (skipped):
- per_shard_accept: variable does not exist in codebase
- conn_accept.rs std::sync::RwLock: matches types from main.rs
- listener.rs SO_REUSEPORT: no per_shard_accept to gate
- conn_accept.rs unsafe SAFETY: no libc::dup in conn_accept.rs
- bench-scaling.sh: file does not exist
- blocking.rs multi-shard: by-design, keys must be on same shard
- handler_monoio.rs db_count: constant value, capture-once is fine
- conn_accept.rs Lua expect/unwrap: pre-existing pattern
…ove foundation

- Add missing ShardDatabases import and parameter in event_loop.rs
- Add missing pub/sub params (remote_sub_map, all_pubsub_registries,
  all_remote_sub_maps, affinity_tracker) to spawn_migrated_* calls
- Rename pub/sub AffinityTracker param to pubsub_affinity to avoid
  collision with FD migration AffinityTracker
- Fix bare return statements in subscriber mode to return (HandlerResult, Option<S>)
- Add per_shard_accept parameter to run_sharded calls in main and tests
- Update test helpers to use SharedDatabases pattern (create on main thread)
- Add is_dispatch_read_supported guard to cross-shard read fast path
@TinDang97 TinDang97 force-pushed the feat/pubsub-sharded branch from b3a88e0 to 05a2600 Compare March 29, 2026 15:11

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/shard/conn_accept.rs (2)

599-604: ⚠️ Potential issue | 🔴 Critical

Mirror the new pub/sub plumbing in the migrated monoio path.

This function was only partially updated: it clones pubsub_arc, but it still doesn't accept or forward remote_subscriber_map, all_pubsub_registries, all_remote_sub_maps, and affinity_tracker. The handle_connection_sharded_monoio call below is therefore stuck on the old shape and currently fails with E0061.

Patch sketch
     cached_clock: &CachedClock,
+    remote_subscriber_map: &Arc<RwLock<RemoteSubscriberMap>>,
+    all_pubsub_registries: &[Arc<RwLock<PubSubRegistry>>],
+    all_remote_sub_maps: &[Arc<RwLock<RemoteSubscriberMap>>],
+    affinity_tracker: &Arc<RwLock<AffinityTracker>>,
     shard_id: usize,
+            let rsm = remote_subscriber_map.clone();
+            let all_regs = all_pubsub_registries.to_vec();
+            let all_rsm = all_remote_sub_maps.to_vec();
+            let aff = affinity_tracker.clone();
-                    clk,
-                    false, // can_migrate: already-migrated connections skip re-migration sampling
+                    clk,
+                    rsm,
+                    all_regs,
+                    all_rsm,
+                    aff,
+                    false, // can_migrate: already-migrated connections skip re-migration sampling

As per coding guidelines "All runtime-specific code must compile under both runtime-tokio and runtime-monoio; verify with cargo check --no-default-features --features runtime-tokio,jemalloc."

Also applies to: 643-643, 673-703

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/conn_accept.rs` around lines 599 - 604, The migrated monoio
entrypoint spawn_migrated_monoio_connection was only partially updated: it still
only clones pubsub_arc and does not accept or forward the new pub/sub plumbing
parameters, causing the downstream handle_connection_sharded_monoio call
signature mismatch (E0061). Update spawn_migrated_monoio_connection signature to
accept remote_subscriber_map, all_pubsub_registries, all_remote_sub_maps, and
affinity_tracker (use the same types/Arc/Rc usage as other monoio/tokio paths),
clone/borrow them consistently alongside pubsub_arc, and pass them through into
the call to handle_connection_sharded_monoio so the call site and function
signature match the new shape. Ensure the new parameters are threaded through
any helper closures/spawn points in this function (and mirror the same changes
around the other affected ranges noted).

367-383: ⚠️ Potential issue | 🔴 Critical

Finish the monoio spawn signature update.

The body now clones affinity_tracker and all_remote_sub_maps, and handle_connection_sharded_monoio expects Arc<RwLock<RemoteSubscriberMap>>, but this function signature still exposes the old monoio shape. That's why this path currently fails with E0425 / E0308.

Patch sketch
-    remote_subscriber_map: &Rc<RefCell<RemoteSubscriberMap>>,
-    all_pubsub_registries: &[Arc<RwLock<PubSubRegistry>>],
+    remote_subscriber_map: &Arc<RwLock<RemoteSubscriberMap>>,
+    all_pubsub_registries: &[Arc<RwLock<PubSubRegistry>>],
+    all_remote_sub_maps: &[Arc<RwLock<RemoteSubscriberMap>>],
+    affinity_tracker: &Arc<RwLock<AffinityTracker>>,

As per coding guidelines "All runtime-specific code must compile under both runtime-tokio and runtime-monoio; verify with cargo check --no-default-features --features runtime-tokio,jemalloc."

Also applies to: 393-424

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/conn_accept.rs` around lines 367 - 383, Update the monoio spawn
function signature to match what handle_connection_sharded_monoio expects:
replace any parameters typed as &Rc<RefCell<RemoteSubscriberMap>> (and
slices/collections of Rc<RefCell<...>> named like all_remote_sub_maps) with
&Arc<RwLock<RemoteSubscriberMap>> (and &[Arc<RwLock<RemoteSubscriberMap>>] for
collections), and adjust affinity_tracker parameter to be clonable (Arc) so the
body can clone it; then update the other duplicate signature block (the one
referenced around lines 393-424) the same way so both tokio and monoio variants
compile consistently with handle_connection_sharded_monoio and the clones
performed in the function body.
src/server/conn/handler_sharded.rs (1)

81-102: ⚠️ Potential issue | 🟠 Major

Use parking_lot::RwLock and replace .unwrap() with non-panicking accessors for pub/sub state.

The new pub/sub shared-state parameters thread std::sync::RwLock instead of parking_lot::RwLock, and the subscriber mode immediately adds 30+ .read().unwrap() / .write().unwrap() calls on these locks in library code. This violates the mandatory guidelines: "Use parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks" and "No unwrap() or expect() in library code outside tests". Replace all occurrences with parking_lot::RwLock and use .read() / .write() directly without .unwrap().

Affected lines: 353, 362, 367, 373, 391, 400, 405, 411, 421, 423, 432, 442, 446, 456, 458, 467, 477, 481, 504, 505, 734, 800, 1063, 1072, 1074, 1136, 1146, 1148, 1154, 1160, 1198, 1208, 1223, 1310, 1595.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 81 - 102, The struct fields
using std::sync::RwLock (e.g., pubsub_registry, repl_state, cluster_state,
acl_table, runtime_config, remote_subscriber_map, all_pubsub_registries,
all_remote_sub_maps, pubsub_affinity) must be changed to parking_lot::RwLock and
every call site that does .read().unwrap() / .write().unwrap() (notably in the
subscriber mode and the listed affected lines) should be updated to use
parking_lot’s non-panicking accessors .read() and .write() directly; scan for
occurrences of .read().unwrap()/.write().unwrap() and replace them with
.read()/.write(), and change the use/imports to bring parking_lot::RwLock into
scope so no unwraps are necessary.
♻️ Duplicate comments (9)
tests/integration.rs (2)

4825-4835: ⚠️ Potential issue | 🟡 Minor

Assert the one-subscriber PUBLISH cases exactly.

>= 1 will still pass if the sharded bookkeeping double-counts a subscriber or leaves stale remote state behind. These cases should fail unless the reply is exactly 1.

Also applies to: 4866-4872

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration.rs` around lines 4825 - 4835, The test currently allows
PUBLISH to return receivers >= 1 which masks double-counting or stale state;
change the assertion to require the reply equals exactly 1 by replacing the
assert that checks receivers >= 1 with one that checks receivers == 1 (and
update the failure message accordingly), locating the call that runs
redis::cmd("PUBLISH").query_async(&mut pub_conn) that assigns to the receivers
variable and make the same exact-equality change for the duplicate case later in
the file where receivers is asserted after another PUBLISH.

4994-5012: ⚠️ Potential issue | 🟡 Minor

Make the NUMSUB assertion fail closed.

Those if let redis::Value::Int(...) checks become no-ops when the server returns the wrong shape or type, so a malformed reply can still pass. Match the full [channel, count, ...] structure and panic on anything unexpected.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration.rs` around lines 4994 - 5012, The current `if let
redis::Value::Int(...)` checks in the NUMSUB assertion are no-ops for wrong
shapes/types; change them to exhaustively match the expected reply structure so
the test fails on any malformed response. Specifically, match
`result.as_slice()` (or pattern-match the `result` vector) against the full
pattern like `[redis::Value::BulkString(ch1), redis::Value::Int(c1),
redis::Value::BulkString(ch2), redis::Value::Int(c2),
redis::Value::BulkString(ch3), redis::Value::Int(c3)]` and then assert `c1==2,
c2==1, c3==0` (and optionally validate the channel bytes/strings), and panic (or
use `assert!`/`panic!`) in the `_` arm for any other pattern; update the code
around the `result` variable in the test to use this match instead of `if let`.
src/server/listener.rs (1)

255-255: ⚠️ Potential issue | 🟠 Major

Use parking_lot::RwLock here and drop the poison-driven unwrap().

Line 354 and the monoio equivalents call .read().unwrap() on a std::sync::RwLock in the accept path. If any affinity writer panics, the lock becomes poisoned and every later accept can panic here too. Switching this tracker to parking_lot::RwLock removes the poisoning path and the unwrap().

Minimal fix
-use parking_lot::Mutex;
-use std::sync::{Arc, RwLock};
+use parking_lot::{Mutex, RwLock};
+use std::sync::Arc;
...
-                            if let Some(preferred) = affinity_tracker.read().unwrap().lookup(&peer_ip) {
+                            if let Some(preferred) = affinity_tracker.read().lookup(&peer_ip) {
...
-                                if let Some(preferred) = affinity_tracker.read().unwrap().lookup(&peer_ip) {
+                                if let Some(preferred) = affinity_tracker.read().lookup(&peer_ip) {
...
-                                if let Some(preferred) = affinity_tracker.read().unwrap().lookup(&peer_ip) {
+                                if let Some(preferred) = affinity_tracker.read().lookup(&peer_ip) {

As per coding guidelines: "Use parking_lot::RwLock / parking_lot::Mutex instead of std::sync locks" and "No unwrap() or expect() in library code outside tests; use pattern matching or if let."

Also applies to: 351-367, 406-406, 464-478, 548-562

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/listener.rs` at line 255, The struct field affinity_tracker
currently uses std::sync::RwLock which leads to poison panics when callers use
.read().unwrap(); replace the type with
parking_lot::RwLock<crate::shard::affinity::AffinityTracker> (import
parking_lot::RwLock) and update all call sites (e.g., the accept path that calls
.read().unwrap(), and the other locations around the accept logic) to use the
parking_lot .read() (no unwrap) or .read() handle directly; remove any
unwrap()/expect() around RwLock accesses and use the safe handle returned or
pattern-match if necessary so lock poisoning cannot cause panics.
src/shard/affinity.rs (1)

18-25: ⚠️ Potential issue | 🟠 Major

Track affinity per connection instead of overwriting the whole IP entry.

Line 40 replaces the entire IpAddr record, and Line 52 removes it wholesale. Two concurrent pub/sub connections from the same host/NAT will overwrite each other, and a disconnect from either one clears affinity for the other. That makes reconnect routing flap for the exact multi-connection clients this feature is trying to help.

Also applies to: 38-52

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/affinity.rs` around lines 18 - 25, The current AffinityTracker
stores a single (shard_id, last_activity) per IpAddr in entries (HashMap<IpAddr,
(usize, u64)>) and the code paths that "replace" or "remove" the IpAddr
overwrite or clear all connections for that host; change entries to track
per-connection affinity (e.g. HashMap<IpAddr, HashMap<ConnId, (usize, u64)>> or
HashMap<IpAddr, Vec<(ConnId, usize, u64)>>), update the methods that add/update
affinity to insert/update only the specific connection ID instead of replacing
the whole IpAddr entry, and update the disconnect/remove logic to remove only
that connection's record (and only remove the IpAddr key when its per-connection
map becomes empty); also adjust eviction/LRU logic (counter, max_entries) to
operate over individual connection entries rather than whole-IP entries and
update any function signatures (e.g., track_connection, remove_connection) to
accept a connection identifier.
src/server/conn/handler_sharded.rs (5)

439-449: ⚠️ Potential issue | 🔴 Critical

Only decrement the subscription count when an unsubscribe actually removed something.

Both branches ignore the result of unsubscribe / punsubscribe, then unconditionally decrement subscription_count and propagate a remote removal. UNSUBSCRIBE missing can still drive the connection out of subscriber mode while live subscriptions remain.

Also applies to: 475-485

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 439 - 449, The unsubscribe
branch currently ignores the return value of
pubsub_registry.write().unwrap().unsubscribe (and similarly for punsubscribe)
and always decrements subscription_count and removes entries from
all_remote_sub_maps; change it to check the unsubscribe/punsubscribe result from
the registry call (the value indicating whether a subscription was actually
removed) and only decrement subscription_count, call
all_remote_sub_maps[target].write().unwrap().remove(...), and serialize an
unsubscribe_response when the registry reported a successful removal; apply the
same conditional logic to the punsubscribe handling so only actual removals
affect subscription_count and remote maps (refer to extract_bytes,
pubsub_registry.write().unwrap().unsubscribe / punsubscribe, subscription_count,
all_remote_sub_maps.remove, and unsubscribe_response).

580-583: ⚠️ Potential issue | 🔴 Critical

Keep remote pipeline batches partitioned by (target, db_index).

selected_db is captured per entry, but remote_groups is still keyed only by shard and phase 2 still collapses each batch onto entries.first()'s DB. A pipeline that changes DB mid-batch can execute later remote commands against the wrong database.

Also applies to: 1416-1430

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 580 - 583, remote pipeline
batching is keyed only by target shard causing DB selection to be lost when
entries in a batch have different selected_db; change the batching keys to
(target_shard, selected_db) so batches are partitioned by both target and DB.
Update remote_groups and publish_batches to use a composite key (usize, usize)
and include selected_db in the tuple payloads (and stop relying on
entries.first() to infer DB); in the phase-2 dispatch logic (the code that
builds remote commands from each batch) use the batch key's selected_db (or
per-entry selected_db if building per-entry commands) when creating remote
commands so remote requests run against the correct database; apply the same
change at the other occurrence around lines 1416-1430 where the same grouping
and dispatch happen (adjust all uses of remote_groups, publish_batches, and any
code that uses entries.first() for DB).

503-509: ⚠️ Potential issue | 🟠 Major

RESET and the new early returns still bypass shared pub/sub cleanup.

RESET throws away the channel/pattern lists from unsubscribe_all / punsubscribe_all, and the new return paths in the subscribe entry flow still skip the cleanup block at the bottom. That leaves all_remote_sub_maps and pubsub_affinity stale after reset or partial subscribe failure.

Also applies to: 524-524, 1121-1169, 1556-1597

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 503 - 509, The RESET branch
and early-return paths skip the shared pub/sub cleanup (leaving
all_remote_sub_maps and pubsub_affinity stale); change the flow so
unsubscribe_all and punsubscribe_all return the removed channel/pattern lists
(or expose a helper) and invoke a single cleanup helper that updates
all_remote_sub_maps and pubsub_affinity using those lists and subscriber_id
before any return/early-exit; specifically adjust the RESET handling (where
pubsub_registry.write().unwrap().unsubscribe_all(subscriber_id) and
punsubscribe_all are called), and every early-return path in the subscribe entry
flow to call this new cleanup function (or inline the same logic) so
subscriber_id, subscription_count, all_remote_sub_maps and pubsub_affinity are
consistently cleaned up.

1462-1483: ⚠️ Potential issue | 🔴 Critical

Don't drop PubSubPublishBatch when the shard queue is full.

On try_push failure this path marks the slot done and discards the batch. That drops the message for remote subscribers and undercounts the PUBLISH reply; it needs the same yield-and-retry loop used by the other shard dispatch paths.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 1462 - 1483, The current
dispatch path drops PubSubPublishBatch on producers[idx].try_push failure;
instead implement the same yield-and-retry loop used by other shard dispatch
code: create the ShardMessage::PubSubPublishBatch (as you already do), then loop
attempting producers[idx].try_push(batch_msg.clone() or re-create the message as
needed) and if it fails call tokio::task::yield_now().await (or the same backoff
used elsewhere) until try_push succeeds, only then call
spsc_notifiers[target].notify_one(); remove the slot.add(0) on failure and keep
pushing the (slot, counts, resp_indices) into batch_slots after a successful
enqueue. Reference symbols: publish_batches, producers (from
dispatch_tx.borrow_mut()), ChannelMesh::target_index,
ShardMessage::PubSubPublishBatch, slot, counts, spsc_notifiers, try_push,
batch_slots.
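The yield-and-retry shape this comment asks for might look like the sketch below, with a std bounded channel standing in for the SPSC ring and thread::yield_now() for the async yield_now().await (the real path would notify the shard and finalize the slot only after a successful push):

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};
use std::thread;

// Sketch: retry a full bounded queue instead of dropping the message.
fn push_with_retry<T>(tx: &SyncSender<T>, mut msg: T) {
    loop {
        match tx.try_send(msg) {
            Ok(()) => return,
            Err(TrySendError::Full(m)) => {
                msg = m;             // keep the message instead of dropping it
                thread::yield_now(); // back off, then retry
            }
            Err(TrySendError::Disconnected(_)) => return, // consumer gone
        }
    }
}

fn main() {
    let (tx, rx) = sync_channel::<u32>(1);
    let consumer = thread::spawn(move || {
        (0..4).map(|_| rx.recv().unwrap()).collect::<Vec<_>>()
    });
    for i in 0..4 {
        push_with_retry(&tx, i); // retries whenever the queue is full
    }
    assert_eq!(consumer.join().unwrap(), vec![0, 1, 2, 3]);
}
```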

521-528: ⚠️ Potential issue | 🔴 Critical

Mode switches still lose or stall buffered commands.

Breaking out of the normal-mode batch on SUBSCRIBE drops later frames already parsed into batch, and leaving subscriber mode at subscription_count == 0 leaves any unread tail in read_buf dormant until another socket read arrives. Pipelines like PING, SUBSCRIBE foo, PING or UNSUBSCRIBE foo, PING still lose/stall the tail command.

Also applies to: 1101-1173

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 521 - 528, The code in
handler_sharded.rs currently drops or stalls already-parsed frames when
switching into/out of subscriber mode: when encountering SUBSCRIBE/UNSUBSCRIBE
the logic breaks out (or continues) and discards the remainder of batch and any
unread bytes in read_buf; fix by preserving and continuing to process any
already-parsed frames and any unread tail instead of discarding them—i.e., when
you set sub_break or check subscription_count use a loop/control-flow that
processes the rest of batch (do not break out of the batch loop or do a bare
continue that leaves read_buf dormant), or explicitly push remaining parsed
frames back onto the input queue before switching modes; adjust the block that
inspects subscription_count, the handling around sub_break, and the code that
manipulates batch/read_buf so parsed frames and unread tails are always handled
(references: variables and symbols subscription_count, sub_break, batch,
read_buf in handler_sharded.rs).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/server/conn/handler_sharded.rs`:
- Around line 489-496: The PING branch currently always writes
Frame::Array(["pong",""]) and ignores arguments; change it to honor the optional
payload by reusing the PING argument handling: read/inspect the parsed arguments
(e.g. the same cmd_args used for normal PING handling or parse the command into
args), then if args.len() == 0 respond ["pong",""], if args.len() == 1 respond
["pong", args[0]] (echo the single payload), and if args.len() > 1 return a
WrongArity error response; update the logic in the
cmd.eq_ignore_ascii_case(b"PING") branch (and use write_buf /
crate::protocol::serialize as before) so it validates arity and echoes the
single argument instead of always sending an empty string.

In `@tests/integration.rs`:
- Around line 2439-2441: The test harness currently injects a real affinity
tracker (moon::shard::affinity::AffinityTracker) into start_sharded_server(),
causing all Phase 56 clients from 127.0.0.1 to be co-located on one shard and
hiding cross-shard pub/sub behavior; fix by either replacing the real
AffinityTracker with a no-op/disabled affinity implementation when wiring
affinity_tracker into start_sharded_server() for these tests, or by changing the
test to establish all pub/sub client connections before any client issues the
first SUBSCRIBE so affinity cannot bias routing; update the setup at the points
creating affinity_tracker (AffinityTracker::new usage) and where
start_sharded_server() is called to use the chosen approach.

---

Outside diff comments:
In `@src/server/conn/handler_sharded.rs`:
- Around line 81-102: The struct fields using std::sync::RwLock (e.g.,
pubsub_registry, repl_state, cluster_state, acl_table, runtime_config,
remote_subscriber_map, all_pubsub_registries, all_remote_sub_maps,
pubsub_affinity) must be changed to parking_lot::RwLock and every call site that
does .read().unwrap() / .write().unwrap() (notably in the subscriber mode and
the listed affected lines) should be updated to use parking_lot’s non-panicking
accessors .read() and .write() directly; scan for occurrences of
.read().unwrap()/.write().unwrap() and replace them with .read()/.write(), and
change the use/imports to bring parking_lot::RwLock into scope so no unwraps are
necessary.

In `@src/shard/conn_accept.rs`:
- Around line 599-604: The migrated monoio entrypoint
spawn_migrated_monoio_connection was only partially updated: it still only
clones pubsub_arc and does not accept or forward the new pub/sub plumbing
parameters, causing the downstream handle_connection_sharded_monoio call
signature mismatch (E0061). Update spawn_migrated_monoio_connection signature to
accept remote_subscriber_map, all_pubsub_registries, all_remote_sub_maps, and
affinity_tracker (use the same types/Arc/Rc usage as other monoio/tokio paths),
clone/borrow them consistently alongside pubsub_arc, and pass them through into
the call to handle_connection_sharded_monoio so the call site and function
signature match the new shape. Ensure the new parameters are threaded through
any helper closures/spawn points in this function (and mirror the same changes
around the other affected ranges noted).
- Around line 367-383: Update the monoio spawn function signature to match what
handle_connection_sharded_monoio expects: replace any parameters typed as
&Rc<RefCell<RemoteSubscriberMap>> (and slices/collections of Rc<RefCell<...>>
named like all_remote_sub_maps) with &Arc<RwLock<RemoteSubscriberMap>> (and
&[Arc<RwLock<RemoteSubscriberMap>>] for collections), and adjust
affinity_tracker parameter to be clonable (Arc) so the body can clone it; then
update the other duplicate signature block (the one referenced around lines
393-424) the same way so both tokio and monoio variants compile consistently
with handle_connection_sharded_monoio and the clones performed in the function
body.

---

Duplicate comments:
In `@src/server/conn/handler_sharded.rs`:
- Around line 439-449: The unsubscribe branch currently ignores the return value
of pubsub_registry.write().unwrap().unsubscribe (and similarly for punsubscribe)
and always decrements subscription_count and removes entries from
all_remote_sub_maps; change it to check the unsubscribe/punsubscribe result from
the registry call (the value indicating whether a subscription was actually
removed) and only decrement subscription_count, call
all_remote_sub_maps[target].write().unwrap().remove(...), and serialize an
unsubscribe_response when the registry reported a successful removal; apply the
same conditional logic to the punsubscribe handling so only actual removals
affect subscription_count and remote maps (refer to extract_bytes,
pubsub_registry.write().unwrap().unsubscribe / punsubscribe, subscription_count,
all_remote_sub_maps.remove, and unsubscribe_response).
- Around line 580-583: remote pipeline batching is keyed only by target shard
causing DB selection to be lost when entries in a batch have different
selected_db; change the batching keys to (target_shard, selected_db) so batches
are partitioned by both target and DB. Update remote_groups and publish_batches
to use a composite key (usize, usize) and include selected_db in the tuple
payloads (and stop relying on entries.first() to infer DB); in the phase-2
dispatch logic (the code that builds remote commands from each batch) use the
batch key's selected_db (or per-entry selected_db if building per-entry
commands) when creating remote commands so remote requests run against the
correct database; apply the same change at the other occurrence around lines
1416-1430 where the same grouping and dispatch happen (adjust all uses of
remote_groups, publish_batches, and any code that uses entries.first() for DB).
- Around line 503-509: The RESET branch and early-return paths skip the shared
pub/sub cleanup (leaving all_remote_sub_maps and pubsub_affinity stale); change
the flow so unsubscribe_all and punsubscribe_all return the removed
channel/pattern lists (or expose a helper) and invoke a single cleanup helper
that updates all_remote_sub_maps and pubsub_affinity using those lists and
subscriber_id before any return/early-exit; specifically adjust the RESET
handling (where pubsub_registry.write().unwrap().unsubscribe_all(subscriber_id)
and punsubscribe_all are called), and every early-return path in the subscribe
entry flow to call this new cleanup function (or inline the same logic) so
subscriber_id, subscription_count, all_remote_sub_maps and pubsub_affinity are
consistently cleaned up.
- Around line 1462-1483: The current dispatch path drops PubSubPublishBatch on
producers[idx].try_push failure; instead implement the same yield-and-retry loop
used by other shard dispatch code: create the ShardMessage::PubSubPublishBatch
(as you already do), then loop attempting
producers[idx].try_push(batch_msg.clone() or re-create the message as needed)
and if it fails call tokio::task::yield_now().await (or the same backoff used
elsewhere) until try_push succeeds, only then call
spsc_notifiers[target].notify_one(); remove the slot.add(0) on failure and keep
pushing the (slot, counts, resp_indices) into batch_slots after a successful
enqueue. Reference symbols: publish_batches, producers (from
dispatch_tx.borrow_mut()), ChannelMesh::target_index,
ShardMessage::PubSubPublishBatch, slot, counts, spsc_notifiers, try_push,
batch_slots.
- Around line 521-528: The code in handler_sharded.rs currently drops or stalls
already-parsed frames when switching into/out of subscriber mode: when
encountering SUBSCRIBE/UNSUBSCRIBE the logic breaks out (or continues) and
discards the remainder of batch and any unread bytes in read_buf; fix by
preserving and continuing to process any already-parsed frames and any unread
tail instead of discarding them—i.e., when you set sub_break or check
subscription_count use a loop/control-flow that processes the rest of batch (do
not break out of the batch loop or do a bare continue that leaves read_buf
dormant), or explicitly push remaining parsed frames back onto the input queue
before switching modes; adjust the block that inspects subscription_count, the
handling around sub_break, and the code that manipulates batch/read_buf so
parsed frames and unread tails are always handled (references: variables and
symbols subscription_count, sub_break, batch, read_buf in handler_sharded.rs).

In `@src/server/listener.rs`:
- Line 255: The struct field affinity_tracker currently uses std::sync::RwLock
which leads to poison panics when callers use .read().unwrap(); replace the type
with parking_lot::RwLock<crate::shard::affinity::AffinityTracker> (import
parking_lot::RwLock) and update all call sites (e.g., the accept path that calls
.read().unwrap(), and the other locations around the accept logic) to use the
parking_lot .read() (no unwrap) or .read() handle directly; remove any
unwrap()/expect() around RwLock accesses and use the safe handle returned or
pattern-match if necessary so lock poisoning cannot cause panics.

In `@src/shard/affinity.rs`:
- Around line 18-25: The current AffinityTracker stores a single (shard_id,
last_activity) per IpAddr in entries (HashMap<IpAddr, (usize, u64)>) and the
code paths that "replace" or "remove" the IpAddr overwrite or clear all
connections for that host; change entries to track per-connection affinity (e.g.
HashMap<IpAddr, HashMap<ConnId, (usize, u64)>> or HashMap<IpAddr, Vec<(ConnId,
usize, u64)>>), update the methods that add/update affinity to insert/update
only the specific connection ID instead of replacing the whole IpAddr entry, and
update the disconnect/remove logic to remove only that connection's record (and
only remove the IpAddr key when its per-connection map becomes empty); also
adjust eviction/LRU logic (counter, max_entries) to operate over individual
connection entries rather than whole-IP entries and update any function
signatures (e.g., track_connection, remove_connection) to accept a connection
identifier.
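A sketch of the per-connection layout the prompt describes (`ConnId` is assumed to be a `u64` here, and the LRU counter / `max_entries` eviction is elided; only the add/remove/lookup shape is shown):

```rust
use std::collections::HashMap;
use std::net::IpAddr;

type ConnId = u64;

// Per-IP map of per-connection affinity: conn_id -> (shard_id, last_activity).
#[derive(Default)]
struct AffinityTracker {
    entries: HashMap<IpAddr, HashMap<ConnId, (usize, u64)>>,
}

impl AffinityTracker {
    // Record/refresh one connection without clobbering the host's other conns.
    fn track_connection(&mut self, ip: IpAddr, conn: ConnId, shard: usize, now: u64) {
        self.entries.entry(ip).or_default().insert(conn, (shard, now));
    }

    // Remove only this connection; drop the IP key once it has no conns left.
    fn remove_connection(&mut self, ip: &IpAddr, conn: ConnId) {
        if let Some(conns) = self.entries.get_mut(ip) {
            conns.remove(&conn);
            if conns.is_empty() {
                self.entries.remove(ip);
            }
        }
    }

    // Prefer the shard of the most recently active connection from this IP.
    fn lookup(&self, ip: &IpAddr) -> Option<usize> {
        self.entries
            .get(ip)?
            .values()
            .max_by_key(|(_, last)| *last)
            .map(|(shard, _)| *shard)
    }
}
```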

In `@tests/integration.rs`:
- Around line 4825-4835: The test currently allows PUBLISH to return receivers
>= 1 which masks double-counting or stale state; change the assertion to require
the reply equals exactly 1 by replacing the assert that checks receivers >= 1
with one that checks receivers == 1 (and update the failure message
accordingly), locating the call that runs redis::cmd("PUBLISH").query_async(&mut
pub_conn) that assigns to the receivers variable and make the same
exact-equality change for the duplicate case later in the file where receivers
is asserted after another PUBLISH.
- Around line 4994-5012: The current `if let redis::Value::Int(...)` checks in
the NUMSUB assertion are no-ops for wrong shapes/types; change them to
exhaustively match the expected reply structure so the test fails on any
malformed response. Specifically, match `result.as_slice()` (or pattern-match
the `result` vector) against the full pattern like
`[redis::Value::BulkString(ch1), redis::Value::Int(c1),
redis::Value::BulkString(ch2), redis::Value::Int(c2),
redis::Value::BulkString(ch3), redis::Value::Int(c3)]` and then assert `c1==2,
c2==1, c3==0` (and optionally validate the channel bytes/strings), and panic (or
use `assert!`/`panic!`) in the `_` arm for any other pattern; update the code
around the `result` variable in the test to use this match instead of `if let`.


ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ff849aa0-25b2-424a-b6f0-3e3e5e84e4ba

📥 Commits

Reviewing files that changed from the base of the PR and between b3a88e0 and 05a2600.

📒 Files selected for processing (14)
  • src/main.rs
  • src/pubsub/mod.rs
  • src/server/conn/handler_monoio.rs
  • src/server/conn/handler_sharded.rs
  • src/server/listener.rs
  • src/shard/affinity.rs
  • src/shard/conn_accept.rs
  • src/shard/coordinator.rs
  • src/shard/dispatch.rs
  • src/shard/event_loop.rs
  • src/shard/mod.rs
  • src/shard/remote_subscriber_map.rs
  • src/shard/spsc_handler.rs
  • tests/integration.rs
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/pubsub/mod.rs
  • src/shard/coordinator.rs
  • src/shard/remote_subscriber_map.rs
  • src/shard/event_loop.rs
  • src/server/conn/handler_monoio.rs

Comment on lines +489 to +496
} else if cmd.eq_ignore_ascii_case(b"PING") {
    write_buf.clear();
    let resp = Frame::Array(crate::framevec![
        Frame::BulkString(Bytes::from_static(b"pong")),
        Frame::BulkString(Bytes::from_static(b"")),
    ]);
    crate::protocol::serialize(&resp, &mut write_buf);
    if stream.write_all(&write_buf).await.is_err() { sub_break = true; break; }

⚠️ Potential issue | 🟡 Minor

Honor the optional payload for subscriber-mode PING.

This branch always returns ["pong", ""] and never validates arity, so PING hello loses its payload and PING a b incorrectly succeeds. Reuse the normal PING parsing here or echo cmd_args[0] when exactly one argument is present.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 489 - 496, The PING branch
currently always writes Frame::Array(["pong",""]) and ignores arguments; change
it to honor the optional payload by reusing the PING argument handling:
read/inspect the parsed arguments (e.g. the same cmd_args used for normal PING
handling or parse the command into args), then if args.len() == 0 respond
["pong",""], if args.len() == 1 respond ["pong", args[0]] (echo the single
payload), and if args.len() > 1 return a WrongArity error response; update the
logic in the cmd.eq_ignore_ascii_case(b"PING") branch (and use write_buf /
crate::protocol::serialize as before) so it validates arity and echoes the
single argument instead of always sending an empty string.
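A hedged sketch of the arity handling this prompt describes (the `Reply` type and byte-vector frames here are simplified stand-ins for the crate's `Frame`/`Bytes` types):

```rust
// Simplified subscriber-mode PING: push ["pong", <payload>] or an arity error.
#[derive(Debug, PartialEq)]
enum Reply {
    Pong(Vec<u8>), // serialized as ["pong", payload]
    Error(&'static str),
}

fn subscriber_ping(args: &[Vec<u8>]) -> Reply {
    match args {
        [] => Reply::Pong(Vec::new()),     // PING        -> ["pong", ""]
        [msg] => Reply::Pong(msg.clone()), // PING hello  -> ["pong", "hello"]
        _ => Reply::Error("ERR wrong number of arguments for 'ping' command"),
    }
}
```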

Comment on lines +2439 to +2441
let affinity_tracker = std::sync::Arc::new(std::sync::RwLock::new(
    moon::shard::affinity::AffinityTracker::new(),
));

⚠️ Potential issue | 🟠 Major

This harness now co-locates the pub/sub tests on one shard.

Line 2439 wires the real affinity tracker into start_sharded_server(). Since every Phase 56 client connects from 127.0.0.1, the first SUBSCRIBE records loopback affinity and later pub/sub connections get routed back to that same shard. The new sharded pub/sub tests can therefore pass without ever exercising remote fanout or cross-shard aggregation. Either disable affinity in this test harness or establish all pub/sub connections before the first subscription.

Also applies to: 2464-2466, 2522-2524

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration.rs` around lines 2439 - 2441, The test harness currently
injects a real affinity tracker (moon::shard::affinity::AffinityTracker) into
start_sharded_server(), causing all Phase 56 clients from 127.0.0.1 to be
co-located on one shard and hiding cross-shard pub/sub behavior; fix by either
replacing the real AffinityTracker with a no-op/disabled affinity implementation
when wiring affinity_tracker into start_sharded_server() for these tests, or by
changing the test to establish all pub/sub client connections before any client
issues the first SUBSCRIBE so affinity cannot bias routing; update the setup at
the points creating affinity_tracker (AffinityTracker::new usage) and where
start_sharded_server() is called to use the chosen approach.

1. parking_lot::RwLock for pub/sub types — Replace std::sync::RwLock
   with parking_lot::RwLock for PubSubRegistry, RemoteSubscriberMap,
   and AffinityTracker. Remove all .unwrap() (parking_lot doesn't
   poison). Introduces StdRwLock alias in handlers for pre-existing
   std::sync::RwLock types (ACL, cluster, replication, runtime config).

2. Eliminate Arc<Vec<AtomicI64>> hot-path allocation — Move per-pair
   counts into PubSubResponseSlot::counts field. Add with_counts()
   constructor. PubSubPublishBatch no longer carries separate counts
   Arc. One fewer heap allocation per pipeline batch.

3. Extract subscription propagation helpers — New propagate_subscription()
   and unpropagate_subscription() in server/conn/util.rs. Replaces 12+
   inline for-loops across handler_sharded.rs and handler_monoio.rs.
   Cleaner lock scoping, DRY.

4. Add Relaxed ordering safety comments — Document why Relaxed ordering
   on total.load() in get()/poll_ready() is sound (Acquire on remaining
   provides happens-before with Release in add()).

5. AffinityTracker eviction — Accepted as-is (O(n log n) only on
   overflow past 16384 entries, which is rare).

6. Remove dead PubSubFanOut variant — Eliminated from ShardMessage
   and spsc_handler. Updated shard/mod.rs tests to use PubSubPublish.
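Items 2 and 4 above can be pictured with a trimmed-down slot (a sketch only: the real `PubSubResponseSlot` also carries an `AtomicWaker` so the publisher task is woken instead of polling, and the per-pair counts of item 2 live alongside these fields):

```rust
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

// Each target shard calls add() once with its local receiver count;
// the publisher polls is_ready()/get().
struct PubSubResponseSlot {
    remaining: AtomicU32, // shards still expected to report
    total: AtomicI64,     // accumulated subscriber count
}

impl PubSubResponseSlot {
    fn new(pending: u32) -> Self {
        Self { remaining: AtomicU32::new(pending), total: AtomicI64::new(0) }
    }

    // Release on `remaining` publishes the Relaxed write to `total`:
    // any thread that later observes the decrement with Acquire sees the count.
    fn add(&self, count: i64) {
        self.total.fetch_add(count, Ordering::Relaxed);
        self.remaining.fetch_sub(1, Ordering::Release);
    }

    fn is_ready(&self) -> bool {
        self.remaining.load(Ordering::Acquire) == 0
    }

    // Sound with Relaxed: the Acquire in is_ready() ordered us after every add().
    fn get(&self) -> i64 {
        self.total.load(Ordering::Relaxed)
    }
}
```

This is the happens-before argument item 4 documents: Acquire on `remaining` pairs with the Release in `add()`, so the Relaxed `total.load()` cannot read a stale count.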

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 7

♻️ Duplicate comments (9)
tests/integration.rs (3)

4825-4835: ⚠️ Potential issue | 🟡 Minor

Assert the one-subscriber PUBLISH cases exactly.

>= 1 still passes if sharded bookkeeping double-counts or leaves stale remote state behind. These cases should fail closed with == 1.

Minimal fix
-    assert!(
-        receivers >= 1,
-        "PUBLISH should return at least 1 subscriber, got {}",
-        receivers
-    );
+    assert_eq!(
+        receivers, 1,
+        "PUBLISH should return exactly 1 subscriber, got {}",
+        receivers
+    );
-    assert!(receivers >= 1, "PUBLISH should return at least 1");
+    assert_eq!(receivers, 1, "PUBLISH should return exactly 1");

Also applies to: 4866-4872

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration.rs` around lines 4825 - 4835, The test currently asserts
receivers >= 1 for the Redis PUBLISH result, which masks sharded double-counting
or stale state; change the assertion in the PUBLISH checks that use
redis::cmd("PUBLISH") (the variable receivers from the .query_async(&mut
pub_conn).await.unwrap()) to require equality (== 1) instead of >= 1, and update
the identical assertion at the other occurrence (the one around lines 4866-4872)
so both PUBLISH cases fail closed if more than one subscriber is reported.

2439-2441: ⚠️ Potential issue | 🟠 Major

This harness can still collapse the pub/sub cases onto one shard.

All of these new pub/sub tests connect from 127.0.0.1, and the listener now routes plain sockets through AffinityTracker::lookup(&peer_ip). Once the first subscription records loopback affinity, later pub/sub connections can all be sent back to the same shard, so the "sharded" cases may pass without exercising remote fanout or cross-shard aggregation. Disable affinity in this harness or establish all pub/sub connections before the first SUBSCRIBE/PSUBSCRIBE.

Also applies to: 2522-2524

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration.rs` around lines 2439 - 2441, The test harness creates an
AffinityTracker and therefore allows loopback affinity to collapse pub/sub onto
one shard; to fix, disable affinity for these pub/sub tests by not using or
passing the affinity tracker (remove or bypass the
std::sync::Arc::new(parking_lot::RwLock::new(moon::shard::affinity::AffinityTracker::new()))
used as affinity_tracker) or ensure all pub/sub connections are established
before issuing any SUBSCRIBE/PSUBSCRIBE so AffinityTracker::lookup(&peer_ip)
cannot bias shard selection; update the harness around the affinity_tracker
creation or the test connection setup so sharded cases exercise remote fanout
and cross-shard aggregation.

4994-5012: ⚠️ Potential issue | 🟡 Minor

Make the NUMSUB reply shape part of the test.

These if let redis::Value::Int(...) checks silently do nothing if the server returns the wrong layout or types, so a malformed reply can still pass. Match the full [channel, count, ...] structure and panic on anything unexpected.

Minimal fix
-    // ch1 should have 2 subscribers
-    if let redis::Value::Int(count) = &result[1] {
-        assert_eq!(*count, 2, "ch1 should have 2 subscribers");
-    }
-    // ch2 should have 1 subscriber
-    if let redis::Value::Int(count) = &result[3] {
-        assert_eq!(*count, 1, "ch2 should have 1 subscriber");
-    }
-    // ch3 should have 0 subscribers
-    if let redis::Value::Int(count) = &result[5] {
-        assert_eq!(*count, 0, "ch3 should have 0 subscribers");
-    }
+    match result.as_slice() {
+        [
+            redis::Value::BulkString(_),
+            redis::Value::Int(ch1),
+            redis::Value::BulkString(_),
+            redis::Value::Int(ch2),
+            redis::Value::BulkString(_),
+            redis::Value::Int(ch3),
+        ] => {
+            assert_eq!(*ch1, 2, "ch1 should have 2 subscribers");
+            assert_eq!(*ch2, 1, "ch2 should have 1 subscriber");
+            assert_eq!(*ch3, 0, "ch3 should have 0 subscribers");
+        }
+        other => panic!("unexpected PUBSUB NUMSUB reply: {:?}", other),
+    }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration.rs` around lines 4994 - 5012, The NUMSUB reply validation
is currently using silent if-let checks on result which allow malformed shapes
to pass; change the test to explicitly match the full reply shape (the sequence
[BulkString(channel1), Int(count1), BulkString(channel2), Int(count2), ...]) and
panic/assert on any unexpected variants or length — e.g., match
result.as_slice() against the exact pattern of redis::Value::BulkString and
redis::Value::Int for the three channels (referencing result,
redis::Value::BulkString and redis::Value::Int) and use assert_eq! on the
extracted counts so the test fails if the layout or types differ.
src/server/conn/handler_sharded.rs (4)

441-450: ⚠️ Potential issue | 🟠 Major

Unconditional subscription count decrement on explicit UNSUBSCRIBE.

The unsubscribe result is discarded, so subscription_count decrements even if the channel wasn't subscribed. This can drive the counter to zero while live subscriptions remain, causing premature exit from subscriber mode.

Gate the decrement on whether the registry actually removed something:

 for arg in cmd_args {
     if let Some(ch) = extract_bytes(arg) {
-        { pubsub_registry.write().unsubscribe(ch.as_ref(), subscriber_id); }
-        subscription_count = subscription_count.saturating_sub(1);
-        unpropagate_subscription(&all_remote_sub_maps, &ch, shard_id, num_shards, false);
+        let removed = { pubsub_registry.write().unsubscribe(ch.as_ref(), subscriber_id) };
+        if removed {
+            subscription_count = subscription_count.saturating_sub(1);
+            unpropagate_subscription(&all_remote_sub_maps, &ch, shard_id, num_shards, false);
+        }
         write_buf.clear();
         crate::protocol::serialize(&pubsub::unsubscribe_response(&ch, subscription_count), &mut write_buf);

The same issue exists for PUNSUBSCRIBE at lines 470-479.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 441 - 450, The code
unconditionally decrements subscription_count after calling
pubsub_registry.write().unsubscribe; change it to check the unsubscribe result
(e.g., the boolean/Option returned by unsubscribe) and only decrement
subscription_count, call unpropagate_subscription(&all_remote_sub_maps, &ch,
shard_id, num_shards, false), and send the unsubscribe_response when the
registry actually removed a subscription; apply the same guarded logic to the
PUNSUBSCRIBE handling (mirror the conditional decrement/unpropagate/send for
each channel) so subscription_count cannot go negative or reach zero
incorrectly.

495-501: ⚠️ Potential issue | 🟠 Major

RESET discards unsubscribe results, preventing remote propagation.

The unsubscribe_all and punsubscribe_all results are thrown away, so unpropagate_subscription is never called for the removed channels/patterns. This leaves stale entries in all_remote_sub_maps.

 } else if cmd.eq_ignore_ascii_case(b"RESET") {
-    { pubsub_registry.write().unsubscribe_all(subscriber_id); }
-    { pubsub_registry.write().punsubscribe_all(subscriber_id); }
+    let removed_ch = { pubsub_registry.write().unsubscribe_all(subscriber_id) };
+    let removed_pat = { pubsub_registry.write().punsubscribe_all(subscriber_id) };
+    for ch in removed_ch {
+        unpropagate_subscription(&all_remote_sub_maps, &ch, shard_id, num_shards, false);
+    }
+    for pat in removed_pat {
+        unpropagate_subscription(&all_remote_sub_maps, &pat, shard_id, num_shards, true);
+    }
     subscription_count = 0;
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 495 - 501, The RESET branch
currently drops the return values of
pubsub_registry.write().unsubscribe_all(subscriber_id) and
.punsubscribe_all(subscriber_id), so unpropagate_subscription is never invoked
and stale entries remain; change the code to store each call's result (e.g., let
removed = pubsub_registry.write().unsubscribe_all(subscriber_id); and let
premoved = pubsub_registry.write().punsubscribe_all(subscriber_id);), then
iterate over the returned channel list and pattern list and call
unpropagate_subscription for each removed channel/pattern (using subscriber_id
and the returned channel/pattern identifiers) before sending the RESET reply;
keep existing write_buf/stream handling and ensure you still reset
subscription_count and clear write_buf.

481-488: ⚠️ Potential issue | 🟡 Minor

Subscriber-mode PING ignores optional payload.

Per Redis protocol, PING [message] should echo the message back. This always returns ["pong", ""] regardless of arguments, and doesn't validate arity (PING a b should error).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 481 - 488, The PING branch
currently ignores arguments and always returns ["pong",""]; update the handler
(the cmd == b"PING" branch where write_buf, Frame::Array, Frame::BulkString and
crate::protocol::serialize are used) to validate arity (only 0 or 1 allowed) and
return a protocol error (Frame::Error with "ERR wrong number of arguments for
'ping' command) when more than one argument is supplied; when zero args keep the
second element as empty BulkString, and when one arg echo that argument back as
the second BulkString (use the actual arg bytes from the parsed command) before
serializing and writing via stream.write_all.

1462-1466: ⚠️ Potential issue | 🔴 Critical

PUBLISH batch silently drops messages on SPSC push failure.

When try_push fails, calling slot.add(0) marks completion without actually delivering the message. This causes message loss and undercounted PUBLISH replies.

Apply the same retry pattern used elsewhere in this file (e.g., line ~1421):

-                            if producers[idx].try_push(batch_msg).is_ok() {
-                                spsc_notifiers[target].notify_one();
-                            } else {
-                                slot.add(0); // push failed, mark as done
-                            }
+                            let mut pending = batch_msg;
+                            loop {
+                                match producers[idx].try_push(pending) {
+                                    Ok(()) => {
+                                        spsc_notifiers[target].notify_one();
+                                        break;
+                                    }
+                                    Err(msg) => {
+                                        pending = msg;
+                                        drop(producers);
+                                        tokio::task::yield_now().await;
+                                        producers = dispatch_tx.borrow_mut();
+                                    }
+                                }
+                            }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_sharded.rs` around lines 1462 - 1466, The current
PUBLISH path silently drops messages when producers[idx].try_push(batch_msg)
fails by immediately calling slot.add(0); instead, replace that else branch with
the same retry/backoff used elsewhere: loop attempting
producers[idx].try_push(batch_msg) until it succeeds, calling
spsc_notifiers[target].notify_one() and performing the same yield/backoff
between attempts (as used around the other try_push use site), and only call
slot.add(0) if you detect a terminal condition (e.g., producer closed) using the
same check used in the other pattern; reference the symbols
producers[idx].try_push, spsc_notifiers[target].notify_one, and slot.add to
locate and update the code.
src/server/conn/handler_monoio.rs (2)

377-386: ⚠️ Potential issue | 🟡 Minor

Subscriber-mode PING ignores optional payload (same as tokio handler).

This always returns ["pong", ""] regardless of arguments.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio.rs` around lines 377 - 386, The PING branch in
handler_monoio.rs always responds with ["pong",""] and ignores any optional
payload; change it to mirror the tokio handler by checking the incoming command
arguments and echoing the provided payload when present (otherwise send an empty
bulk string). Locate the PING match arm (where resp is constructed, resp_buf is
created, codec.encode_frame is called and stream.write_all is awaited) and
replace the fixed
Frame::Array([...Frame::BulkString(Bytes::from_static(b""))...]) with logic that
uses the parsed argument bytes (the command's payload variable/args) to build
the second Frame::BulkString so the response returns the supplied payload when
provided.

1640-1644: ⚠️ Potential issue | 🔴 Critical

PUBLISH batch silently drops messages on SPSC push failure (same as tokio handler).

When try_push fails, slot.add(0) marks completion without delivery. Implement retry loop as suggested for handler_sharded.rs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/server/conn/handler_monoio.rs` around lines 1640 - 1644, Replace the
single failing push with a bounded retry loop that mirrors the logic used in
handler_sharded.rs: repeatedly attempt producers[idx].try_push(batch_msg)
(cloning or otherwise making batch_msg available for retries), break and call
spsc_notifiers[target].notify_one() on success, and only call slot.add(0) after
the retry limit is exhausted; ensure the loop uses the same backoff/yield
behavior as handler_sharded.rs to avoid busy-waiting and preserves the original
semantics of slot and notification on success.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/server/conn/handler_monoio.rs`:
- Around line 1138-1140: The compilation fails because Frame::Array expects a
FrameVec, but the code builds a Vec<Frame> (variable arr) and passes it
directly; change the call to construct the Frame::Array from arr.into() so the
Vec<Frame> is converted to a FrameVec (same fix where NUMSUB is built and at the
other occurrence around the block using Frame::BulkString →
responses.push(Frame::Array(...)) so replace those pushes to use .into() on the
Vec<Frame> before wrapping in Frame::Array).
- Around line 232-240: The code is shadowing the intended
Arc<RwLock<AffinityTracker>> by an Option<AffinityTracker> named
affinity_tracker; change usages to operate on the actual shared lock (the
parameter renamed to pubsub_affinity) instead of calling .write() on the Option.
Replace affinity_tracker.write().register(...) with
pubsub_affinity.write().register(...) (or unwrap/handle the Option around the
Arc if the param is optional), and apply the same rename/usage fix for the other
occurrences referenced (the blocks that call .write() and .register() at the
other locations). Ensure you reference the existing symbols
pubsub_registry.write().subscribe, propagate_subscription, peer_addr, and
register so you update the correct call sites without introducing new shadowing.
- Around line 103-110: The constructor/handler parameter named affinity_tracker
(type Arc<RwLock<AffinityTracker>>) is being shadowed by a local
Option<AffinityTracker>; rename the parameter to pubsub_affinity to match
handler_sharded.rs and avoid shadowing, update the parameter name in the
function/struct signature (affinity_tracker -> pubsub_affinity), and replace all
uses that expect the Arc<RwLock<_>> (e.g., calls like affinity_tracker.write())
with pubsub_affinity.write(); keep the local Option<AffinityTracker> name for
migration tracking unchanged so the .write() calls resolve to the Arc<RwLock>
again.
- Around line 1839-1853: The disconnect cleanup currently calls
affinity_tracker.write().remove(&addr.ip()) but the shared affinity lock
parameter was renamed to pubsub_affinity; replace the call to use
pubsub_affinity (the Arc<parking_lot::RwLock<AffinityTracker>>) instead of the
local Option, i.e. call pubsub_affinity.write().remove(&addr.ip()) where the
code currently references affinity_tracker.write(), ensuring you operate on the
shared RwLock for AffinityTracker in the unsubscribe block inside the disconnect
handling.
- Around line 1564-1567: The call to the missing helper
is_dispatch_read_supported(cmd) in the monoio fast-path should be resolved:
either remove the is_dispatch_read_supported(...) guard to match
handler_sharded.rs (so the condition becomes just !metadata::is_write(cmd) &&
!remote_groups.contains_key(&target)), or implement the helper with the same
logic/predicate you intended (create a function named is_dispatch_read_supported
that accepts the command type and returns a bool) and use that function here;
update any documentation/comments to reflect which approach you chose and ensure
the condition short-circuits correctly around metadata::is_write,
remote_groups.contains_key, and the new helper if added.

In `@src/shard/dispatch.rs`:
- Around line 64-74: add() currently does an unconditional fetch_sub(1) which
can underflow remaining and wrap to u32::MAX; fix by preventing that: either
reject zero-pending slots in PubSubResponseSlot::new (return Err for count==0)
or make add() detect and fail-closed when remaining is already 0 by using an
atomic compare/modify (e.g. fetch_update/compare_exchange loop) to only
decrement when remaining > 0 and only call self.waker.wake() when the atomic
transition was from 1->0; also ensure memory orderings mirror the existing logic
and keep total.fetch_add(count) behavior.
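A sketch of the fail-closed `add()` this prompt asks for (field names follow the prompt; the waker itself is elided, and the function just returns whether this call was the 1 -> 0 transition at which the real slot would call `self.waker.wake()`):

```rust
use std::sync::atomic::{AtomicI64, AtomicU32, Ordering};

struct Slot {
    remaining: AtomicU32,
    total: AtomicI64,
}

impl Slot {
    fn add(&self, count: i64) -> bool {
        // Keep the unconditional total accumulation, as in the existing logic.
        self.total.fetch_add(count, Ordering::Relaxed);
        // Only decrement when remaining > 0: checked_sub returns None at 0,
        // so a stray extra add() can no longer wrap remaining to u32::MAX.
        match self
            .remaining
            .fetch_update(Ordering::Release, Ordering::Relaxed, |r| r.checked_sub(1))
        {
            Ok(prev) => prev == 1, // 1 -> 0: the real slot wakes its waker here
            Err(_) => false,       // already complete; refuse the decrement
        }
    }
}
```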

---

Duplicate comments:
In `@src/server/conn/handler_monoio.rs`:
- Around line 377-386: The PING branch in handler_monoio.rs always responds with
["pong",""] and ignores any optional payload; change it to mirror the tokio
handler by checking the incoming command arguments and echoing the provided
payload when present (otherwise send an empty bulk string). Locate the PING
match arm (where resp is constructed, resp_buf is created, codec.encode_frame is
called and stream.write_all is awaited) and replace the fixed
Frame::Array([...Frame::BulkString(Bytes::from_static(b""))...]) with logic that
uses the parsed argument bytes (the command's payload variable/args) to build
the second Frame::BulkString so the response returns the supplied payload when
provided.
- Around line 1640-1644: Replace the single failing push with a bounded retry
loop that mirrors the logic used in handler_sharded.rs: repeatedly attempt
producers[idx].try_push(batch_msg) (cloning or otherwise making batch_msg
available for retries), break and call spsc_notifiers[target].notify_one() on
success, and only call slot.add(0) after the retry limit is exhausted; ensure
the loop uses the same backoff/yield behavior as handler_sharded.rs to avoid
busy-waiting and preserves the original semantics of slot and notification on
success.

In `@src/server/conn/handler_sharded.rs`:
- Around line 441-450: The code unconditionally decrements subscription_count
after calling pubsub_registry.write().unsubscribe; change it to check the
unsubscribe result (e.g., the boolean/Option returned by unsubscribe) and only
decrement subscription_count, call
unpropagate_subscription(&all_remote_sub_maps, &ch, shard_id, num_shards,
false), and send the unsubscribe_response when the registry actually removed a
subscription; apply the same guarded logic to the PUNSUBSCRIBE handling (mirror
the conditional decrement/unpropagate/send for each channel) so
subscription_count cannot go negative or reach zero incorrectly.
- Around line 495-501: The RESET branch currently drops the return values of
pubsub_registry.write().unsubscribe_all(subscriber_id) and
.punsubscribe_all(subscriber_id), so unpropagate_subscription is never invoked
and stale entries remain; change the code to store each call's result (e.g., let
removed = pubsub_registry.write().unsubscribe_all(subscriber_id); and let
premoved = pubsub_registry.write().punsubscribe_all(subscriber_id);), then
iterate over the returned channel list and pattern list and call
unpropagate_subscription for each removed channel/pattern (using subscriber_id
and the returned channel/pattern identifiers) before sending the RESET reply;
keep existing write_buf/stream handling and ensure you still reset
subscription_count and clear write_buf.
- Around line 481-488: The PING branch currently ignores arguments and always
returns ["pong",""]; update the handler (the cmd == b"PING" branch where
write_buf, Frame::Array, Frame::BulkString and crate::protocol::serialize are
used) to validate arity (only 0 or 1 allowed) and return a protocol error
(Frame::Error with "ERR wrong number of arguments for 'ping' command) when more
than one argument is supplied; when zero args keep the second element as empty
BulkString, and when one arg echo that argument back as the second BulkString
(use the actual arg bytes from the parsed command) before serializing and
writing via stream.write_all.
- Around line 1462-1466: The current PUBLISH path silently drops messages when
producers[idx].try_push(batch_msg) fails by immediately calling slot.add(0);
instead, replace that else branch with the same retry/backoff used elsewhere:
loop attempting producers[idx].try_push(batch_msg) until it succeeds, calling
spsc_notifiers[target].notify_one() and performing the same yield/backoff
between attempts (as used around the other try_push use site), and only call
slot.add(0) if you detect a terminal condition (e.g., producer closed) using the
same check used in the other pattern; reference the symbols
producers[idx].try_push, spsc_notifiers[target].notify_one, and slot.add to
locate and update the code.

In `@tests/integration.rs`:
- Around line 4825-4835: The test currently asserts receivers >= 1 for the Redis
PUBLISH result, which masks sharded double-counting or stale state; change the
assertion in the PUBLISH checks that use redis::cmd("PUBLISH") (the variable
receivers from the .query_async(&mut pub_conn).await.unwrap()) to require
equality (== 1) instead of >= 1, and update the identical assertion at the other
occurrence (the one around lines 4866-4872) so both PUBLISH cases fail closed if
more than one subscriber is reported.
- Around line 2439-2441: The test harness creates an AffinityTracker and
therefore allows loopback affinity to collapse pub/sub onto one shard; to fix,
disable affinity for these pub/sub tests by not using or passing the affinity
tracker (remove or bypass the
std::sync::Arc::new(parking_lot::RwLock::new(moon::shard::affinity::AffinityTracker::new()))
used as affinity_tracker) or ensure all pub/sub connections are established
before issuing any SUBSCRIBE/PSUBSCRIBE so AffinityTracker::lookup(&peer_ip)
cannot bias shard selection; update the harness around the affinity_tracker
creation or the test connection setup so sharded cases exercise remote fanout
and cross-shard aggregation.
- Around line 4994-5012: The NUMSUB reply validation is currently using silent
if-let checks on result which allow malformed shapes to pass; change the test to
explicitly match the full reply shape (the sequence [BulkString(channel1),
Int(count1), BulkString(channel2), Int(count2), ...]) and panic/assert on any
unexpected variants or length — e.g., match result.as_slice() against the exact
pattern of redis::Value::BulkString and redis::Value::Int for the three channels
(referencing result, redis::Value::BulkString and redis::Value::Int) and use
assert_eq! on the extracted counts so the test fails if the layout or types
differ.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 15dbc0d8-e8c0-4d14-b43d-7c1c9a615a29

📥 Commits

Reviewing files that changed from the base of the PR and between 05a2600 and 31cbd16.

📒 Files selected for processing (12)
  • src/main.rs
  • src/server/conn/handler_monoio.rs
  • src/server/conn/handler_sharded.rs
  • src/server/conn/mod.rs
  • src/server/conn/util.rs
  • src/server/listener.rs
  • src/shard/conn_accept.rs
  • src/shard/dispatch.rs
  • src/shard/event_loop.rs
  • src/shard/mod.rs
  • src/shard/spsc_handler.rs
  • tests/integration.rs
🚧 Files skipped from review as they are similar to previous changes (4)
  • src/shard/spsc_handler.rs
  • src/main.rs
  • src/shard/conn_accept.rs
  • src/shard/event_loop.rs

Comment on lines +84 to +104
/// Remove a subscription (or pattern subscription) from all remote shards' subscriber maps.
///
/// Called after unsubscribing locally. Acquires each shard's RemoteSubscriberMap write lock
/// individually (no nested locks).
pub(crate) fn unpropagate_subscription(
    all_remote_sub_maps: &[std::sync::Arc<
        parking_lot::RwLock<crate::shard::remote_subscriber_map::RemoteSubscriberMap>,
    >],
    channel: &Bytes,
    shard_id: usize,
    num_shards: usize,
    is_pattern: bool,
) {
    for target in 0..num_shards {
        if target == shard_id {
            continue;
        }
        all_remote_sub_maps[target]
            .write()
            .remove(channel, shard_id, is_pattern);
    }
}

⚠️ Potential issue | 🟠 Major

Only unpropagate after the last local unsubscribe.

RemoteSubscriberMap only tracks whether a shard is interested in a channel/pattern, not how many local subscribers that shard still has. Removing shard_id here on every local UNSUBSCRIBE clears remote fanout even when this shard still has other subscribers, so later cross-shard PUBLISHes stop reaching them. Gate this helper on the local count transitioning to zero, or make the remote map keep per-shard refcounts.

Comment on lines +64 to +74
/// Called by SPSC handler: add this shard's subscriber count and decrement remaining.
/// Wakes the connection handler when the last shard responds.
#[inline]
pub fn add(&self, count: i64) {
    self.total.fetch_add(count, Ordering::Relaxed);
    let prev = self.remaining.fetch_sub(1, Ordering::Release);
    if prev == 1 {
        // This was the last shard -- wake the connection handler
        self.waker.wake();
    }
}

⚠️ Potential issue | 🟠 Major

Don't let remaining wrap past zero.

add() unconditionally does fetch_sub(1). If a PubSubResponseSlot::new(0) is still attached to a dispatched publish, remaining wraps to u32::MAX and the slot can never become ready again. The new SPSC tests in src/shard/mod.rs already build that shape, so this is not just theoretical. Reject zero-pending dispatched slots up front, or make add() fail closed when the slot is already complete.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/dispatch.rs` around lines 64 - 74, add() currently does an
unconditional fetch_sub(1) which can underflow remaining and wrap to u32::MAX;
fix by preventing that: either reject zero-pending slots in
PubSubResponseSlot::new (return Err for count==0) or make add() detect and
fail-closed when remaining is already 0 by using an atomic compare/modify (e.g.
fetch_update/compare_exchange loop) to only decrement when remaining > 0 and
only call self.waker.wake() when the atomic transition was from 1->0; also
ensure memory orderings mirror the existing logic and keep
total.fetch_add(count) behavior.

…dler

- spawn_migrated_monoio_connection: add 4 missing pub/sub params
  (remote_subscriber_map, all_pubsub_registries, all_remote_sub_maps,
  pubsub_affinity) and pass to handle_connection_sharded_monoio
- event_loop.rs: update all 4 monoio migration call sites
- spawn_monoio_connection: fix remote_subscriber_map type from
  Rc<RefCell<>> to Arc<parking_lot::RwLock<>>
- handler_monoio.rs: add is_dispatch_read_supported import, rename
  affinity_tracker → pubsub_affinity to avoid shadowing with FD
  migration AffinityTracker, fix Frame::Array conversions
- command/mod.rs: add is_dispatch_read_supported() function

Verified: cargo check (monoio), cargo check (tokio), cargo test,
cargo clippy (both profiles) — all clean.

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

🧹 Nitpick comments (2)
src/command/mod.rs (1)

645-683: Add parity tests for fast-path gating vs dispatch_read().

Given this function is a routing gate, add focused unit tests that assert is_dispatch_read_supported() is true for representative supported reads (e.g., GETRANGE, SUBSTR) and false for nearby non-read/unknown commands. This will prevent future drift.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/command/mod.rs` around lines 645 - 683, Add unit tests that assert parity
between the fast-path gate is_dispatch_read_supported(cmd: &[u8]) and the real
routing behavior by covering representative supported read commands and nearby
non-read or unknown commands; specifically, write tests that call
is_dispatch_read_supported with byte slices for commands like b"GET",
b"GETRANGE", b"SUBSTR", b"STRLEN", b"HGETALL" (expect true where listed in the
match) and with similar-length non-read/unknown commands (expect false), and
also add a small assertion that compares these boolean results against
dispatch_read(...) for the same inputs to detect drift; place tests near other
command tests and reference the functions is_dispatch_read_supported and
dispatch_read so reviewers can locate them.
src/shard/conn_accept.rs (1)

99-100: Share the shard-wide pub/sub collections instead of cloning them into each handler.

all_pubsub_registries.to_vec() and all_remote_sub_maps.to_vec() allocate fresh containers per accepted connection, and the same pattern repeats in the migrated/monoio paths below. With many shards and long-lived clients, that turns already-shared global state into O(connections × shards) pointer storage. An Arc<[...]> or a shared wrapper struct would avoid the per-connection container churn.

♻️ Suggested shape
-    all_pubsub_registries: &[Arc<parking_lot::RwLock<PubSubRegistry>>],
-    all_remote_sub_maps: &[Arc<parking_lot::RwLock<RemoteSubscriberMap>>],
+    all_pubsub_registries: &Arc<[Arc<parking_lot::RwLock<PubSubRegistry>>]>,
+    all_remote_sub_maps: &Arc<[Arc<parking_lot::RwLock<RemoteSubscriberMap>>]>,
...
-    let all_regs = all_pubsub_registries.to_vec();
-    let all_rsm = all_remote_sub_maps.to_vec();
+    let all_regs = all_pubsub_registries.clone();
+    let all_rsm = all_remote_sub_maps.clone();

Mirror the same type change in the migrated and monoio helper signatures.

Also applies to: 136-137

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/conn_accept.rs` around lines 99 - 100, The code currently clones
the slice parameters all_pubsub_registries and all_remote_sub_maps into
per-connection Vecs (via to_vec()), causing O(connections×shards) pointer churn;
instead change the parameters and any helper signatures (including the migrated
and monoio helpers) to accept a shared reference-counted container (for example
Arc<[Arc<parking_lot::RwLock<PubSubRegistry>>]> and
Arc<[Arc<parking_lot::RwLock<RemoteSubscriberMap>>]>) or a small shared wrapper
type and pass that directly into the connection handler so you can remove the
to_vec() calls and stop allocating per-connection containers; update all
function signatures that currently take the slices (all_pubsub_registries,
all_remote_sub_maps) to the new Arc-backed types to keep the same shared
ownership without per-connection allocation.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/command/mod.rs`:
- Around line 639-644: The doc comment for the shared-read guard overstates its
guarantees: update the comment to describe it as a coarse prefilter (matching
only on (len, b0) buckets) and clearly state that exact command matching is
performed later in dispatch_read(), or alternatively tighten the predicate to
perform a full exact match here; apply the same wording change to the other
similar doc block referenced (lines ~651–682). Mention the guard and
dispatch_read() by name so readers can find the coarse prefilter and the
definitive check.

In `@src/shard/conn_accept.rs`:
- Around line 29-31: The code defines a StdRwLock type alias that points to
std::sync::RwLock and is used for global config state (ACL, runtime config,
replication, cluster state); replace that alias and all instantiations with
parking_lot::RwLock (or remove the alias entirely) so parking_lot locks are used
everywhere, update any use sites that reference StdRwLock to the parking_lot
type, and ensure constructors in the code that create the global
ACL/runtime/replication/cluster state use parking_lot::RwLock::new(...) instead
of std::sync::RwLock so the CLAUDE.md guideline is followed.

---

Nitpick comments:
In `@src/command/mod.rs`:
- Around line 645-683: Add unit tests that assert parity between the fast-path
gate is_dispatch_read_supported(cmd: &[u8]) and the real routing behavior by
covering representative supported read commands and nearby non-read or unknown
commands; specifically, write tests that call is_dispatch_read_supported with
byte slices for commands like b"GET", b"GETRANGE", b"SUBSTR", b"STRLEN",
b"HGETALL" (expect true where listed in the match) and with similar-length
non-read/unknown commands (expect false), and also add a small assertion that
compares these boolean results against dispatch_read(...) for the same inputs to
detect drift; place tests near other command tests and reference the functions
is_dispatch_read_supported and dispatch_read so reviewers can locate them.

In `@src/shard/conn_accept.rs`:
- Around line 99-100: The code currently clones the slice parameters
all_pubsub_registries and all_remote_sub_maps into per-connection Vecs (via
to_vec()), causing O(connections×shards) pointer churn; instead change the
parameters and any helper signatures (including the migrated and monoio helpers)
to accept a shared reference-counted container (for example
Arc<[Arc<parking_lot::RwLock<PubSubRegistry>>]> and
Arc<[Arc<parking_lot::RwLock<RemoteSubscriberMap>>]>) or a small shared wrapper
type and pass that directly into the connection handler so you can remove the
to_vec() calls and stop allocating per-connection containers; update all
function signatures that currently take the slices (all_pubsub_registries,
all_remote_sub_maps) to the new Arc-backed types to keep the same shared
ownership without per-connection allocation.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: ef2eb90f-ecfe-4741-9229-9a3377e5b9e6

📥 Commits

Reviewing files that changed from the base of the PR and between 31cbd16 and 1ccf59e.

📒 Files selected for processing (4)
  • src/command/mod.rs
  • src/server/conn/handler_monoio.rs
  • src/shard/conn_accept.rs
  • src/shard/event_loop.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/shard/event_loop.rs
  • src/server/conn/handler_monoio.rs

Comment on lines +29 to +31
/// Type alias to distinguish pre-existing std::sync::RwLock (for ACL, runtime config, etc.)
/// from parking_lot::RwLock (used for pubsub types).
type StdRwLock<T> = std::sync::RwLock<T>;

🛠️ Refactor suggestion | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

# First, check the actual code at the flagged lines
head -100 src/shard/conn_accept.rs | tail -80

Repository: pilotspace/moon

Length of output: 3586


🏁 Script executed:

# Look for CLAUDE.md or coding guidelines
find . -name "CLAUDE.md" -o -name "*.md" | grep -i claude

Repository: pilotspace/moon

Length of output: 645


🏁 Script executed:

# Search for how std::sync::RwLock vs parking_lot::RwLock are used elsewhere in the codebase
rg "std::sync::RwLock|parking_lot::RwLock" --type rust -B 2 -A 2 | head -150

Repository: pilotspace/moon

Length of output: 9598


🏁 Script executed:

# Check CLAUDE.md for any nuance on lock usage or exceptions
cat CLAUDE.md | head -200

Repository: pilotspace/moon

Length of output: 7702


🏁 Script executed:

# Look at main.rs to understand where these std::sync::RwLock are created
grep -n "std::sync::RwLock" src/main.rs

Repository: pilotspace/moon

Length of output: 638


🏁 Script executed:

# Check if any handlers actually hold these locks across await points
rg "acl_table|runtime_config|repl_state|cluster_state" src/server/conn/ -A 3 -B 1 | head -100

Repository: pilotspace/moon

Length of output: 7689


🏁 Script executed:

# Check if std::sync::RwLock locks are held across .await points in handlers
rg "read\(\)\.unwrap\(\)|write\(\)\.unwrap\(\)" src/server/conn/ -B 3 -A 5 | head -150

Repository: pilotspace/moon

Length of output: 14791


🏁 Script executed:

# Check if the handlers actually use these locks and whether they're held across await
grep -n "\.await" src/server/conn/handler_single.rs | head -20

Repository: pilotspace/moon

Length of output: 2077


🏁 Script executed:

# Look at how the std::sync locks interact with async code
rg "acl_table\.read\(\)|runtime_config\.read\(\)|repl_state\.write\(\)" src/server/conn/ -B 2 -A 2 | head -100

Repository: pilotspace/moon

Length of output: 9265


🏁 Script executed:

# Check the actual architectural intent: are these truly "pre-existing" or newly introduced?
git log --oneline -20 src/shard/conn_accept.rs 2>/dev/null || echo "git not available, checking file age"

Repository: pilotspace/moon

Length of output: 151


🏁 Script executed:

# Check if parking_lot::RwLock alternatives already exist for these global states
rg "parking_lot::RwLock.*AclTable|parking_lot::RwLock.*RuntimeConfig|parking_lot::RwLock.*ReplicationState|parking_lot::RwLock.*ClusterState" --type rust

Repository: pilotspace/moon

Length of output: 41


🏁 Script executed:

# Look at whether other global state uses parking_lot or std::sync
rg "Arc.*RwLock.*" src/main.rs | grep -E "acl|config|repl|cluster"

Repository: pilotspace/moon

Length of output: 478


Replace std::sync::RwLock with parking_lot::RwLock for all global config state.

The guideline in CLAUDE.md is unambiguous: "Use parking_lot::RwLock / parking_lot::Mutex — never std::sync locks." The StdRwLock alias codifies std::sync::RwLock usage for ACL, runtime config, replication, and cluster state. Migrate these to parking_lot::RwLock in main.rs where they are instantiated, then update or remove the type alias.

Also applies to: 90-96

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/shard/conn_accept.rs` around lines 29 - 31, The code defines a StdRwLock
type alias that points to std::sync::RwLock and is used for global config state
(ACL, runtime config, replication, cluster state); replace that alias and all
instantiations with parking_lot::RwLock (or remove the alias entirely) so
parking_lot locks are used everywhere, update any use sites that reference
StdRwLock to the parking_lot type, and ensure constructors in the code that
create the global ACL/runtime/replication/cluster state use
parking_lot::RwLock::new(...) instead of std::sync::RwLock so the CLAUDE.md
guideline is followed.

1. is_dispatch_read_supported() doc comment: clarify it is a coarse
   prefilter matching (len, b0) buckets, not exact commands. Multiple
   commands may share a bucket. The definitive check is in dispatch_read()
   via eq_ignore_ascii_case.

2. Add 3 unit tests:
   - test_is_dispatch_read_supported_known_reads: 46 known read commands
   - test_is_dispatch_read_supported_non_reads: commands with buckets
     not in the prefilter (avoids false positives from coarse matching)
   - test_dispatch_read_parity_with_prefilter: prefilter-accepted commands
     don't panic in dispatch_read()

Skipped (not our code):
- StdRwLock alias for ACL/runtime/cluster/repl: identical on origin/main
- to_vec() per-connection: ~288 bytes, not worth 8+ signature changes
@pilotspacex-byte pilotspacex-byte added this pull request to the merge queue Mar 30, 2026
Merged via the queue into main with commit 72160b6 Mar 30, 2026
9 checks passed
@pilotspacex-byte pilotspacex-byte deleted the feat/pubsub-sharded branch March 30, 2026 02:21