Skip to content

MSQ: Enable stages to control their own shuffle logic. #18144

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jun 24, 2025

Conversation

gianm
Copy link
Contributor

@gianm gianm commented Jun 15, 2025

This patch pushes shuffle logic from RunWorkOrder into the stage definition. The motivation is to enable optimizations that take advantage of data already being sorted or partitioned in the desired way.

Main changes:

  • FrameProcessorFactory is renamed StageProcessor, and its "makeProcessors" method is replaced with a more powerful "execute" method. The new "execute" is expected to fully contain partitioning and shuffling logic.

  • RunWorkOrder, the worker class that manages execution, no longer has any shuffle-related processing code. It delegates this to the StageProcessor.

  • StandardStageOperations, StandardStageRunner, & StandardStageProcessor are introduced for StageProcessor that want to perform shuffling in the "standard" way that was formerly mandatory.

Other changes:

  • Moved FrameContext and related classes from the "kernel" package to the "exec" package.

  • Remove "readableChannelUsableWhenWriting" from OutputChannel, to reduce complexity. It was only used by the standard hashPartition implementation, and wasn't strictly needed there.

This patch pushes shuffle logic from RunWorkOrder into the stage definition.
The motivation is to enable optimizations that take advantage of data already
being sorted or partitioned in the desired way.

Main changes:

- FrameProcessorFactory is renamed StageProcessor, and its "makeProcessors"
  method is replaced with a more powerful "execute" method. The new "execute"
  is expected to fully contain partitioning and shuffling logic.

- RunWorkOrder, the worker class that manages execution, no longer has any
  shuffle-related processing code. It delegates this to the StageProcessor.

- StandardStageOperations, StandardStageRunner, & StandardStageProcessor are
  introduced for StageProcessor that want to perform shuffling in the "standard"
  way that was formerly mandatory.

Other changes:

- Moved FrameContext and related classes from the "kernel" package to the "exec" package.

- Remove "readableChannelUsableWhenWriting" from OutputChannel, to reduce complexity.
  It was only used by the standard hashPartition implementation, and wasn't strictly
  needed there.
@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Jun 15, 2025
@cryptoe cryptoe requested a review from adarshsanjeev June 16, 2025 04:43
Copy link
Member

@kgyrtkirk kgyrtkirk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've not finished going thru all parts...so far it was interesting :)
just left some comments here and there :)

Due to the fact that classes were renamed/moved its harder to focus on the core aspects of the changes - since those changes have touched a lot of files I'm a bit worried if the PR enters a conflicted change after some other changes will be merged.

I think it doesn't change fundamentally a lot of things right now - is that right?
but is it on the roadmap to take away the shuffleSpec from the StageDefintion ?

/**
* All the things needed for {@link StageProcessor#execute(ExecutionContext)} to run the work for a stage.
*/
public interface ExecutionContext
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this interface might be unnecessary: it declares 12 methods and has only a single implementation - its already seem to have suffered from some bending from that single implementation.

might be better to just drop the interface and introduce generalization when there will be at least 2 implementors

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suppose these could be collapsed. I don't currently have plans to create other implementations. I had a couple reasons for doing an interface:

  • I thought it would be easier for developers of StageProcessor to understand what they get for execute if they could read an interface, rather than a concrete class. The code for a concrete class has a bunch of incidental implementation stuff in it that isn't necessarily helpful to its user.
  • I thought it might be useful for testing to have the interface and concrete class be separate things. Although, I didn't end up taking advantage of this. I suppose tests could use mocks too.

I left this as is, because IMO the interface approach is cleaner, but it's kind of borderline so I could go either way.

Comment on lines +93 to +106
/**
* Callback that must be called when input is done being read. This is essential for two reasons:
* (1) If the prior stage ran with {@link OutputChannelMode#MEMORY}, this informs the controller that it can shut
* down the prior stage.
* (2) With {@link ShuffleKind#GLOBAL_SORT}, this provides statistics that are used to determine global boundaries.
*
* Typically called by {@link StandardShuffleOperations#gatherResultKeyStatisticsIfNeeded(ListenableFuture)}.
*/
void onDoneReadingInput(@Nullable ClusterByStatisticsSnapshot snapshot);

/**
* Callback to report a nonfatal warning.
*/
void onWarning(Throwable t);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: in the scope of an ExecutionContext these callback like methods look a little bit like outliers

Comment on lines +93 to +104
final ProcessorsAndChannels<T, R> processors = makeProcessors(
context.workOrder().getStageDefinition(),
context.workOrder().getWorkerNumber(),
context.workOrder().getInputs(),
context.inputSliceReader(),
(ExtraInfoType) context.workOrder().getExtraInfo(),
stageRunner.workOutputChannelFactory(),
context.frameContext(),
context.threadCount(),
context.counters(),
context::onWarning,
MultiStageQueryContext.removeNullBytes(context.workOrder().getWorkerContext())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: around 10 arguments are constructed one way or another from the context argument....
I think it would be more straight forward to just pass the ExecutionContext and let the other end decide what it needs...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did it this way to avoid changing too much code in this PR. Changing the signature of makeProcessors would require changes to all the subclasses, which are many. In a future patch I think it would make sense to remove this entire StandardStageProcessor class and rewrite the subclasses to directly implement execute. I left a note about this in the class-level javadoc:

 * This abstract class may be removed someday, in favor of its subclasses using {@link StandardStageRunner} directly.
 * It was introduced mainly to minimize code changes in a refactor.

}

initInputSliceReader();
initGlobalSortPartitionBoundariesIfNeeded();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like something is leaking here...not sure how to fix it - but this and the stuff it moves doesn't seem to belong here...I'll try to dig into this

Copy link
Contributor Author

@gianm gianm Jun 16, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the initGlobalSortPartitionBoundariesIfNeeded()?

The situation with global sort boundaries has some back-and-forth between the worker and controller, which leads to some complexity. The flow of partitioned global sort is like this:

  • Each worker gathers statistics from its input as it is read. While doing so it buffers the input.
  • Each worker sends those statistics to the controller when done reading input.
  • The controller gathers statistics from each worker, uses them to determine good global partition boundaries, then sends those boundaries to all workers. (Crucially each worker gets the same set of boundaries)
  • The workers partition data according to the boundaries and sorts within each partition. When each partition is later merged across workers, this yields a total sort of the data.

This means that some stuff related to global partition boundaries is going to leak into the ExecutionContext:

  • there has to be a way for the worker to send its statistics (onDoneReadingInput)
  • there has to be a way for the worker to receive the global partition boundaries (globalClusterByPartitions)

Coming back to the initGlobalSortPartitionBoundariesIfNeeded you highlighted: this is for the case where the controller isn't involved, possibly because there's only one partition. In this case the worker does not send statistics nor does it receive global boundaries from the controller. It can know its boundaries on its own. (It will just be "all data goes to partition 0").

@gianm
Copy link
Contributor Author

gianm commented Jun 16, 2025

@kgyrtkirk thank you for taking a look.

Due to the fact that classes were renamed/moved its harder to focus on the core aspects of the changes - since those changes have touched a lot of files I'm a bit worried if the PR enters a conflicted change after some other changes will be merged.

There are a lot of renames, but most of them are just renaming things from XFrameProcessorFactory to XStageProcessor.

I think it doesn't change fundamentally a lot of things right now - is that right?

Yes, the idea in this patch was to not change anything fundamentally, especially with the existing stage implementations. The main change is that in the future, stages can control their own shuffling logic, rather than being "forced" into having the standard approach applied.

but is it on the roadmap to take away the shuffleSpec from the StageDefintion ?

I don't plan to remove shuffleSpec from StageDefinition. It's still needed for a couple of reasons:

  • the MSQ framework still does determine how many workers should be launched, when they should be launched, and which workers should handle which partitions. It needs the shuffleSpec to help figure this out.
  • for the GLOBAL_SORT shuffle, the MSQ framework still determines the partition boundaries based on statistics sent from each worker. It needs the shuffleSpec to know it should activate this mode.

For this reason, I have this text in the javadoc for StageProcessor:

 * For stages that shuffle, i.e. where {@link StageDefinition#doesShuffle()}, the outputs must be partitioned according
 * to the {@link StageDefinition#getShuffleSpec()}.

The idea is that the shuffleSpec still exists, and the stage processor must respect it. The StandardStageRunner and StandardShuffleOperations are provided for stages that want to do things the "standard" way. In the future I imagine some stages will have an optimized path for certain cases of shuffleSpec, and then fallback to the StandardShuffleOperations for other cases.

Copy link
Member

@kgyrtkirk kgyrtkirk left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like the result and the outpuchannel should be more closer to eachother...
there are multiple classes which has an And in their name. I feel like some concept is missing...as those are essentially try to repack those into a single object - I suspect that the fact that Future-s are used doesn't make it easier either.

I don't want to stall this change as it also cleaned some things up a bit.

@gianm
Copy link
Contributor Author

gianm commented Jun 23, 2025

I feel like the result and the outpuchannel should be more closer to eachother...
there are multiple classes which has an And in their name. I feel like some concept is missing...as those are essentially try to repack those into a single object - I suspect that the fact that Future-s are used doesn't make it easier either.

I pushed a couple commits resolving merge conflicts, and also adding various javadocs that I hope clarifies the usage of the "result" and "channels" concepts. The main reason for all the future-stuff is that the "result" is available only after all processing is fully complete, whereas the "output channels" may be available for reading prior to processing being complete (when data is streamed between two concurrently-running stages).

Thank you for the review, I'll merge after the tests pass.

*/
@Nonnull
public abstract BaseFrameProcessorFactory buildFrameProcessor(StageMaker stageMaker);
public abstract StageProcessor<?, ?> buildStageProcessor(StageMaker stageMaker);

Check notice

Code scanning / CodeQL

Useless parameter Note

The parameter 'stageMaker' is never used.
@gianm gianm merged commit 2b1f7df into apache:master Jun 24, 2025
74 checks passed
@gianm gianm deleted the msq-stage-processor-control branch June 24, 2025 04:32
gianm added a commit to gianm/druid that referenced this pull request Jul 8, 2025
The main change is that single-partition ComposingWritableFrameChannels (i.e.
those created by `ChannelOutputFactory#openChannel`) now associate all incoming
frames with that partition. Previously, frames might have come in with
partition set to `NO_PARTITION`, which would cause them to get "lost" by
the composing channel.

Fixes a bug introduced in apache#18144 when composed intermediate stage-internal
channels started being used for the output of hash partitioning. Prior to
apache#18144, they were only used for internal channels of the SuperSorter. This
bug could cause frames to go missing during sortMerge joins.

This patch also adds an embedded test for various durable storage scenarios,
including sortMerge join tests that would have caught the original bug.

Finally, this patch adjusts the way that Calcites escapes string literals,
to use the actual characters more often when possible. This helps format
the test SQLs generated by the embedded test more nicely.
cryptoe added a commit that referenced this pull request Jul 9, 2025
* MSQ: Fix composing channels losing partitionless frames.

The main change is that single-partition ComposingWritableFrameChannels (i.e.
those created by `ChannelOutputFactory#openChannel`) now associate all incoming
frames with that partition. Previously, frames might have come in with
partition set to `NO_PARTITION`, which would cause them to get "lost" by
the composing channel.

Fixes a bug introduced in #18144 when composed intermediate stage-internal
channels started being used for the output of hash partitioning. Prior to
#18144, they were only used for internal channels of the SuperSorter. This
bug could cause frames to go missing during sortMerge joins.

This patch also adds an embedded test for various durable storage scenarios,
including sortMerge join tests that would have caught the original bug.

Finally, this patch adjusts the way that Calcites escapes string literals,
to use the actual characters more often when possible. This helps format
the test SQLs generated by the embedded test more nicely.

* dependency analyze.

---------

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
capistrant pushed a commit to capistrant/incubator-druid that referenced this pull request Jul 13, 2025
* MSQ: Fix composing channels losing partitionless frames.

The main change is that single-partition ComposingWritableFrameChannels (i.e.
those created by `ChannelOutputFactory#openChannel`) now associate all incoming
frames with that partition. Previously, frames might have come in with
partition set to `NO_PARTITION`, which would cause them to get "lost" by
the composing channel.

Fixes a bug introduced in apache#18144 when composed intermediate stage-internal
channels started being used for the output of hash partitioning. Prior to
apache#18144, they were only used for internal channels of the SuperSorter. This
bug could cause frames to go missing during sortMerge joins.

This patch also adds an embedded test for various durable storage scenarios,
including sortMerge join tests that would have caught the original bug.

Finally, this patch adjusts the way that Calcites escapes string literals,
to use the actual characters more often when possible. This helps format
the test SQLs generated by the embedded test more nicely.

* dependency analyze.

---------

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
capistrant pushed a commit to capistrant/incubator-druid that referenced this pull request Jul 16, 2025
* MSQ: Fix composing channels losing partitionless frames.

The main change is that single-partition ComposingWritableFrameChannels (i.e.
those created by `ChannelOutputFactory#openChannel`) now associate all incoming
frames with that partition. Previously, frames might have come in with
partition set to `NO_PARTITION`, which would cause them to get "lost" by
the composing channel.

Fixes a bug introduced in apache#18144 when composed intermediate stage-internal
channels started being used for the output of hash partitioning. Prior to
apache#18144, they were only used for internal channels of the SuperSorter. This
bug could cause frames to go missing during sortMerge joins.

This patch also adds an embedded test for various durable storage scenarios,
including sortMerge join tests that would have caught the original bug.

Finally, this patch adjusts the way that Calcites escapes string literals,
to use the actual characters more often when possible. This helps format
the test SQLs generated by the embedded test more nicely.

* dependency analyze.

---------

Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
capistrant added a commit that referenced this pull request Jul 16, 2025
…8267)

* MSQ: Fix composing channels losing partitionless frames.

The main change is that single-partition ComposingWritableFrameChannels (i.e.
those created by `ChannelOutputFactory#openChannel`) now associate all incoming
frames with that partition. Previously, frames might have come in with
partition set to `NO_PARTITION`, which would cause them to get "lost" by
the composing channel.

Fixes a bug introduced in #18144 when composed intermediate stage-internal
channels started being used for the output of hash partitioning. Prior to
#18144, they were only used for internal channels of the SuperSorter. This
bug could cause frames to go missing during sortMerge joins.

This patch also adds an embedded test for various durable storage scenarios,
including sortMerge join tests that would have caught the original bug.

Finally, this patch adjusts the way that Calcites escapes string literals,
to use the actual characters more often when possible. This helps format
the test SQLs generated by the embedded test more nicely.

* dependency analyze.

---------

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
Co-authored-by: Karan Kumar <karankumar1100@gmail.com>
@capistrant capistrant added this to the 34.0.0 milestone Jul 22, 2025
gianm added a commit to gianm/druid that referenced this pull request Jul 24, 2025
Fixes a regression from apache#18144. The refactoring in that patch lost some
logic that ensured stage output channels were stored in read-only form.
This is important, because the writable form includes a 1MB frame memory
allocation buffer. It can add up to a lot of memory if retained across
lots of channels.

This patch simplifies things by unconditionally converting all
stage outputs to read-only before they are retained, by replacing the
channel in "stageOutputChannels.add(channel)" with "channel.readOnly()".
It also simplifies various other bits of code that deal with intermediate
output channels, by converting them to read-only in the constructors of
ProcessorsAndChannels and ResultAndChannels, rather than at only some call
sites.
cryptoe pushed a commit that referenced this pull request Jul 25, 2025
Fixes a regression from #18144. The refactoring in that patch lost some
logic that ensured stage output channels were stored in read-only form.
This is important, because the writable form includes a 1MB frame memory
allocation buffer. It can add up to a lot of memory if retained across
lots of channels.

This patch simplifies things by unconditionally converting all
stage outputs to read-only before they are retained, by replacing the
channel in "stageOutputChannels.add(channel)" with "channel.readOnly()".
It also simplifies various other bits of code that deal with intermediate
output channels, by converting them to read-only in the constructors of
ProcessorsAndChannels and ResultAndChannels, rather than at only some call
sites.
cryptoe pushed a commit to cryptoe/druid that referenced this pull request Jul 25, 2025
…he#18322)

Fixes a regression from apache#18144. The refactoring in that patch lost some
logic that ensured stage output channels were stored in read-only form.
This is important, because the writable form includes a 1MB frame memory
allocation buffer. It can add up to a lot of memory if retained across
lots of channels.

This patch simplifies things by unconditionally converting all
stage outputs to read-only before they are retained, by replacing the
channel in "stageOutputChannels.add(channel)" with "channel.readOnly()".
It also simplifies various other bits of code that deal with intermediate
output channels, by converting them to read-only in the constructors of
ProcessorsAndChannels and ResultAndChannels, rather than at only some call
sites.

(cherry picked from commit 97f9412)
cryptoe added a commit that referenced this pull request Jul 25, 2025
…nels. (#18322) (#18325)

Fixes a regression from #18144. The refactoring in that patch lost some
logic that ensured stage output channels were stored in read-only form.
This is important, because the writable form includes a 1MB frame memory
allocation buffer. It can add up to a lot of memory if retained across
lots of channels.

This patch simplifies things by unconditionally converting all
stage outputs to read-only before they are retained, by replacing the
channel in "stageOutputChannels.add(channel)" with "channel.readOnly()".
It also simplifies various other bits of code that deal with intermediate
output channels, by converting them to read-only in the constructors of
ProcessorsAndChannels and ResultAndChannels, rather than at only some call
sites.

(cherry picked from commit 97f9412)

Co-authored-by: Gian Merlino <gianmerlino@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants