MSQ: Enable stages to control their own shuffle logic. #18144
This patch pushes shuffle logic from RunWorkOrder into the stage definition. The motivation is to enable optimizations that take advantage of data already being sorted or partitioned in the desired way.
Main changes:
- FrameProcessorFactory is renamed StageProcessor, and its "makeProcessors" method is replaced with a more powerful "execute" method. The new "execute" is expected to fully contain partitioning and shuffling logic.
- RunWorkOrder, the worker class that manages execution, no longer has any shuffle-related processing code. It delegates this to the StageProcessor.
- StandardStageOperations, StandardStageRunner, & StandardStageProcessor are introduced for StageProcessor that want to perform shuffling in the "standard" way that was formerly mandatory.
Other changes:
- Moved FrameContext and related classes from the "kernel" package to the "exec" package.
- Remove "readableChannelUsableWhenWriting" from OutputChannel, to reduce complexity. It was only used by the standard hashPartition implementation, and wasn't strictly needed there.
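To illustrate the motivation, here is a minimal sketch of a stage that owns its shuffle decision. Everything in it (`StageSketch`, `StandardShuffleStage`, `AlreadyPartitionedStage`, integer keys, modulo hashing) is hypothetical and far simpler than the real StageProcessor and ExecutionContext; it only shows why letting the stage decide can avoid redundant work.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a stage whose execute step owns its shuffle; not Druid's StageProcessor.
interface StageSketch
{
  List<List<Integer>> execute(List<List<Integer>> inputPartitions, int numOutputPartitions);
}

// "Standard" behavior: always re-partition every row by a hash of its key.
class StandardShuffleStage implements StageSketch
{
  @Override
  public List<List<Integer>> execute(List<List<Integer>> inputPartitions, int numOutputPartitions)
  {
    List<List<Integer>> out = new ArrayList<>();
    for (int i = 0; i < numOutputPartitions; i++) {
      out.add(new ArrayList<>());
    }
    for (List<Integer> partition : inputPartitions) {
      for (int key : partition) {
        // Toy hash: the key modulo the partition count.
        out.get(Math.floorMod(key, numOutputPartitions)).add(key);
      }
    }
    return out;
  }
}

// A stage that trusts its input to be partitioned the desired way already, so it skips the shuffle.
class AlreadyPartitionedStage implements StageSketch
{
  @Override
  public List<List<Integer>> execute(List<List<Integer>> inputPartitions, int numOutputPartitions)
  {
    if (inputPartitions.size() != numOutputPartitions) {
      throw new IllegalArgumentException("input is not partitioned as expected");
    }
    return inputPartitions;
  }
}

class StageSketchDemo
{
  public static void main(String[] args)
  {
    // Input that is already partitioned by key modulo 2.
    List<List<Integer>> input = Arrays.asList(Arrays.asList(0, 2, 4), Arrays.asList(1, 3));
    System.out.println(new StandardShuffleStage().execute(input, 2));
    System.out.println(new AlreadyPartitionedStage().execute(input, 2));
  }
}
```

With input that is already partitioned by key, both stages produce the same partitions, but the second one never touches individual rows.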
I've not finished going through all parts... so far it was interesting :)
Just left some comments here and there :)
Because classes were renamed and moved, it's harder to focus on the core aspects of the changes. Since those changes touch a lot of files, I'm a bit worried that the PR will run into conflicts once other changes are merged.
I think it doesn't fundamentally change a lot of things right now, is that right?
But is it on the roadmap to take away the `shuffleSpec` from the `StageDefinition`?
/**
 * All the things needed for {@link StageProcessor#execute(ExecutionContext)} to run the work for a stage.
 */
public interface ExecutionContext
I think this interface might be unnecessary: it declares 12 methods and has only a single implementation, and it already seems to have been bent to fit that single implementation.
It might be better to just drop the interface and introduce the generalization once there are at least 2 implementors.
I suppose these could be collapsed. I don't currently have plans to create other implementations. I had a couple reasons for doing an interface:
- I thought it would be easier for developers of `StageProcessor` to understand what they get for `execute` if they could read an interface, rather than a concrete class. The code for a concrete class has a bunch of incidental implementation stuff in it that isn't necessarily helpful to its user.
- I thought it might be useful for testing to have the interface and concrete class be separate things. Although, I didn't end up taking advantage of this. I suppose tests could use mocks too.
I left this as is, because IMO the interface approach is cleaner, but it's kind of borderline so I could go either way.
/**
 * Callback that must be called when input is done being read. This is essential for two reasons:
 * (1) If the prior stage ran with {@link OutputChannelMode#MEMORY}, this informs the controller that it can shut
 * down the prior stage.
 * (2) With {@link ShuffleKind#GLOBAL_SORT}, this provides statistics that are used to determine global boundaries.
 *
 * Typically called by {@link StandardShuffleOperations#gatherResultKeyStatisticsIfNeeded(ListenableFuture)}.
 */
void onDoneReadingInput(@Nullable ClusterByStatisticsSnapshot snapshot);

/**
 * Callback to report a nonfatal warning.
 */
void onWarning(Throwable t);
note: in the scope of an `ExecutionContext`, these callback-like methods look a little bit like outliers
final ProcessorsAndChannels<T, R> processors = makeProcessors(
    context.workOrder().getStageDefinition(),
    context.workOrder().getWorkerNumber(),
    context.workOrder().getInputs(),
    context.inputSliceReader(),
    (ExtraInfoType) context.workOrder().getExtraInfo(),
    stageRunner.workOutputChannelFactory(),
    context.frameContext(),
    context.threadCount(),
    context.counters(),
    context::onWarning,
    MultiStageQueryContext.removeNullBytes(context.workOrder().getWorkerContext())
note: around 10 arguments are constructed one way or another from the `context` argument....
I think it would be more straightforward to just pass the `ExecutionContext` and let the other end decide what it needs...
I did it this way to avoid changing too much code in this PR. Changing the signature of `makeProcessors` would require changes to all the subclasses, which are many. In a future patch I think it would make sense to remove this entire `StandardStageProcessor` class and rewrite the subclasses to directly implement `execute`. I left a note about this in the class-level javadoc:
 * This abstract class may be removed someday, in favor of its subclasses using {@link StandardStageRunner} directly.
 * It was introduced mainly to minimize code changes in a refactor.
}

initInputSliceReader();
initGlobalSortPartitionBoundariesIfNeeded();
I feel like something is leaking here...not sure how to fix it - but this and the stuff it moves doesn't seem to belong here...I'll try to dig into this
Do you mean the `initGlobalSortPartitionBoundariesIfNeeded()`?
The situation with global sort boundaries has some back-and-forth between the worker and controller, which leads to some complexity. The flow of partitioned global sort is like this:
- Each worker gathers statistics from its input as it is read. While doing so it buffers the input.
- Each worker sends those statistics to the controller when done reading input.
- The controller gathers statistics from each worker, uses them to determine good global partition boundaries, then sends those boundaries to all workers. (Crucially, each worker gets the same set of boundaries.)
- The workers partition data according to the boundaries and sort within each partition. When each partition is later merged across workers, this yields a total sort of the data.
This means that some stuff related to global partition boundaries is going to leak into the `ExecutionContext`:
- there has to be a way for the worker to send its statistics (`onDoneReadingInput`)
- there has to be a way for the worker to receive the global partition boundaries (`globalClusterByPartitions`)
Coming back to the `initGlobalSortPartitionBoundariesIfNeeded` you highlighted: this is for the case where the controller isn't involved, possibly because there's only one partition. In this case the worker does not send statistics, nor does it receive global boundaries from the controller. It can know its boundaries on its own. (It will just be "all data goes to partition 0".)
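The flow above can be sketched as a toy model. This is an illustration under loud assumptions: keys are plain integers, each worker's "statistics" are simply all of its keys, and `chooseBoundaries`, `partitionFor`, and `partitionAndSort` are hypothetical helpers rather than anything in the MSQ code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy model of the global-sort handshake; none of these names are Druid classes.
class GlobalSortSketch
{
  // Controller side: merge key samples from all workers and pick (numPartitions - 1) split points.
  // Assumes at least one sampled key when numPartitions > 1.
  static List<Integer> chooseBoundaries(List<List<Integer>> workerSamples, int numPartitions)
  {
    List<Integer> all = new ArrayList<>();
    for (List<Integer> sample : workerSamples) {
      all.addAll(sample);
    }
    Collections.sort(all);

    List<Integer> boundaries = new ArrayList<>();
    for (int i = 1; i < numPartitions; i++) {
      boundaries.add(all.get(i * all.size() / numPartitions));
    }
    return boundaries;
  }

  // Worker side: partition p holds keys k with boundaries[p - 1] <= k < boundaries[p].
  static int partitionFor(int key, List<Integer> boundaries)
  {
    int p = 0;
    while (p < boundaries.size() && key >= boundaries.get(p)) {
      p++;
    }
    return p;
  }

  // Worker side: split buffered input by the shared boundaries, then sort within each partition.
  static List<List<Integer>> partitionAndSort(List<Integer> buffered, List<Integer> boundaries)
  {
    List<List<Integer>> partitions = new ArrayList<>();
    for (int i = 0; i <= boundaries.size(); i++) {
      partitions.add(new ArrayList<>());
    }
    for (int key : buffered) {
      partitions.get(partitionFor(key, boundaries)).add(key);
    }
    for (List<Integer> partition : partitions) {
      Collections.sort(partition);
    }
    return partitions;
  }

  public static void main(String[] args)
  {
    List<Integer> worker0 = Arrays.asList(9, 1, 5, 7);
    List<Integer> worker1 = Arrays.asList(2, 8, 3, 6);

    // Both workers receive the same boundaries from the "controller".
    List<Integer> boundaries = chooseBoundaries(Arrays.asList(worker0, worker1), 2);
    System.out.println(boundaries);
    System.out.println(partitionAndSort(worker0, boundaries));
    System.out.println(partitionAndSort(worker1, boundaries));
  }
}
```

Because both workers use the same boundaries, every key in partition 0 on any worker is smaller than every key in partition 1 on any worker, so merging each partition across workers gives a total order. With a single partition the boundary list is empty and every key lands in partition 0, which is the case where the controller does not need to be involved.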
@kgyrtkirk thank you for taking a look.
There are a lot of renames, but most of them are just renaming things from
Yes, the idea in this patch was to not change anything fundamentally, especially with the existing stage implementations. The main change is that in the future, stages can control their own shuffling logic, rather than being "forced" into having the standard approach applied.
I don't plan to remove
For this reason, I have this text in the javadoc for
The idea is that the
I feel like the result and the output channel should be closer to each other...
There are multiple classes that have an `And` in their name. I feel like some concept is missing, as those essentially try to repack these into a single object. I suspect that the fact that `Future`s are used doesn't make it easier either.
I don't want to stall this change as it also cleaned some things up a bit.
I pushed a couple of commits resolving merge conflicts, and also adding various javadocs that I hope clarify the usage of the "result" and "channels" concepts. The main reason for all the future-stuff is that the "result" is available only after all processing is fully complete, whereas the "output channels" may be available for reading prior to processing being complete (when data is streamed between two concurrently-running stages). Thank you for the review, I'll merge after the tests pass.
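The distinction between the "result" and the "output channels" can be sketched with plain JDK concurrency classes. This is only an illustration: `ResultAndChannelSketch` is hypothetical, a `BlockingQueue` stands in for an output channel, and a `CompletableFuture` stands in for the futures used in the patch.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model: the channel is readable while the stage runs; the result exists only at the end.
class ResultAndChannelSketch
{
  final BlockingQueue<String> outputChannel = new LinkedBlockingQueue<>();
  final CompletableFuture<Long> result = new CompletableFuture<>();

  // Starts a producer that writes one frame, waits for the reader, writes another, then completes the result.
  static ResultAndChannelSketch start(CountDownLatch firstFrameRead)
  {
    ResultAndChannelSketch stage = new ResultAndChannelSketch();
    Thread producer = new Thread(() -> {
      try {
        stage.outputChannel.put("frame-0");
        firstFrameRead.await();
        stage.outputChannel.put("frame-1");
        stage.result.complete(2L); // result: number of frames written
      }
      catch (InterruptedException e) {
        stage.result.completeExceptionally(e);
      }
    });
    producer.start();
    return stage;
  }

  // Returns true if the first frame was readable while the result was still pending.
  static boolean readFirstFrameBeforeResult()
  {
    try {
      CountDownLatch firstFrameRead = new CountDownLatch(1);
      ResultAndChannelSketch stage = start(firstFrameRead);

      String first = stage.outputChannel.take();
      boolean resultPending = !stage.result.isDone();

      firstFrameRead.countDown();
      long frames = stage.result.get();
      return resultPending && "frame-0".equals(first) && frames == 2L;
    }
    catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args)
  {
    System.out.println(readFirstFrameBeforeResult());
  }
}
```

The latch keeps the producer from finishing until the reader has seen the first frame, so the downstream reader always observes data before the result exists. That ordering is why a single synchronous return value could not carry both.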
The main change is that single-partition ComposingWritableFrameChannels (i.e. those created by `ChannelOutputFactory#openChannel`) now associate all incoming frames with that partition. Previously, frames might have come in with partition set to `NO_PARTITION`, which would cause them to get "lost" by the composing channel. Fixes a bug introduced in apache#18144 when composed intermediate stage-internal channels started being used for the output of hash partitioning. Prior to apache#18144, they were only used for internal channels of the SuperSorter. This bug could cause frames to go missing during sortMerge joins. This patch also adds an embedded test for various durable storage scenarios, including sortMerge join tests that would have caught the original bug. Finally, this patch adjusts the way that Calcites escapes string literals, to use the actual characters more often when possible. This helps format the test SQLs generated by the embedded test more nicely.
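A toy model of the failure mode may help. It is not the real ComposingWritableFrameChannel: the class, its map-based storage, the `forceChannelPartition` flag, and the sentinel value `-1` are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a single-partition composing channel; not Druid's implementation.
class ComposingChannelSketch
{
  // Sentinel for "no partition assigned"; the value -1 is an assumption of this sketch.
  static final int NO_PARTITION = -1;

  private final int channelPartition;
  private final boolean forceChannelPartition;
  private final Map<Integer, List<String>> framesByPartition = new HashMap<>();

  ComposingChannelSketch(int channelPartition, boolean forceChannelPartition)
  {
    this.channelPartition = channelPartition;
    this.forceChannelPartition = forceChannelPartition;
  }

  // Old behavior files the frame under whatever partition it arrived with;
  // fixed behavior always files it under the channel's own partition.
  void write(String frame, int framePartition)
  {
    int partition = forceChannelPartition ? channelPartition : framePartition;
    framesByPartition.computeIfAbsent(partition, k -> new ArrayList<>()).add(frame);
  }

  // Readers ask for the channel's own partition, so frames filed elsewhere are never seen.
  List<String> readChannelPartition()
  {
    return framesByPartition.getOrDefault(channelPartition, new ArrayList<>());
  }

  public static void main(String[] args)
  {
    ComposingChannelSketch lossy = new ComposingChannelSketch(3, false);
    lossy.write("frame", NO_PARTITION);
    System.out.println(lossy.readChannelPartition().size());

    ComposingChannelSketch fixed = new ComposingChannelSketch(3, true);
    fixed.write("frame", NO_PARTITION);
    System.out.println(fixed.readChannelPartition().size());
  }
}
```

In the lossy variant the frame is stored but unreachable from the channel's partition, which mirrors how frames could appear to go missing downstream.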
Fixes a regression from apache#18144. The refactoring in that patch lost some logic that ensured stage output channels were stored in read-only form. This is important, because the writable form includes a 1MB frame memory allocation buffer. It can add up to a lot of memory if retained across lots of channels. This patch simplifies things by unconditionally converting all stage outputs to read-only before they are retained, by replacing the channel in "stageOutputChannels.add(channel)" with "channel.readOnly()". It also simplifies various other bits of code that deal with intermediate output channels, by converting them to read-only in the constructors of ProcessorsAndChannels and ResultAndChannels, rather than at only some call sites.
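The memory argument can be sketched with a toy channel. `OutputChannelSketch` is hypothetical, and the buffer size of 1024 * 1024 bytes is just one reading of "1MB"; only the idea of calling `readOnly()` before retaining a channel comes from the description above.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a writable channel holds a write buffer, and its read-only view does not.
class OutputChannelSketch
{
  static final int WRITE_BUFFER_BYTES = 1024 * 1024;

  private final byte[] writeBuffer; // null for read-only views
  private final List<String> frames;

  private OutputChannelSketch(byte[] writeBuffer, List<String> frames)
  {
    this.writeBuffer = writeBuffer;
    this.frames = frames;
  }

  static OutputChannelSketch writable()
  {
    return new OutputChannelSketch(new byte[WRITE_BUFFER_BYTES], new ArrayList<>());
  }

  // Shares the written frames but drops the reference to the write buffer.
  OutputChannelSketch readOnly()
  {
    return new OutputChannelSketch(null, frames);
  }

  long retainedBufferBytes()
  {
    return writeBuffer == null ? 0 : writeBuffer.length;
  }

  static long totalRetained(List<OutputChannelSketch> channels)
  {
    long total = 0;
    for (OutputChannelSketch channel : channels) {
      total += channel.retainedBufferBytes();
    }
    return total;
  }

  public static void main(String[] args)
  {
    List<OutputChannelSketch> retainedWritable = new ArrayList<>();
    List<OutputChannelSketch> retainedReadOnly = new ArrayList<>();
    for (int i = 0; i < 8; i++) {
      OutputChannelSketch channel = writable();
      retainedWritable.add(channel);
      retainedReadOnly.add(channel.readOnly());
    }
    System.out.println(totalRetained(retainedWritable));
    System.out.println(totalRetained(retainedReadOnly));
  }
}
```

Retaining the writable form costs one buffer per channel, while retaining the read-only form costs none, so the difference grows linearly with the number of retained channels.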