Description
Is your feature request related to a problem or challenge?
We have done the first step in #16196 for pipeline breaking cancellation support; this epic tracks the remaining sub-tasks for further improvements. cc @ozankabak @alamb @pepijnve
Adding a few tests (maybe SLT?) that show YieldStreamExec being inserted. Also add logs related to the built-in YieldStream.
Improving the documentation to make it clear that any leaf (source) that already yields just has to implement
```rust
fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(self)
}
```
to signal the planner that no YieldStream combinator is necessary.
Improving InsertYieldExec rule by means of an API that exposes input and output pipelining behaviors of operators effectively
Investigating whether any already-existing manual yielding (for example, like the one in RepartitionExec) can now be removed
We will think about supporting cases involving non-volcano (e.g. spill) data flow.
Fix the corner case provided in this link: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
Feel free to add more tasks, thanks!
Describe the solution you'd like
No response
Describe alternatives you've considered
No response
Additional context
No response
Activity
alamb commented on Jun 10, 2025
Thank you @zhuqi-lucas -- I also added this as a wishlist item for #16235
zhuqi-lucas commented on Jun 11, 2025
Thank you @alamb !
pepijnve commented on Jun 11, 2025
At the risk of making myself unpopular, I feel it's relevant to share my findings with you guys.
Working on #16322 led me into the tokio implementation, in particular it led me to this line in the Chan implementation. This is the code that handles RecordBatch passing in RecordBatchReceiverStream.
I was immediately reminded of the cancellation discussions. Without realizing it, DataFusion is actually already using Tokio's coop mechanism. This strengthens my belief that the PR that was merged is going about things the wrong way. It introduces an API which overlaps 100% with something that already exists and is already being used. I don't think it's a good idea to have multiple mechanisms for the same thing. Pipeline-blocking operators exactly match the pattern described in the Tokio cooperative scheduling documentation, so why would you not use the solution the runtime provides, which you're already using in quite a few places (everywhere RecordBatchReceiverStream is used)?
pepijnve commented on Jun 11, 2025
A colleague of mine advised me to refer to PR #16301 to show what usage of tokio::coop might look like one more time, for people who were not involved with the cancellation work that was already done. I was reluctant to do so myself. That PR shows the operator changes that would be required. The custom PollBudget thing would be removed and the tokio coop budget would be used instead.
zhuqi-lucas commented on Jun 11, 2025
Thank you @pepijnve , do you mean we can replace YieldStream with Tokio's coop? Or also change the rule for adding Yield?
I haven't looked into Tokio's coop yet; maybe we can also add a sub-task for it and list its benefits.
We are open to all improvements, thanks!
pepijnve commented on Jun 11, 2025
Tokio's cooperative budget is essentially a counter per task that can be decremented at any point in the task. When the counter hits zero you'll return Pending from the function trying to consume the budget. That's basically what YieldStream is doing but with a local counter rather than a task wide one.
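As a small self-contained illustration (plain Tokio; `drain` and `produce_batch` are hypothetical stand-ins, not DataFusion code), any loop can charge that same per-task counter through tokio's `consume_budget` helper; once the budget hits zero the await yields back to the scheduler:

```rust
use tokio::task::consume_budget;

/// Hypothetical per-batch loop that never touches a budget-aware Tokio
/// resource on its own.
async fn drain(mut produce_batch: impl FnMut() -> Option<u64>) -> u64 {
    let mut total = 0;
    while let Some(rows) = produce_batch() {
        total += rows;
        // Decrements the task-wide cooperative budget. When it reaches zero
        // this await returns Pending once, letting the scheduler run other
        // tasks -- the counter YieldStream re-implements with local state.
        consume_budget().await;
    }
    total
}

#[tokio::main]
async fn main() {
    let mut remaining = 1_000u64;
    let total = drain(move || {
        if remaining == 0 {
            None
        } else {
            remaining -= 1;
            Some(1)
        }
    })
    .await;
    println!("processed {total} batches");
}
```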
DataFusion's ReceiverStreamBuilder makes use of tokio::sync::mpsc::Receiver. Whenever Receiver::recv is being called, that counter is being decremented, and you'll get a Pending result when the budget is depleted. This is the same thing as what YieldStream is trying to do.
The benefits I see of trying to leverage the same mechanism elsewhere in DataFusion are:
The downsides remain:
zhuqi-lucas commented on Jun 11, 2025
You’re right. In DataFusion, only operators that fan out work into multiple spawned tasks and then re-aggregate via a Tokio MPSC channel actually consume the cooperative budget automatically (because each Receiver::recv().await call decrements it). Examples include:
All of those use RecordBatchReceiverStreamBuilder::run_input, whose .next().await is really rx.recv().await under the hood—and that is what charges the Tokio coop budget.
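A standalone sketch of that mechanism (plain Tokio, no DataFusion types, assuming the default multi-threaded runtime): every `recv().await` below charges the same task-wide budget, so the consuming task gets yield points for free:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Pre-fill a channel so the receiver is never starved for data.
    let (tx, mut rx) = mpsc::channel::<u32>(1024);
    for i in 0..1024 {
        tx.try_send(i).expect("channel has capacity");
    }
    drop(tx); // close the channel so the loop below terminates

    let mut received = 0u32;
    // Each recv() charges the task-wide cooperative budget. Once it is
    // depleted, recv() returns Pending even though messages are queued,
    // forcing this task back to the scheduler before it continues.
    while rx.recv().await.is_some() {
        received += 1;
    }
    println!("received {received} messages");
}
```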
But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel. They execute pull-based within a single Stream implementation, and never call recv(), so they don’t automatically consume any cooperative budget.
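For contrast, a hypothetical, heavily simplified pull-based stream (illustrative only, not an actual DataFusion operator): its poll loop never touches a budget-aware resource, so nothing ever asks it to yield:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, StreamExt};

/// Hypothetical filter-like operator that pulls directly from its input.
struct NaiveFilter<S, F> {
    input: S,
    predicate: F,
}

impl<S, F, T> Stream for NaiveFilter<S, F>
where
    S: Stream<Item = T> + Unpin,
    F: FnMut(&T) -> bool + Unpin,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        loop {
            match this.input.poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => {
                    if (this.predicate)(&item) {
                        return Poll::Ready(Some(item));
                    }
                    // Selective predicate + always-ready input: this loop can
                    // spin for a long time without returning Ready or Pending,
                    // and it never consumes any cooperative budget.
                }
                other => return other,
            }
        }
    }
}
```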
That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.
I believe there is no major difference here? Please correct me if I am wrong.
pepijnve commented on Jun 11, 2025
You're indeed 100% dependent on your child streams which is what makes the current solution somewhat brittle. If that happens to use a Receiver (or some other implementation that consumes budget) it will work. If it's some other stream that does not you may have issues again. Because the sources are user definable, I think it's wise to take a defensive stance in the implementation of operators and assume you don't know what they will or will not do.
The current implementation attempts to fix this by ensuring the sources have yield points. That breaks when streams are swapped dynamically because you no longer have a way to ensure they contain the necessary yield points. This is a point the DataFusion library cannot currently intercept.
The current implementation with the non-task-wide budget also breaks when an intermediate operator uses select! (or something similar where you read from whatever stream happens to be ready), since this can obscure the Pending result from a stream. There's no way to guarantee that Pending bubbles all the way up.
The point of contention was where you put these yield points. Do you instrument all leaf nodes, or do you instrument consumers that may refuse to yield? To make the system robust I really think you need to do this in the consumers. It's also beneficial for locality of reasoning. You can look at the implementation of an operator and assess that it's correct from a cooperative scheduling point of view without having to look at any other code.
The objection was that there are many, many operators out there in the wild downstream of DataFusion. That's one that I do not have an answer for. How many people are building custom pipeline blocking operators?
It's important to note that you would only need to take action in operators where you can see from the implementation that it may not return any value, either Ready or Pending, relatively quickly. That's basically anything that loops over input streams an unbounded number of times.
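A sketch of what that defensive consumer-side stance could look like (illustrative only; it uses a simple local counter, whereas the rest of this thread moves towards the task-wide budget): the same unbounded loop as the earlier sketch, but with bounded work per poll:

```rust
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::{Stream, StreamExt};

/// How often the hypothetical operator yields voluntarily.
const YIELD_EVERY: usize = 64;

/// Same filter shape as before, but it refuses to loop forever in one poll.
struct GuardedFilter<S, F> {
    input: S,
    predicate: F,
    polls_since_yield: usize,
}

impl<S, F, T> Stream for GuardedFilter<S, F>
where
    S: Stream<Item = T> + Unpin,
    F: FnMut(&T) -> bool + Unpin,
{
    type Item = T;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        let this = self.get_mut();
        loop {
            if this.polls_since_yield >= YIELD_EVERY {
                this.polls_since_yield = 0;
                // Ask to be polled again right away, but hand control back to
                // the scheduler first so sibling tasks can make progress.
                cx.waker().wake_by_ref();
                return Poll::Pending;
            }
            this.polls_since_yield += 1;
            match this.input.poll_next_unpin(cx) {
                Poll::Ready(Some(item)) => {
                    if (this.predicate)(&item) {
                        return Poll::Ready(Some(item));
                    }
                }
                other => return other,
            }
        }
    }
}
```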
zhuqi-lucas commented on Jun 11, 2025
Thank you, I think I get your point. I was thinking of optimizing the rule; is that a similar point?
Thanks!
9 remaining items
zhuqi-lucas commented on Jun 12, 2025
Thank you @pepijnve , I am OK with it; a design doc will make it clear.
Both docs in asciidoc+plantuml or just Markdown are fine.
pepijnve commented on Jun 12, 2025
Made some progress on the problem statement already. I gave the AI the facts, and it turned them into something I would actually enjoy reading. Next I'm going to work on describing the way things work today. Feedback already welcome.
https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md
zhuqi-lucas commented on Jun 13, 2025
This is a good start, thanks. Maybe we can also add a statement about pipeline mode and pipeline breaking.
pepijnve commented on Jun 13, 2025
I'm starting to realize we might have been placing too much emphasis on this aspect. I've been doing my homework by reading the Volcano paper. I had never read that paper in depth (never had a need to); I just knew that people used the term to kind of refer to 'the iterator approach'. The more I read, the more I can see DataFusion is basically a modern day Volcano.
One thing DataFusion does not have explicitly, as far as I know, is an exchange operator. I say not explicitly, because the essential demand/data-driven switch part of exchange is present in a couple of operators like Repartition and Coalesce. Perhaps these dataflow change points are a better way of looking at the problem.
I really miss having a white board, but here's an approximation :smiling: This is from the Volcano paper (with annotations by me of course).
Each of the colored blocks is an independently executing sub portion of the query. Translated to Tokio each of these colored blocks is a separate concurrent task. Each of those tasks needs to be cooperatively scheduled to guarantee all of them get a fair share of time to run.
As we've concluded earlier the output side of the exchange-like operators is already handling this for us implicitly because they consume tokio task budget. The table sources (Scan in the image) do not.
Perhaps this reframing of the problem is the path to a general purpose solution. To verify correct scheduling behavior, you can first subdivide the plan into subplans using the exchange-like operators as cut points. Per subplan you can then look at all the leaf nodes. Each leaf node that 'inserts' work into the task needs to consume from the same task-wide tokio budget, not a per-operator budget as we're doing today.
So what does all this mean in terms of implementation:
Replace the per-operator yield counter with tokio's coop budget, ideally via poll_proceed, or with a combination of 'has budget' and 'consume budget' in the meantime.
Add a property on ExecutionPlan that communicates if an operator consumes the task budget (not sure what the best description of this would be) instead of with_cooperative_yielding. It's not really something you want to opt-in to after all, and the exchange-like operators have no way of opting out.
The one thing that we still cannot solve automatically then is dynamic query planning. Operators that create streams dynamically still have to make sure they set things up correctly themselves.
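As nothing more than a sketch of that declarative direction (the trait and method names here are hypothetical, not the API being designed in the draft PR), such a property could be as simple as:

```rust
/// Hypothetical stand-in for the relevant slice of ExecutionPlan.
trait CooperativeScheduling {
    /// Whether streams produced by this operator already draw from Tokio's
    /// task-wide cooperative budget (for example because they read from a
    /// tokio mpsc Receiver), so the planner does not need to wrap them in
    /// an extra yielding combinator.
    fn consumes_task_budget(&self) -> bool {
        false
    }
}
```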
One possible downside to this approach is that the cooperative scheduling budget is implementation specific to the Tokio runtime. DataFusion becomes more tied to Tokio rather than less. Not sure if that's an issue or not.
@alamb @ozankabak wdyt? Maybe this is what you were going for all along and I'm just slowly catching up :smiling:
The change of heart comes from the realization that Tokio itself also takes a 'consume at the leaves' strategy, and having a task-wide budget ensures that tasks cannot silently ignore the yield request. Once one resource depletes the budget, it's no longer possible to make progress anywhere else, provided all resources participate in the budgeting system.
alamb commented on Jun 13, 2025
This is consistent with our observations at InfluxData: we saw uncancellable queries when feeding our plan from an in-memory cache (not a file / memory).
This is a really nice writeup: it matches my understanding / mental model. It would also make the start of a great blog post for the DataFusion blog FWIW, and I filed a ticket to track that idea 🎣
I think this is an accurate assessment, though I would probably phrase it as "DataFusion uses Volcano-style parallelism where operators are single threaded and Exchange (RepartitionExec) operators handle parallelism". The other prevalent style is called "Morsel Driven Parallelism", popularized by DuckDB and TUM/Umbra in this paper, which uses operators that are explicitly multi-threaded.
This is true in theory -- but I think we also take pains to try and avoid "over scheduling" tasks in tokio -- for example, we purposely only have N input partitions (and hence N streams) per scan, even if there are 100+ files -- the goal is to keep all the cores busy, but not oversubscribed.
This also sounds fine to me, and I would be happy to review PRs, etc. However it is not 100% clear to me what the proposed design would improve in practice.
For example, I wonder if there are additional tests / cases that would be improved with the proposed implementation 🤔
In my opinion this is fine -- if operators are making dynamic streams, that is an advanced use case that today must still handle canceling / yielding. I think it is ok if we can't find a way to automatically provide yielding behavior to them (they are no worse off than today).
I personally don't think this is an issue as I don't see any movement and have not heard any desire to move away from tokio.
pepijnve commented on Jun 13, 2025
There are a couple of benefits.
It removes the edge case seen in the interleave operator (or any select!-style code in general). With the current per-stream counter, one stream might want to yield, but the parent stream may decide to poll another stream in response which happens to be ready. The end result is that two cooperating streams may turn into a non-cooperating one when they are merged. To fix this, you would need to adjust the merging operator as well and we're basically back where we started.
If all cooperating streams use the same budget, then this problem goes away. Once the yield point has been hit, all cooperating streams will yield.
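A minimal self-contained sketch of that edge case (plain futures/Tokio code, not the actual interleave operator): the 'polite' stream asks to yield after every item, the way a per-stream budget would, but select simply polls the always-ready side instead, so the merged stream never surfaces the yield:

```rust
use std::task::Poll;
use futures::stream::{self, StreamExt};

#[tokio::main]
async fn main() {
    // Yields Pending after every item, the way a per-stream budget would.
    let mut pending_next = false;
    let polite = stream::poll_fn(move |cx| {
        if pending_next {
            pending_next = false;
            cx.waker().wake_by_ref();
            return Poll::Pending;
        }
        pending_next = true;
        Poll::Ready(Some("polite"))
    });

    // Always ready, never asks to yield.
    let eager = stream::repeat("eager");

    // select polls the other side whenever one side is Pending, so the
    // polite stream's yield request never reaches the caller of the merged
    // stream -- it only ever sees Ready results.
    let mut merged = stream::select(polite, eager).take(8);
    while let Some(side) = merged.next().await {
        println!("{side}");
    }
}
```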
Using the task budget also avoids the 'redundant yield' problem in the current version. If you now do a simple SELECT * FROM ... query, by default you'll get a Pending after every 64 Ready(RecordBatch). With the task budget you will only actually inject the Pending when it's actually necessary. The system automatically does the right thing.
Finally it aligns the cooperative yielding strategy across the library. RecordBatchReceiverStream is implicitly already using this strategy in a way you cannot opt out of. It's better to have one consistent way of solving this cancellation problem once and for all.
I have a patch almost ready. I'll make a draft PR already so this all becomes a bit more tangible.
zhuqi-lucas commented on Jun 13, 2025
So does this mean the corner case in this sub-task can be resolved?
Fix the corner case provided in this link: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a
I am curious what the budget count is, since we can't configure it from DataFusion; will it affect performance or other things? It seems not, because we already use RecordBatchReceiverStream for the budget?
Another question:
If we have to share one budget across all leaf nodes, could a leaf node that consumes the budget very aggressively affect overall fairness or performance?
pepijnve commented on Jun 13, 2025
Yes, that's correct.
@zhuqi-lucas that's correct, you can't configure it at the moment. That's the case for RecordBatchReceiverStream today as well indeed. Tokio hardcodes the magic number 128 (see https://github.com/tokio-rs/tokio/blob/master/tokio/src/task/coop/mod.rs#L116).
The budget is per spawned task. Every time the tokio scheduler lets a task run it gives it a budget of 128 which the task can then deplete until it hits zero. Then the task is coaxed towards yielding by making all budget-aware Tokio resources return Pending.
From the perspective of DataFusion code I don't think this really changes all that much. It's the exact same behavior you have today already when the source streams are RecordBatchReceiverStream. So the moment you have a repartition/coalesce you're getting exactly this with the current code.
zhuqi-lucas commented on Jun 13, 2025
Got it, it makes sense to me @pepijnve! Thanks!
pepijnve commented on Jun 13, 2025
What I was trying to say is that from a scheduling/yielding pov you can reason about each box in isolation. Whether you actually try to make 100s of concurrent (not parallel) tasks or not is a rabbit hole for another thread 😄
ozankabak commented on Jun 13, 2025
A draft PR would be good to have, I think I can make better comments then. However, solving this at the stream level in a way transparent to the operator builder would be great in general. That was my original intention, but we weren't able to materialize that solution in a reasonable amount of time. Hence the current solution, which is basically a fallback that has some characteristics of the ideal solution (e.g. "transparency", focusing on leaves etc.), but requires support from the planner via to-be-designed APIs. The current approach can evolve into a decent one with such APIs, but it would always be worse than a proper lower-level solution. It would be good if we can build that, but in the meantime, I am glad that we have some solution that works for many cases.
If it turns out that we can arrive at a proper stream-based solution quickly, we can retire this one quickly. Otherwise, we can incrementally improve what we have today as alternatives go through design/experimentation etc.
pepijnve commented on Jun 13, 2025
@ozankabak draft PR is in good enough shape to review I think. The general idea is still the same as what was there before.
The last thing I'm still working on is the ExecutionPlan API, where I would like to replace with_cooperative_yielding with a plan property. Trying to get things to be more declarative.