Skip to content

feat: wire ReceiverDeltaPipeline into receiver transfer context#3166

Merged
oferchen merged 2 commits into
masterfrom
feat/receiver-pipeline-wiring
Apr 11, 2026
Merged

feat: wire ReceiverDeltaPipeline into receiver transfer context#3166
oferchen merged 2 commits into
masterfrom
feat/receiver-pipeline-wiring

Conversation

@oferchen
Copy link
Copy Markdown
Owner

Summary

  • Adds a pluggable delta_pipeline field (Option<Box<dyn ReceiverDeltaPipeline>>) to ReceiverContext, defaulting to SequentialDeltaPipeline (no behavior change)
  • Adds set_delta_pipeline() public setter for callers to inject ThresholdDeltaPipeline or ParallelDeltaPipeline
  • Adds take_delta_pipeline() internal accessor for the transfer loop to consume the pipeline
  • Adds Debug supertrait to ReceiverDeltaPipeline and manual Debug impls for ParallelDeltaPipeline and ThresholdDeltaPipeline

All existing call sites (ReceiverContext::new) automatically get the sequential default. No wire format or protocol changes.

Closes #1544

Test plan

  • CI fmt+clippy passes (new field and trait bound compile cleanly)
  • CI nextest passes on all platforms (no behavior change - default is sequential)
  • Interop tests pass (no wire protocol changes)

Add a pluggable delta_pipeline field to ReceiverContext so callers can
inject SequentialDeltaPipeline (default), ParallelDeltaPipeline, or
ThresholdDeltaPipeline. The default remains sequential - no behavior
change. This lays the plumbing for opting into parallel delta dispatch.

- Add `delta_pipeline: Option<Box<dyn ReceiverDeltaPipeline>>` field
- Initialize with SequentialDeltaPipeline::new() in ReceiverContext::new()
- Add set_delta_pipeline() public setter and take_delta_pipeline() internal accessor
- Add Debug supertrait to ReceiverDeltaPipeline and impls for all types
- Re-exports already present in lib.rs from prior PR
@github-actions github-actions Bot added the enhancement New feature or request label Apr 11, 2026
…par_iter

The ReceiverDeltaPipeline trait object (containing mpsc::Receiver which
is not Sync) made ReceiverContext not Sync, breaking rayon's par_iter
closure which captures &self. Extract the individual basis config fields
into local variables so the closure only captures those Sync values.
@oferchen oferchen merged commit 6f75982 into master Apr 11, 2026
37 checks passed
@oferchen oferchen deleted the feat/receiver-pipeline-wiring branch April 11, 2026 05:36
oferchen added a commit that referenced this pull request May 1, 2026
* feat: wire ReceiverDeltaPipeline into receiver transfer context

Add a pluggable delta_pipeline field to ReceiverContext so callers can
inject SequentialDeltaPipeline (default), ParallelDeltaPipeline, or
ThresholdDeltaPipeline. The default remains sequential - no behavior
change. This lays the plumbing for opting into parallel delta dispatch.

- Add `delta_pipeline: Option<Box<dyn ReceiverDeltaPipeline>>` field
- Initialize with SequentialDeltaPipeline::new() in ReceiverContext::new()
- Add set_delta_pipeline() public setter and take_delta_pipeline() internal accessor
- Add Debug supertrait to ReceiverDeltaPipeline and impls for all types
- Re-exports already present in lib.rs from prior PR

* fix: extract basis config fields to avoid capturing non-Sync self in par_iter

The ReceiverDeltaPipeline trait object (containing mpsc::Receiver which
is not Sync) made ReceiverContext not Sync, breaking rayon's par_iter
closure which captures &self. Extract the individual basis config fields
into local variables so the closure only captures those Sync values.
oferchen added a commit that referenced this pull request May 5, 2026
* feat: wire ReceiverDeltaPipeline into receiver transfer context

Add a pluggable delta_pipeline field to ReceiverContext so callers can
inject SequentialDeltaPipeline (default), ParallelDeltaPipeline, or
ThresholdDeltaPipeline. The default remains sequential - no behavior
change. This lays the plumbing for opting into parallel delta dispatch.

- Add `delta_pipeline: Option<Box<dyn ReceiverDeltaPipeline>>` field
- Initialize with SequentialDeltaPipeline::new() in ReceiverContext::new()
- Add set_delta_pipeline() public setter and take_delta_pipeline() internal accessor
- Add Debug supertrait to ReceiverDeltaPipeline and impls for all types
- Re-exports already present in lib.rs from prior PR

* fix: extract basis config fields to avoid capturing non-Sync self in par_iter

The ReceiverDeltaPipeline trait object (containing mpsc::Receiver which
is not Sync) made ReceiverContext not Sync, breaking rayon's par_iter
closure which captures &self. Extract the individual basis config fields
into local variables so the closure only captures those Sync values.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant