
[Epic] Pipeline breaking cancellation support and improvement #16353

Description

zhuqi-lucas (Contributor)

Is your feature request related to a problem or challenge?

We have done the first step in #16196 for pipeline breaking cancellation support; this epic tracks the remaining sub-tasks and improvements. cc @ozankabak @alamb @pepijnve

  • Adding a few tests (maybe SLT?) that show YieldStreamExec being inserted. Also add logs related to the built-in YieldStream.

  • Improving the documentation to make it clear that any leaf (source) that already yields just has to implement

fn with_cooperative_yields(self: Arc<Self>) -> Option<Arc<dyn ExecutionPlan>> {
    Some(self)
}

to signal the planner that no YieldStream combinator is necessary.

  • Improving the InsertYieldExec rule by means of an API that effectively exposes the input and output pipelining behaviors of operators

  • Investigating whether any already-existing manual yielding (for example, the one in RepartitionExec) can now be removed

  • Thinking about supporting cases involving non-volcano (e.g. spill) data flow

  • Fixing the corner case provided in this link: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a

Feel free to add more tasks, thanks!

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

Activity

changed the title from "[Epic] Pipeline breaking cancellation support" to "[Epic] Pipeline breaking cancellation support and improvement" on Jun 10, 2025

alamb (Contributor) commented on Jun 10, 2025

Thank you @zhuqi-lucas -- I also added this as a wishlist item for #16235

zhuqi-lucas (Contributor, Author) commented on Jun 11, 2025

Thank you @alamb !

pepijnve (Contributor) commented on Jun 11, 2025

At the risk of making myself unpopular, I feel it's relevant to share my findings with you guys.

Working on #16322 led me into the tokio implementation, in particular it led me to this line in the Chan implementation. This is the code that handles RecordBatch passing in RecordBatchReceiverStream.

I was immediately reminded of the cancellation discussions. Without realizing it, DataFusion is actually already using Tokio's coop mechanism. This strengthens my belief that the PR that was merged is going about things the wrong way. It introduces an API which overlaps 100% with something that already exists and is already being used. I don't think it's a good idea to have multiple mechanisms for the same thing. Pipeline-blocking operators exactly match the pattern described in the Tokio cooperative scheduling documentation, so why would you not use the solution the runtime provides, which you're already using in quite a few places already (everywhere RecordBatchReceiverStream is used)?

pepijnve (Contributor) commented on Jun 11, 2025

A colleague of mine advised me to refer to PR #16301 to show one more time what usage of tokio::coop might look like, for people who were not involved with the cancellation work that was already done. I was reluctant to do so myself.
That PR shows the operator changes that would be required. The custom PollBudget thing would be removed and the tokio coop budget would be used instead.

zhuqi-lucas (Contributor, Author) commented on Jun 11, 2025

At the risk of making myself unpopular, I feel it's relevant to share my findings with you guys.

Working on #16322 led me into the tokio implementation, in particular it led me to this line in the Chan implementation. This is the code that handles RecordBatch passing in RecordBatchReceiverStream.

I was immediately reminded of the cancellation discussions. Without realizing it, DataFusion is actually already using Tokio's coop mechanism. This strengthens my belief that the PR that was merged is going about things the wrong way. It introduces an API which overlaps 100% with something that already exists and is already being used. I don't think it's a good idea to have multiple mechanisms for the same thing. Pipeline-blocking operators exactly match the pattern described in the Tokio cooperative scheduling documentation, so why would you not use the solution the runtime provides, which you're already using in quite a few places already (everywhere RecordBatchReceiverStream is used)?

Thank you @pepijnve. Do you mean we can replace YieldStream with Tokio's coop, or also change the rule for adding Yield?

I have not yet looked into Tokio's coop; maybe we can also add a sub-task for it and list its benefits:

Such as:

  1. Will performance be better after using Tokio's coop (with benchmark results)?
  2. Or can we handle more corner cases and automatically handle user-defined execs?
  3. Or will we have a clearer and easier API?
  4. Etc.

We are open to all improvements, thanks!

pepijnve (Contributor) commented on Jun 11, 2025

Tokio's cooperative budget is essentially a counter per task that can be decremented at any point in the task. When the counter hits zero you'll return Pending from the function trying to consume the budget. That's basically what YieldStream is doing but with a local counter rather than a task-wide one.

DataFusion's ReceiverStreamBuilder makes use of tokio::sync::mpsc::Receiver. Whenever Receiver::recv is being called, that counter is being decremented, and you'll get a Pending result when the budget is depleted.
This is the same thing as what YieldStream is trying to do.
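
To make this concrete, here is a minimal, self-contained sketch of the task-wide budget behavior (not DataFusion code; it assumes only the public tokio::task::consume_budget API in a recent Tokio with the rt feature):

// Each call decrements the shared per-task counter; once it reaches zero
// the await returns Pending, the task yields to the scheduler, and the
// budget is refreshed the next time the task runs.
#[tokio::main]
async fn main() {
    tokio::spawn(async {
        for _ in 0..1_000 {
            tokio::task::consume_budget().await;
        }
    })
    .await
    .unwrap();
}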

The benefits I see of trying to leverage the same mechanism elsewhere in DataFusion are:

  • There is only one cooperative yielding mechanism at play. This is easier to reason about than multiple interacting ones.
  • There is no need for additional API. DataFusion is already using this in the current released version.
  • There are fewer corner cases. Once the budget is depleted, any point in the code checking the budget will yield since all those points are checking the same shared counter.

The downsides remain:

  • Code that loops may still need to have yield points added to it in order to not yield unnecessarily.
  • It's not yet 100% clear to me how you can use this in manually written Futures and Streams. The required bits for that seem to be only crate-visible in the current Tokio release. I've raised the question here: "Capability to make an existing Stream participate in cooperative scheduling" (tokio-rs/tokio#7403)
  • I have not made a performance analysis of this yet, but since it's used quite extensively already it's likely to be ok. Needs to be evaluated.

zhuqi-lucas (Contributor, Author) commented on Jun 11, 2025

Tokio's cooperative budget is essentially a counter per task that can be decremented at any point in the task. When the counter hits zero you'll return Pending from the function trying to consume the budget. That's basically what YieldStream is doing but with a local counter rather than a task-wide one.

DataFusion's ReceiverStreamBuilder makes use of tokio::sync::mpsc::Receiver. Whenever Receiver::recv is being called, that counter is being decremented, and you'll get a Pending result when the budget is depleted. This is the same thing as what YieldStream is trying to do.

The benefits I see of trying to leverage the same mechanism elsewhere in DataFusion are:

  • There is only one cooperative yielding mechanism at play. This is easier to reason about than multiple interacting ones.
  • There is no need for additional API. DataFusion is already using this in the current released version.
  • There are fewer corner cases. Once the budget is depleted, any point in the code checking the budget will yield since all those points are checking the same shared counter.

The downsides remain:

  • Code that loops may still need to have yield points added to it in order to not yield unnecessarily.
  • It's not yet 100% clear to me how you can use this in manually written Futures and Streams. The required bits for that seem to be only crate-visible in the current Tokio release. I've raised the question here: "Example of using cooperative scheduling budget in manual Future/Stream implementations" (tokio-rs/tokio#7403)
  • I have not made a performance analysis of this yet, but since it's used quite extensively already it's likely to be ok. Needs to be evaluated.

You’re right. In DataFusion, only operators that fan out work into multiple spawned tasks and then re-aggregate via a Tokio MPSC channel actually consume the cooperative budget automatically (because each Receiver::recv().await call decrements it). Examples include:

  • CoalescePartitionsExec
  • SortPreservingMergeExec

All of those use RecordBatchReceiverStreamBuilder::run_input, whose .next().await is really rx.recv().await under the hood—and that is what charges the Tokio coop budget.
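
As a concrete illustration (not DataFusion code), a minimal sketch of how Receiver::recv charges the per-task budget, with String standing in for RecordBatch:

use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<String>(8);
    tokio::spawn(async move {
        for i in 0..256 {
            if tx.send(format!("batch {i}")).await.is_err() {
                break;
            }
        }
    });
    // Each recv().await decrements the task's cooperative budget; once it
    // is depleted, recv returns Pending even if a message is ready, which
    // forces the receiving task to yield.
    while let Some(batch) = rx.recv().await {
        drop(batch);
    }
}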

But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel. They execute pull-based within a single Stream implementation, and never call recv(), so they don’t automatically consume any cooperative budget.

That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.

I believe there is no major difference here? Please correct me if I am wrong.

pepijnve (Contributor) commented on Jun 11, 2025

But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.

You're indeed 100% dependent on your child streams which is what makes the current solution somewhat brittle. If that happens to use a Receiver (or some other implementation that consumes budget) it will work. If it's some other stream that does not you may have issues again. Because the sources are user definable, I think it's wise to take a defensive stance in the implementation of operators and assume you don't know what they will or will not do.
The current implementation attempts to fix this by ensuring the sources have yield points. That breaks when streams are swapped dynamically because you no longer have a way to ensure they contain the necessary yield points. This is a point the DataFusion library cannot currently intercept.
The current implementation with the non-task-wide budget also breaks when an intermediate operator uses select! (or something similar where you read from whatever stream happens to be ready) since this can obscure the Pending result from a stream. There's no way to guarantee that Pending bubbles all the way up.

I believe there is no major difference here? Please correct me if I am wrong.

The point of contention was where you put these yield points. Do you instrument all leaf nodes, or do you instrument consumers that may refuse to yield? To make the system robust I really think you need to do this in the consumers. It's also beneficial for locality of reasoning. You can look at the implementation of an operator and assess that it's correct from a cooperative scheduling point of view without having to look at any other code.
The objection was that there are many, many operators out there in the wild downstream of DataFusion. That's one that I do not have an answer for. How many people are building custom pipeline blocking operators?

It's important to note that you would only need to take action in operators where you can see from the implementation that it may not return any value, either Ready or Pending, relatively quickly. That's basically anything that loops over input streams an unbounded number of times.

  • Project (or any other simple transformation operator) doesn't need to do anything since it takes one record batch in and immediately emits another one.
  • Table scans shouldn't either. They'll yield naturally if their input is not ready, and otherwise they'll return a RecordBatch.
  • Filter in theory should not do anything, the exception being dropping lots of batches entirely.
  • Joins: it depends. A build/probe style implementation should probably consume during the build phase, not during probe. But it depends on the implementation.
  • Aggregation and sorting do need to consume since those can block for an extended period of time.
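
To make the last point concrete, here is a sketch of what "consuming" could look like in a pipeline-breaking build loop; build_phase is a hypothetical name and Vec<u64> stands in for RecordBatch (this is not the actual operator code):

use futures::{stream, Stream, StreamExt};

// Hypothetical build phase of a pipeline-breaking operator: charge the
// task budget once per input batch so a long build over an always-ready
// input cannot starve sibling tasks (or cancellation).
async fn build_phase<S>(mut input: S) -> u64
where
    S: Stream<Item = Vec<u64>> + Unpin,
{
    let mut sum = 0;
    while let Some(batch) = input.next().await {
        sum += batch.iter().sum::<u64>();
        tokio::task::consume_budget().await;
    }
    sum
}

#[tokio::main]
async fn main() {
    let batches = stream::iter((0..100u64).map(|i| vec![i, i + 1]));
    println!("{}", build_phase(batches).await);
}
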
zhuqi-lucas (Contributor, Author) commented on Jun 11, 2025

But most other operators (Projection, Filter, HashAggregate, HashJoin, WindowAgg, simple TableScans, etc.) do not use an MPSC channel.
That means we still need to insert explicit yield points (YieldStream/PollBudget) to avoid starving the thread.

You're indeed 100% dependent on your child streams which is what makes the current solution somewhat brittle. If that happens to use a Receiver (or some other implementation that consumes budget) it will work. If it's some other stream that does not you may have issues again. Because the sources are user definable, I think it's wise to take a defensive stance in the implementation of operators and assume you don't know what they will or will not do. The current implementation attempts to fix this by ensuring the sources have yield points. That breaks when streams are swapped dynamically because you no longer have a way to ensure they contain the necessary yield points. This is a point the DataFusion library cannot currently intercept. The current implementation with the non-task-wide budget also breaks when an intermediate operator uses select! (or something similar where you read from whatever stream happens to be ready) since this can obscure the Pending result from a stream. There's no way to guarantee that Pending bubbles all the way up.

I believe there is no major difference here? Please correct me if I am wrong.

The point of contention was where you put these yield points. Do you instrument all leaf nodes, or do you instrument consumers that may refuse to yield? To make the system robust I really think you need to do this in the consumers. It's also beneficial for locality of reasoning. You can look at the implementation of an operator and assess that it's correct from a cooperative scheduling point of view without having to look at any other code. The objection was that there are many, many operators out there in the wild downstream of DataFusion. That's one that I do not have an answer for. How many people are building custom pipeline blocking operators?

It's important to note that you would only need to take action in operators where you can see from the implementation that it may not return any value, either Ready or Pending, relatively quickly. That's basically anything that loops over input streams an unbounded number of times.

  • Project (or any other simple transformation operator) doesn't need to do anything since it takes one record batch in and immediately emits another one.
  • Table scans shouldn't either. They'll yield naturally if their input is not ready, and otherwise they'll return a RecordBatch.
  • Filter in theory should not do anything, the exception being dropping lots of batches entirely.
  • Joins: it depends. A build/probe style implementation should probably consume during the build phase, not during probe. But it depends on the implementation.
  • Aggregation and sorting do need to consume since those can block for an extended period of time.

Thank you, I think I got your point. I was thinking of optimizing the rule; is it a similar point?

// Traverse all nodes, not just leaves
plan.transform_down(|plan| {
    // Wrap if the node is a leaf OR long-running
    if plan.children().is_empty() || is_long_running(plan.as_ref()) {
        // Use the existing cooperative variant if available
        let wrapped = plan
            .clone()
            .with_cooperative_yields()
            .unwrap_or_else(|| {
                Arc::new(YieldStreamExec::new(Arc::clone(&plan), yield_period))
            });
        Ok(Transformed::new(wrapped, true, TreeNodeRecursion::Jump))
    } else {
        Ok(Transformed::no(plan))
    }
})
.map(|t| t.data)
  1. Leaf-only wrapping can be bypassed if someone plugs in a custom Stream or uses select!-style combinators.
  2. By also wrapping every consumer that does heavy looping—aggregations, sorts, joins, window functions—you guarantee that no matter how the streams are composed, there's always an explicit YieldStreamExec (or the built-in cooperative variant) in the path. (This can be optimized to PollBudget if possible.)
  3. We still avoid unnecessary overhead on "simple" operators like Projection or a basic TableScan, because they're neither leaves nor on the "long-running" list.

Thanks!

(9 remaining items collapsed)

zhuqi-lucas (Contributor, Author) commented on Jun 12, 2025

@zhuqi-lucas with all the various sprawling discussion threads I think we may have gotten to a point where it's no longer easy to have an overview of what the problems are and what the final goal may look like. To make matters worse there are multiple options. I was thinking it might be useful to try and put together some kind of design document / overview that clearly describes the cancellation/abort problem, its root cause(s), the existing mitigations and possible future mitigations. Having a bit of a mea culpa moment, so I would like to take a shot at making a first draft. Is that ok for you? Any preference wrt tooling? I usually write my technical docs in asciidoc+plantuml for ease of version control and diffing, but willing to use whatever people prefer.

Thank you @pepijnve, I am OK with it; a design doc will make it clear.

Either asciidoc+plantuml or just Markdown is fine.

pepijnve (Contributor) commented on Jun 12, 2025

Made some progress on the problem statement already. I gave the AI the facts, and it turned it into something I would actually enjoy reading. I'm going to work next on describing the way things work today. Feedback already welcome.
https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md

zhuqi-lucas (Contributor, Author) commented on Jun 13, 2025

Made some progress on the problem statement already. I gave the AI the facts, and it turned it into something I would actually enjoy reading. I'm going to work next on describing the way things work today. Feedback already welcome. https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md

This is a good start, thanks. Maybe we can also add a statement about pipeline mode and pipeline breaking.

pepijnve (Contributor) commented on Jun 13, 2025

pipeline mode and pipeline breaking

I'm starting to realize we might have been placing too much emphasis on this aspect. I've been doing my homework by reading the Volcano paper. I had never read that paper in depth (never had a need to); I just knew that people used the term to kind of refer to 'the iterator approach'. The more I read, the more I can see DataFusion is basically a modern day Volcano.

One thing DataFusion does not have explicitly, as far as I know, is an exchange operator. I say not explicitly, because the essential demand/data-driven switch part of exchange is present in a couple of operators like Repartition and Coalesce. Perhaps these dataflow change points are a better way of looking at the problem.

I really miss having a whiteboard, but here's an approximation 🙂 This is from the Volcano paper (with annotations by me, of course).

[Image: exchange-operator diagram from the Volcano paper, annotated]

Each of the colored blocks is an independently executing sub-portion of the query. Translated to Tokio, each of these colored blocks is a separate concurrent task. Each of those tasks needs to be cooperatively scheduled to guarantee all of them get a fair share of time to run.

As we've concluded earlier the output side of the exchange-like operators is already handling this for us implicitly because they consume tokio task budget. The table sources (Scan in the image) do not.

Perhaps this reframing of the problem is the path to a general purpose solution. To verify correct scheduling behavior, you can first subdivide the plan into subplans using the exchange-like operators as cut points. Per subplan you can then look at all the leaf nodes. Each leaf node that 'inserts' work into the task needs to consume from the same task-wide tokio budget, not a per-operator budget as we're doing today.

So what does all this mean in terms of implementation:

  • Replace the per-operator counters with consuming the Tokio task budget. DataFusion is already doing this today so there's precedent for it, and it resolves a bunch of side effects. I've opened a PR in tokio to allow us to use the necessary API for this: "Make cooperative and poll_proceed public" (tokio-rs/tokio#7405). I think we can approximate poll_proceed with a combination of 'has budget' and 'consume budget' in the meantime (see the sketch after this list).
  • Remove the configuration option
  • Consider renaming YieldStream to CooperativeStream.
  • I think I would prefer a declarative property on ExecutionPlan that communicates if an operator consumes the task budget (not sure what the best description of this would be) instead of with_cooperative_yielding. It's not really something you want to opt-in to after all and the exchange-like operators have no way of opting out.
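
In the meantime, here is a rough sketch of consuming the task budget at the stream level, built only on the public tokio::task::consume_budget plus futures::stream::unfold (the cooperative helper is hypothetical, not the proposed DataFusion API):

use futures::{stream, Stream, StreamExt};

// Hypothetical adapter: wrap a stream so that producing each item first
// charges the Tokio per-task cooperative budget.
fn cooperative<S>(inner: S) -> impl Stream<Item = S::Item>
where
    S: Stream + Unpin,
{
    stream::unfold(inner, |mut s| async move {
        // When the task budget is depleted this await returns Pending and
        // the whole task yields back to the scheduler.
        tokio::task::consume_budget().await;
        let item = s.next().await?;
        Some((item, s))
    })
}

#[tokio::main]
async fn main() {
    // An always-ready source; without budget charging, draining it would
    // never yield control to the scheduler.
    let source = stream::iter(0..1_000u32);
    let total = cooperative(source)
        .fold(0u32, |acc, x| async move { acc + x })
        .await;
    println!("{total}");
}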

The one thing that we still cannot solve automatically then is dynamic query planning. Operators that create streams dynamically still have to make sure they set things up correctly themselves.

One possible downside to this approach is that the cooperative scheduling budget is implementation specific to the Tokio runtime. DataFusion becomes more tied to Tokio rather than less. Not sure if that's an issue or not.

@alamb @ozankabak wdyt? Maybe this is what you were going for all along and I'm just slowly catching up 🙂

The change of heart comes from the realization that Tokio itself also takes a 'consume at the leaves' strategy, and having a task-wide budget ensures that tasks cannot silently ignore the yield request. Once one resource depletes the budget, it's no longer possible to make progress anywhere else, provided all resources participate in the budgeting system.

alamb (Contributor) commented on Jun 13, 2025

As we’ve discussed above the channel receiver is already doing that for us. For some reason file IO was not. I’m not sure I understand why that’s the case and will try to figure out why tomorrow.

This is consistent with our observations at InfluxData: we saw uncancellable queries when feeding our plan from an in-memory cache (not a file / memory)

https://github.com/pepijnve/datafusion/blob/cancel_spec/dev/design/cancellation.md

This is a really nice writeup: it matches my understanding / mental model. It would also make the start of a great blog post for the DataFusion blog FWIW, and I filed a ticket to track that idea 🎣

The more I read the more I can see DataFusion is basically a modern day Volcano.

I think this is an accurate assessment, though I would probably phrase it as "DataFusion uses Volcano-style parallelism where operators are single threaded and Exchange (RepartitionExec) operators handle parallelism". The other prevalent style is called "Morsel Driven Parallelism" popularized by DuckDB and TUM/Umbra in this paper which uses operators that are explicitly multi-threaded.

Each of the colored blocks is an independently executing sub-portion of the query. Translated to Tokio, each of these colored blocks is a separate concurrent task. Each of those tasks needs to be cooperatively scheduled to guarantee all of them get a fair share of time to run.

This is true in theory -- but I think we also take pains to try and avoid "over scheduling" tasks in tokio -- for example, we purposely only have N input partitions (and hence N streams) per scan, even if there are 100+ files -- the goal is to keep all the cores busy, but not oversubscribed.

So what does all this mean in terms of implementation:

This also sounds fine to me, and would be happy to review PRs, etc. However it is not 100% clear if your proposed design

  1. fixes any bugs / adds features over the current one,
  2. is "just" a cleaner way to implement the same thing (this is also a fine thing to contribute as well).

For example, I wonder if there are additional tests / cases that would be improved with the proposed implementation 🤔

The one thing that we still cannot solve automatically then is dynamic query planning. Operators that create streams dynamically still have to make sure they set things up correctly themselves.

In my opinion this is fine -- if operators are making dynamic streams, that is an advanced use case that today must still handle canceling / yielding. I think it is ok if we can't find a way to automatically provide yielding behavior to them (they are no worse off than today)

One possible downside to this approach is that the cooperative scheduling budget is implementation specific to the Tokio runtime. DataFusion becomes more tied to Tokio rather than less. Not sure if that's an issue or not.

I personally don't think this is an issue as I don't see any movement and have not heard any desire to move away from tokio.

pepijnve (Contributor) commented on Jun 13, 2025

fixes any bugs / adds features over the current one,
is "just" a cleaner way to implement the same thing (this is also a fine thing to contribute as well).

There are a couple of benefits.

It removes the edge case seen in the interleave operator (or any select! style code in general). With the current per-stream counter, one stream might want to yield, but the parent stream may decide to poll another stream in response that happens to be ready. The end result is that two cooperating streams may turn into a non-cooperating one when they are merged. To fix this, you would need to adjust the merging operator as well, and we're basically back where we started.
If all cooperating streams use the same budget, then this problem goes away. Once the yield point has been hit, all cooperating streams will yield.
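
To illustrate the masking problem, here is a small self-contained sketch (assumed names, not DataFusion code): a stream with a local yield counter, merged via futures::stream::select with an always-ready stream, never surfaces its Pending:

use std::pin::Pin;
use std::task::{Context, Poll};
use futures::stream::{self, Stream, StreamExt};

// A stream that decides to yield based on its own local counter, like the
// current per-stream YieldStream approach.
struct LocalYield {
    produced: usize,
}

impl Stream for LocalYield {
    type Item = u32;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<u32>> {
        self.produced += 1;
        if self.produced % 64 == 0 {
            // Local decision to yield: return Pending and request a re-poll.
            cx.waker().wake_by_ref();
            Poll::Pending
        } else {
            Poll::Ready(Some(1))
        }
    }
}

#[tokio::main]
async fn main() {
    let local = LocalYield { produced: 0 };
    let always_ready = stream::repeat(2u32);
    // select polls the other stream when one returns Pending, so the
    // Pending from LocalYield is masked and the merged stream never
    // suspends the task.
    let n = stream::select(local, always_ready).take(1_000).count().await;
    println!("polled {n} items without the task ever yielding");
}

With a shared task-wide budget instead, the always-ready stream would hit the same depleted budget and yield as well.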

Using the task budget also avoids the 'redundant yield' problem in the current version. If you now do a simple SELECT * FROM ... query, by default you'll get a Pending after every 64 Ready(RecordBatch). With the task budget you will only actually inject the Pending when it's actually necessary. The system automatically does the right thing.

Finally, it aligns the cooperative yielding strategy across the library. RecordBatchReceiverStream is implicitly already using this strategy in a way you cannot opt out of. It's better to have one consistent way of solving this cancellation problem once and for all.

I have a patch almost ready. I'll make a draft PR already so this all becomes a bit more tangible.

zhuqi-lucas (Contributor, Author) commented on Jun 13, 2025

fixes any bugs / adds features over the current one,
Is "just" cleaner way to implement the same thing (this is also a fine thing to contribute as well).

There are a couple of benefits.

It removes the edge case seen in the interleave operator (or any select! style code in general). With the current per-stream counter, one stream might want to yield, but the parent stream may decide to poll another stream in response that happens to be ready. The end result is that two cooperating streams may turn into a non-cooperating one when they are merged. To fix this, you would need to adjust the merging operator as well, and we're basically back where we started. If all cooperating streams use the same budget, then this problem goes away. Once the yield point has been hit, all cooperating streams will yield.

So it means this sub-task corner case can be resolved?

Fix the corner case provided in this link: https://gist.github.com/pepijnve/0e1a66f98033c6c44c62a51fb9dbae5a

Using the task budget also avoids the 'redundant yield' problem in the current version. If you now do a simple SELECT * FROM ... query, by default you'll get a Pending after every 64 Ready(RecordBatch). With the task budget you will only actually inject the Pending when it's actually necessary. The system automatically does the right thing.

I am curious what the budget count is, since we can't configure it from DataFusion; will it affect performance or other things? It seems not, because we already use RecordBatchReceiverStream with the budget?

Another question:

If we have to share one budget across all leaf nodes, will a leaf node that consumes budget very aggressively affect the overall fairness or performance?

pepijnve (Contributor) commented on Jun 13, 2025

So it means this sub-task corner case can be resolved?

Yes, that's correct.

I am curious what the budget count is, since we can't configure it from DataFusion; will it affect performance or other things? It seems not, because we already use RecordBatchReceiverStream with the budget?

@zhuqi-lucas that's correct, you can't configure it at the moment. That's the case for RecordBatchReceiverStream today as well indeed. Tokio hardcodes the magic number 128 (see https://github.com/tokio-rs/tokio/blob/master/tokio/src/task/coop/mod.rs#L116).

If we have to share one budget across all leaf nodes, will a leaf node that consumes budget very aggressively affect the overall fairness or performance?

The budget is per spawned task. Every time the Tokio scheduler lets a task run, it gives it a budget of 128, which the task can then deplete until it hits zero. Then the task is coaxed towards yielding by making all budget-aware Tokio resources return Pending.
From the perspective of DataFusion code I don't think this really changes all that much. It's the exact same behavior you have today already when the source streams are RecordBatchReceiverStream. So the moment you have a repartition/coalesce you're getting exactly this with the current code.

zhuqi-lucas (Contributor, Author) commented on Jun 13, 2025

So it means this sub-task corner case can be resolved?

Yes, that's correct.

I am curious what the budget count is, since we can't configure it from DataFusion; will it affect performance or other things? It seems not, because we already use RecordBatchReceiverStream with the budget?

@zhuqi-lucas that's correct, you can't configure it at the moment. That's the case for RecordBatchReceiverStream today as well indeed. Tokio hardcodes the magic number 128 (see https://github.com/tokio-rs/tokio/blob/master/tokio/src/task/coop/mod.rs#L116).

If we have to share one budget across all leaf nodes, will a leaf node that consumes budget very aggressively affect the overall fairness or performance?

The budget is per spawned task. Every time the Tokio scheduler lets a task run, it gives it a budget of 128, which the task can then deplete until it hits zero. Then the task is coaxed towards yielding by making all budget-aware Tokio resources return Pending. From the perspective of DataFusion code I don't think this really changes all that much. It's the exact same behavior you have today already when the source streams are RecordBatchReceiverStream. So the moment you have a repartition/coalesce you're getting exactly this with the current code.

Got it, it makes sense to me @pepijnve! Thanks!

pepijnve (Contributor) commented on Jun 13, 2025

This is true in theory -- but I think we also take pains to try and avoid "over scheduling" tasks in tokio -- for example, we purposely only have N input partitions (and hence N streams) per scan, even if there are 100+ files -- the goal is to keep all the cores busy, but not oversubscribed.

What I was trying to say is that from a scheduling/yielding pov you can reason about each box in isolation. Whether you actually try to make 100s of concurrent (not parallel) tasks or not is a rabbit hole for another thread 😄

ozankabak (Contributor) commented on Jun 13, 2025

@alamb @ozankabak wdyt? Maybe this is what you were going for all along and I'm just slowly catching up 🙂

The change of heart comes from the realization that Tokio itself also takes a 'consume at the leaves' strategy, and having a task-wide budget ensures that tasks cannot silently ignore the yield request. Once one resource depletes the budget, it's no longer possible to make progress anywhere else, provided all resources participate in the budgeting system.

A draft PR would be good to have, I think I can make better comments then. However, solving this at the stream level in a way transparent to the operator builder would be great in general. That was my original intention, but we weren't able to materialize that solution in a reasonable amount of time. Hence the current solution, which is basically a fallback that has some characteristics of the ideal solution (e.g. "transparency", focusing on leaves etc.), but requires support from the planner via to-be-designed APIs. The current approach can evolve into a decent one with such APIs, but it would always be worse than a proper lower-level solution. It would be good if we can build that, but in the meantime, I am glad that we have some solution that works for many cases.

If it turns out that we can arrive at a proper stream-based solution quickly, we can retire this one quickly. Otherwise, we can incrementally improve what we have today as alternatives go through design/experimentation etc.

pepijnve (Contributor) commented on Jun 13, 2025

@ozankabak draft PR is in good enough shape to review I think. The general idea is still the same as what was there before.

The last thing I'm still working on is the ExecutionPlan API where I would like to replace with_cooperative_yielding with a plan property. Trying to get things to be more declarative.



Labels: enhancement (New feature or request), help wanted (Extra attention is needed)


Participants: @pepijnve, @alamb, @ozankabak, @zhuqi-lucas
