Skip to content

feat(engine): parallel receive-side delta apply behind --features parallel-receive-delta (#1368)#4300

Closed
oferchen wants to merge 1 commit into
masterfrom
feat/parallel-receive-delta-1368
Closed

feat(engine): parallel receive-side delta apply behind --features parallel-receive-delta (#1368)#4300
oferchen wants to merge 1 commit into
masterfrom
feat/parallel-receive-delta-1368

Conversation

@oferchen
Copy link
Copy Markdown
Owner

Summary

  • Scaffold the parallel receive-side delta apply path behind a new
    parallel-receive-delta feature on the engine crate (forwarded
    from the transfer crate). Default off; the production sequential
    loop is unchanged.
  • New module engine::concurrent_delta::parallel_apply introduces
    DeltaChunk and ParallelDeltaApplier. Per-file Mutex<FileSlot>
    serialises writes; per-file ReorderBuffer replays chunks in
    submission order so per-file byte order is preserved even when the
    rayon verify step completes out of order.
  • ReceiverContext::enable_parallel_receive_delta installs the
    existing ParallelDeltaPipeline only when the feature is compiled
    in, keeping the receiver loop untouched otherwise.

Implements the opt-in scaffold prescribed by
docs/design/parallel-receive-delta-application.md section 6.3
(phase 2). The bench trigger from #4214 is the gating signal noted in
the task; this PR lands the opt-in surface so subsequent benches can
run against the path without disturbing default builds.

Test plan

  • CI: fmt + clippy clean across workspace
  • CI: nextest (stable) green
  • CI: Windows / macOS / Linux musl matrices green
  • Unit: single_file_in_order_matches_sequential passes byte-equal
  • Unit: single_file_out_of_order_preserves_byte_order passes
    byte-equal
  • Unit: batch_apply_matches_sequential_byte_for_byte passes
    byte-equal across two files
  • Property: random_chunk_sizes_and_permutations_match_sequential
    (48 cases, deterministic permutation per seed) passes
  • Unit: error paths pinned - unknown file, double registration,
    finish with pending chunks

Adds the opt-in scaffold for the parallel receive-side delta apply path
recommended in `docs/design/parallel-receive-delta-application.md`. The
production binary keeps the sequential apply loop by default; the new
path compiles in only with the `parallel-receive-delta` feature so the
phased rollout in section 6.3 of the design doc can advance without
disturbing existing users.

The new `engine::concurrent_delta::parallel_apply` module exposes:

- `DeltaChunk` - one literal-or-block segment carrying its file NDX and
  a monotonic per-file sequence number.
- `ParallelDeltaApplier` - per-file Mutex-guarded writer plus a per-file
  `ReorderBuffer` that replays chunks in submission order before they
  touch the destination, even when the rayon verify step completes out
  of order. Per-file byte order is therefore preserved exactly as
  upstream `receiver.c::receive_data()` emits it.
- `apply_chunk_parallel` / `apply_batch_parallel` - single and batched
  entry points; the batch path uses `rayon::par_iter` with a min-len
  derived from the configured concurrency limit.

Unit tests cover the parallel-vs-sequential byte-for-byte invariant
for both in-order and shuffled submissions, plus a proptest that
sweeps random chunk sizes and deterministic permutations against a
sequential baseline. Error paths for unregistered files, double
registration, and finalisation with pending chunks are pinned.

Transfer crate forwards a `parallel-receive-delta` feature to the
engine flag and exposes `ReceiverContext::enable_parallel_receive_delta`
that installs the existing `ParallelDeltaPipeline` when the feature is
on, keeping the receiver loop untouched when it is off.
@oferchen
Copy link
Copy Markdown
Owner Author

Closing due to unresolvable merge conflicts. Re-doing fresh on top of current master.

@oferchen oferchen closed this May 17, 2026
oferchen added a commit that referenced this pull request May 17, 2026
…apply (#4319)

Wires the existing SpillableReorderBuffer into DeltaConsumer behind a new
opt-in ConcurrentDeltaConfig, and lands the parallel receive-side delta
apply scaffold behind the parallel-receive-delta cargo feature.

SpillableReorderBuffer wiring (#1884)
- New ConcurrentDeltaConfig { spill_threshold_bytes, spill_dir } selects
  between the bare ReorderBuffer (default, behaviour unchanged) and the
  bounded-memory SpillableReorderBuffer when a threshold is supplied.
- DeltaConsumer::spawn_with_config dispatches via a ReorderMode enum so
  spawn / spawn_bypass / spawn_with_config share one inner loop entry.
- DeltaConsumerStats surfaces the cumulative spill_events counter via a
  lock-free AtomicU64 published by the reorder thread.
- Spill backend construction or I/O failures map to DeltaResult::failed
  for the offending sequence so the receiver maps to upstream exit code
  11 (FileIo) and aborts. Existing histogram/metrics machinery on the
  bare path is preserved verbatim.

Parallel receive delta apply (#1368)
- New parallel-receive-delta feature on engine (forwarded from transfer).
  Default off so production receivers continue to drive the sequential
  apply loop in receiver/transfer.rs.
- engine::concurrent_delta::parallel_apply adds DeltaChunk and
  ParallelDeltaApplier. Per-file Mutex serialises destination writes,
  per-file ReorderBuffer replays chunks in submission order, and
  rayon::join / par_iter fans the verify step across the rayon pool
  while keeping per-file byte order exact.
- ReceiverContext::enable_parallel_receive_delta installs the existing
  ParallelDeltaPipeline only when the feature is compiled in, leaving
  the default receiver loop untouched.

Re-exports the union of ConcurrentDeltaConfig, DeltaConsumerStats, and
(feature-gated) DeltaChunk / ParallelDeltaApplier from
crates/engine/src/concurrent_delta/mod.rs alongside the existing
HistogramStats, ReorderMetrics, and ReorderBuffer surface.

Tests
- spillable_consumer_preserves_order_under_pressure drives 1000 items
  through a 1 KiB budget with a deliberately delayed head-of-line item
  and asserts both in-order delivery and spill_events > 0.
- spillable_consumer_matches_bare_output_byte_for_byte compares spill
  vs bare paths via SpillCodec encoding.
- spawn_with_config_off_matches_spawn and stats_zero_when_spill_disabled
  pin the default-off invariants.
- parallel_apply: in-order, shuffled, and batched byte-equality tests
  plus a proptest over random chunk sizes / deterministic permutations.

Replaces the conflict-stalled PRs #4299 and #4300 with a single
combined change on top of current master.

Closes #1884
Closes #1368
@github-actions github-actions Bot added the enhancement New feature or request label May 17, 2026
oferchen added a commit that referenced this pull request May 18, 2026
…apply (#4319)

Wires the existing SpillableReorderBuffer into DeltaConsumer behind a new
opt-in ConcurrentDeltaConfig, and lands the parallel receive-side delta
apply scaffold behind the parallel-receive-delta cargo feature.

SpillableReorderBuffer wiring (#1884)
- New ConcurrentDeltaConfig { spill_threshold_bytes, spill_dir } selects
  between the bare ReorderBuffer (default, behaviour unchanged) and the
  bounded-memory SpillableReorderBuffer when a threshold is supplied.
- DeltaConsumer::spawn_with_config dispatches via a ReorderMode enum so
  spawn / spawn_bypass / spawn_with_config share one inner loop entry.
- DeltaConsumerStats surfaces the cumulative spill_events counter via a
  lock-free AtomicU64 published by the reorder thread.
- Spill backend construction or I/O failures map to DeltaResult::failed
  for the offending sequence so the receiver maps to upstream exit code
  11 (FileIo) and aborts. Existing histogram/metrics machinery on the
  bare path is preserved verbatim.

Parallel receive delta apply (#1368)
- New parallel-receive-delta feature on engine (forwarded from transfer).
  Default off so production receivers continue to drive the sequential
  apply loop in receiver/transfer.rs.
- engine::concurrent_delta::parallel_apply adds DeltaChunk and
  ParallelDeltaApplier. Per-file Mutex serialises destination writes,
  per-file ReorderBuffer replays chunks in submission order, and
  rayon::join / par_iter fans the verify step across the rayon pool
  while keeping per-file byte order exact.
- ReceiverContext::enable_parallel_receive_delta installs the existing
  ParallelDeltaPipeline only when the feature is compiled in, leaving
  the default receiver loop untouched.

Re-exports the union of ConcurrentDeltaConfig, DeltaConsumerStats, and
(feature-gated) DeltaChunk / ParallelDeltaApplier from
crates/engine/src/concurrent_delta/mod.rs alongside the existing
HistogramStats, ReorderMetrics, and ReorderBuffer surface.

Tests
- spillable_consumer_preserves_order_under_pressure drives 1000 items
  through a 1 KiB budget with a deliberately delayed head-of-line item
  and asserts both in-order delivery and spill_events > 0.
- spillable_consumer_matches_bare_output_byte_for_byte compares spill
  vs bare paths via SpillCodec encoding.
- spawn_with_config_off_matches_spawn and stats_zero_when_spill_disabled
  pin the default-off invariants.
- parallel_apply: in-order, shuffled, and batched byte-equality tests
  plus a proptest over random chunk sizes / deterministic permutations.

Replaces the conflict-stalled PRs #4299 and #4300 with a single
combined change on top of current master.

Closes #1884
Closes #1368
oferchen added a commit that referenced this pull request May 18, 2026
…apply (#4319)

Wires the existing SpillableReorderBuffer into DeltaConsumer behind a new
opt-in ConcurrentDeltaConfig, and lands the parallel receive-side delta
apply scaffold behind the parallel-receive-delta cargo feature.

SpillableReorderBuffer wiring (#1884)
- New ConcurrentDeltaConfig { spill_threshold_bytes, spill_dir } selects
  between the bare ReorderBuffer (default, behaviour unchanged) and the
  bounded-memory SpillableReorderBuffer when a threshold is supplied.
- DeltaConsumer::spawn_with_config dispatches via a ReorderMode enum so
  spawn / spawn_bypass / spawn_with_config share one inner loop entry.
- DeltaConsumerStats surfaces the cumulative spill_events counter via a
  lock-free AtomicU64 published by the reorder thread.
- Spill backend construction or I/O failures map to DeltaResult::failed
  for the offending sequence so the receiver maps to upstream exit code
  11 (FileIo) and aborts. Existing histogram/metrics machinery on the
  bare path is preserved verbatim.

Parallel receive delta apply (#1368)
- New parallel-receive-delta feature on engine (forwarded from transfer).
  Default off so production receivers continue to drive the sequential
  apply loop in receiver/transfer.rs.
- engine::concurrent_delta::parallel_apply adds DeltaChunk and
  ParallelDeltaApplier. Per-file Mutex serialises destination writes,
  per-file ReorderBuffer replays chunks in submission order, and
  rayon::join / par_iter fans the verify step across the rayon pool
  while keeping per-file byte order exact.
- ReceiverContext::enable_parallel_receive_delta installs the existing
  ParallelDeltaPipeline only when the feature is compiled in, leaving
  the default receiver loop untouched.

Re-exports the union of ConcurrentDeltaConfig, DeltaConsumerStats, and
(feature-gated) DeltaChunk / ParallelDeltaApplier from
crates/engine/src/concurrent_delta/mod.rs alongside the existing
HistogramStats, ReorderMetrics, and ReorderBuffer surface.

Tests
- spillable_consumer_preserves_order_under_pressure drives 1000 items
  through a 1 KiB budget with a deliberately delayed head-of-line item
  and asserts both in-order delivery and spill_events > 0.
- spillable_consumer_matches_bare_output_byte_for_byte compares spill
  vs bare paths via SpillCodec encoding.
- spawn_with_config_off_matches_spawn and stats_zero_when_spill_disabled
  pin the default-off invariants.
- parallel_apply: in-order, shuffled, and batched byte-equality tests
  plus a proptest over random chunk sizes / deterministic permutations.

Replaces the conflict-stalled PRs #4299 and #4300 with a single
combined change on top of current master.

Closes #1884
Closes #1368
@oferchen oferchen deleted the feat/parallel-receive-delta-1368 branch May 19, 2026 19:29
oferchen added a commit that referenced this pull request May 21, 2026
…euristic (PIP-3 + PIP-5) (#4666)

* perf(transfer): enable parallel receive-delta by default via Path B heuristic

Wires the receiver-side parallel delta apply path into production per
`docs/design/parallel-receive-delta-default-on.md` Path B, combining
steps 4 and 5 in a single change.

- Add `PARALLEL_RECEIVE_FILE_COUNT_THRESHOLD = 100` and
  `PARALLEL_RECEIVE_BYTES_THRESHOLD = 64 MiB` on the receiver. Thresholds
  match the existing rayon parallel-stat cutoff convention and the
  `copy_file_range` 64 MiB crossover.
- Add `ReceiverContext::select_receiver_strategy(file_count, total_size)`
  (pure heuristic), `total_source_bytes()` (sums the in-memory file
  list), and `dispatch_receiver_strategy()` (logs the decision under the
  `GENR` debug channel and swaps the delta pipeline when the
  `parallel-receive-delta` feature is on).
- Call `dispatch_receiver_strategy` at the top of `run_sync`,
  `run_pipelined`, and `run_pipelined_incremental`, immediately after
  `setup_transfer` returns the file count and the file list is in
  memory.
- Surface the choice as `TransferStats::receiver_strategy_chosen` via
  the new `ReceiverStrategy { Sequential, Parallel }` enum
  (`Sequential` by default).
- Promote `parallel-receive-delta` into the `default = [...]` set on
  `engine`, `transfer`, `core`, `cli`, and the workspace binary so the
  shipped `oc-rsync` picks up the dispatch with no opt-in flag. Stay
  compatible with `--no-default-features` builds: when the feature is
  compiled out the dispatcher logs `receiver_strategy=parallel_unavailable`
  and falls back to sequential so the telemetry counter never lies
  about the path actually taken.

Tests:
- Unit coverage for the heuristic boundary matrix (below/above each
  threshold, exact-boundary, empty transfer).
- File-list-integration coverage for `total_source_bytes` and the full
  `dispatch_receiver_strategy` flow with populated file lists.

Wire-format parity stays guarded by the existing proptests (#4300 +
#4319). The soak + bench gates (criteria 1, 3, 4, 5 in section 5) are
explicit risk acceptance; PIP-4 (interop suite re-run) and PIP-6 (bench
backfill) remain as follow-ups.

Refs #1368, #2566, #2568.

* style(transfer): apply rustfmt to PIP-3+5 receiver-strategy code

CI fmt+clippy diff: collapsed select_receiver_strategy multi-line
signature and the FileEntry::new_file argument list in
dispatch_large_bytes_picks_parallel onto single lines.

Behaviour-neutral.

* fix(transfer): drop identity-op multiplier in receiver-strategy tests

clippy::identity-op fired on `1 * 1024 * 1024`. Collapsed to
`1024 * 1024` in the two boundary-matrix tests.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant