perf: sharded mode pipeline throughput optimization #112
Merged
Conversation
extract the message-processing body into a process_message helper, then add a try_recv drain loop after the initial recv(). this amortizes the tokio::select! overhead across bursts of pipelined commands — multiple messages are processed per wakeup instead of re-entering select! for each one.
make Engine::shard_for_key and ShardHandle::dispatch public, add Engine::dispatch_to_shard for the upcoming dispatch-collect pipeline. update all ~30 single-key command arms in execute() to compute the shard index first, then move the key into ShardRequest instead of cloning it. saves one String heap allocation per request. multi-key commands (MGET, MSET, DEL, EXISTS) still clone since keys go to different shards.
replace the join_all(futures) pipeline pattern with a two-phase dispatch-collect approach: 1. dispatch phase: parse each frame, send the request to the owning shard via mpsc (fast — completes immediately with channel capacity), storing a oneshot receiver + lightweight ResponseTag 2. collect phase: await each oneshot in order, convert ShardResponse to Frame using the tag this eliminates N large async state machines from join_all (each was a full process() + execute() future, ~1KB+ on the stack for P=16). instead, all dispatches are simple mpsc sends, and shards process in parallel while the connection handler waits. single-key commands (GET, SET, INCR, etc) use the fast dispatch path. complex commands (broadcast, multi-key, cluster) fall through to the existing execute() function.
kacy added a commit that referenced this pull request on Feb 19, 2026
* perf: drain shard channel after recv to reduce select! overhead
* perf: eliminate key clones in single-key shard commands
* perf: replace join_all with dispatch-collect pipeline
summary
three optimizations to reduce channel overhead in sharded mode, targeting the 4-8x throughput gap vs concurrent mode at high pipeline depths (P=16).
root cause: every pipelined command in sharded mode pays ~300-450ns of channel overhead (oneshot allocation + mpsc send + atomic wakeup + oneshot recv). the actual keyspace operation takes ~20-50ns. channel overhead is 6-9x the useful work.
changes
1. shard channel draining — after `recv()` returns a message, drain pending messages with `try_recv()` before re-entering `select!`. this amortizes the select! overhead across bursts of pipelined commands.
2. eliminate key clones — make `Engine::shard_for_key` public and compute the shard index before moving the key into `ShardRequest`, avoiding one `String` heap allocation per single-key command (~30 command types).
3. dispatch-collect pipeline — replace `join_all(futures)` with a two-phase pattern: dispatch all commands to shards via mpsc sends (fast, non-blocking), then collect oneshot responses in order. this eliminates N large async state machines from join_all and uses a lightweight `ResponseTag` enum to guide ShardResponse → Frame conversion in the collect phase.
what was tested
- `cargo clippy --workspace --features protobuf -- -D warnings` — clean
- `cargo test --workspace --features protobuf` — all 363 unit tests + 113 integration tests pass (occasional flaky port-collision failures pre-exist on main)
design considerations
- complex commands (broadcast, multi-key, cluster) fall through to the existing `execute()` function.
- `ResponseTag` is a ~1-byte enum discriminant that replaces keeping the full `Command` alive during the collect phase.
- the `process_message` helper in `shard.rs` handles the same dispatch + AOF + special-request logic as before, just extracted to avoid duplication.