feat(rust,python): support horizontal concatenation of LazyFrames #13139

Merged
merged 17 commits into from Dec 28, 2023

adamreeve
Contributor

@adamreeve adamreeve commented Dec 19, 2023

This fixes #10203

I've included support for slice pushdown and projection pushdown, but predicate pushdown is blocked.

In the projection pushdown implementation, it would be nice to be able to remove inputs where no columns are used, but because inputs can have different lengths we need to keep them all to make sure the final length is correct.
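To illustrate why an input with no projected columns still has to be kept, here is a minimal std-only sketch. The `Frame` type and `hconcat_projected` function are hypothetical stand-ins, not Polars APIs, and the sketch assumes that shorter inputs are padded with nulls up to the height of the tallest input.

```rust
/// Toy frame: named columns of nullable integers (not a Polars type).
#[derive(Debug, Clone, PartialEq)]
struct Frame {
    columns: Vec<(String, Vec<Option<i64>>)>,
}

impl Frame {
    /// Height of the frame, taken from its first column (0 if empty).
    fn height(&self) -> usize {
        self.columns.first().map_or(0, |(_, values)| values.len())
    }
}

/// Horizontally concatenate `inputs`, keeping only the columns named in `keep`.
/// The output height is computed from ALL inputs, including those that
/// contribute no columns, so the result has the same length as an
/// unprojected concatenation would have.
fn hconcat_projected(inputs: &[Frame], keep: &[&str]) -> Frame {
    let height = inputs.iter().map(Frame::height).max().unwrap_or(0);
    let mut columns = Vec::new();
    for frame in inputs {
        for (name, values) in &frame.columns {
            if keep.contains(&name.as_str()) {
                // Assumption: shorter columns are padded with nulls.
                let mut padded = values.clone();
                padded.resize(height, None);
                columns.push((name.clone(), padded));
            }
        }
    }
    Frame { columns }
}
```

With a two-row input `a` and a three-row input `b`, projecting only column `a` still yields three rows; dropping `b` from the plan would shorten the result to two.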

This doesn't implement proper streaming support, which could be added later, but I did have to fix the streaming logic for non-streamable nodes with more than one input in insert_streaming_nodes. The previous code that used insert_file_sink_ptr didn't work correctly when this was incremented by more than 1. For example, if there are 2 inputs, we'd add a new sink node when processing the first input, then on the next iteration of the while loop in insert_streaming_nodes the inserted sink itself would be popped off the stack and the ptr value would be decremented to 0 before getting to the second input node. We also need a new pipeline tree per input rather than only one new tree.

@github-actions github-actions bot added the enhancement (New feature or an improvement of an existing feature), python (Related to Python Polars), and rust (Related to Rust Polars) labels on Dec 19, 2023
Member

@ritchie46 ritchie46 left a comment

Nice addition and great PR. It really does look great. I have some minor comments here and there and then it should be good to go.

crates/polars-lazy/src/dsl/functions.rs (outdated, resolved)
crates/polars-lazy/src/dsl/functions.rs (outdated, resolved)
crates/polars-lazy/src/physical_plan/file_cache.rs (outdated, resolved)
)?;
}

let schema_size = inputs
Member

I think we should create a fn merge_schemas(schemas: &[Schema]) function. That can be shared with concat_lf_horizontal and here.

It would also amortize the cost of materializing the schema. For some logical plan nodes this is a full new allocation.

Contributor Author

Good idea, thanks. I've added this, although I made it take &[SchemaRef] instead to avoid schema copies.
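A rough sketch of what such a helper could look like, using only the standard library. The `Schema` and `SchemaRef` aliases below are simplified stand-ins for the Polars types (assumed here to be an ordered list of name/dtype pairs behind an `Arc`), and the `String` error type is a placeholder, so this is an illustration of the idea rather than the code in this PR.

```rust
use std::collections::HashSet;
use std::sync::Arc;

/// Simplified stand-ins for the real schema types (assumption).
type Schema = Vec<(String, String)>;
type SchemaRef = Arc<Schema>;

/// Merge schemas in order, failing on the first duplicated column name.
/// Taking `&[SchemaRef]` lets callers pass shared schemas without copying them.
fn merge_schemas(schemas: &[SchemaRef]) -> Result<Schema, String> {
    // Size the output once instead of growing it per input.
    let capacity: usize = schemas.iter().map(|s| s.len()).sum();
    let mut seen = HashSet::with_capacity(capacity);
    let mut merged = Schema::with_capacity(capacity);
    for schema in schemas {
        for (name, dtype) in schema.iter() {
            // `insert` returns false when the name was already present.
            if !seen.insert(name.as_str()) {
                return Err(format!("duplicate column name: {name}"));
            }
            merged.push((name.clone(), dtype.clone()));
        }
    }
    Ok(merged)
}
```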

@@ -83,21 +81,18 @@ pub(crate) fn insert_streaming_nodes(
// this allows us to split at joins/unions and share a sink
let root = insert_file_sink(root, lp_arena);

- // We use mutation to communicate when we need to insert a file sink.
+ // We use a bool flag in the stack to communicate when we need to insert a file sink.
Member

It has been a while. If I understand correctly, we can now have multiple streaming subtrees that all sink into an hconcat, whereas previously, with a union for instance, the union would be part of the streaming subtree and still produce a single file_sink that would be picked up by the default engine?

Contributor Author

Yes, that's right, although there are separate sinks for each input stream, and these become the inputs of the hconcat. It looks like the existing code was intended to work this way, but that behaviour was never actually used, because all of the plans that have multiple inputs (join and union) are handled explicitly in the insert_streaming_nodes match expression rather than falling through to process_non_streamable_node.
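The idea of carrying a bool flag on the stack, so that each input of a multi-input node gets its own sink, can be sketched as follows. This is a toy model only: the `Node` enum and `inputs_needing_sinks` function are hypothetical and do not mirror the real `insert_streaming_nodes` code, which also has to build pipeline trees and rewrite the plan.

```rust
/// Toy plan node, standing in for the real logical plan types.
#[derive(Debug)]
enum Node {
    Scan,
    /// A non-streamable node with several inputs (indices into the arena).
    HConcat(Vec<usize>),
}

/// Walk the plan from `root` and return the arena indices that should each
/// be wrapped in their own sink. The flag travels with each stack entry, so
/// processing one input cannot disturb the state recorded for another, as a
/// shared counter could.
fn inputs_needing_sinks(arena: &[Node], root: usize) -> Vec<usize> {
    let mut stack = vec![(root, false)];
    let mut needs_sink = Vec::new();
    while let Some((idx, insert_sink)) = stack.pop() {
        if insert_sink {
            needs_sink.push(idx);
        }
        if let Node::HConcat(inputs) = &arena[idx] {
            // Every input of the non-streamable node gets flagged.
            for &input in inputs {
                stack.push((input, true));
            }
        }
    }
    needs_sink.sort_unstable();
    needs_sink
}
```

For an arena holding two scans and an hconcat over both, the function reports both scans, matching the "one sink per input" behaviour described above.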


for schema in schemas {
schema.iter().try_for_each(|(name, dtype)| {
if merged_schema.with_column(name.clone(), dtype.clone()).is_none() {
Member

Not a blocker for this PR, but this can be done elegantly with ok_or_else(|| polars_err!(..)).

Contributor Author

FWIW, this is actually doing the opposite of ok_or_else: a Some result should be mapped to an Err and a None to Ok(()). I couldn't find a more concise way to express this.

Member

Oh right!
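For reference, the "inverse of ok_or_else" mapping discussed in this thread can be written in a couple of ways with plain `Option` methods. The sketch below uses `HashMap::insert` as a stand-in for `with_column`, on the assumption that both return the previous value when the key already exists; the function names and the `String` error type are illustrative only.

```rust
use std::collections::HashMap;

/// Explicit match: `Some` (name already present) becomes an error,
/// `None` (newly inserted) becomes `Ok(())`.
fn insert_unique(
    schema: &mut HashMap<String, String>,
    name: &str,
    dtype: &str,
) -> Result<(), String> {
    match schema.insert(name.to_string(), dtype.to_string()) {
        Some(_) => Err(format!("duplicate column name: {name}")),
        None => Ok(()),
    }
}

/// Same mapping expressed with `Option::map_or`.
fn insert_unique_map_or(
    schema: &mut HashMap<String, String>,
    name: &str,
    dtype: &str,
) -> Result<(), String> {
    schema
        .insert(name.to_string(), dtype.to_string())
        .map_or(Ok(()), |_| Err(format!("duplicate column name: {name}")))
}
```

Neither form is shorter than the `is_none()` check in the diff by much, which is consistent with the author's remark that no more concise spelling was apparent.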

Member

@ritchie46 ritchie46 left a comment


Great PR @adamreeve. Thanks a lot.

@ritchie46 ritchie46 merged commit f553c4c into pola-rs:main Dec 28, 2023
26 checks passed
@adamreeve adamreeve deleted the lazy-hconcat branch December 28, 2023 20:22

Successfully merging this pull request may close these issues.

pl.concat(how="horizontal") support for LazyFrame
2 participants