Skip to content

Commit

Permalink
Implement projection pushdown for horizontal concatenation
Browse files Browse the repository at this point in the history
  • Loading branch information
adamreeve committed Dec 22, 2023
1 parent db8863d commit 7e7630a
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use super::*;

/// Applies projection pushdown to a horizontal concatenation node.
///
/// Each accumulated projection is handed to the first input (in `inputs`
/// order) whose schema satisfies `check_input_node`. Pushdown is then run on
/// every input with its share of the projections, and the output schema is
/// rebuilt from the inputs' schemas afterwards.
///
/// When `acc_projections` is empty, the inputs are left untouched and the
/// incoming `schema` is reused as-is.
///
/// # Errors
///
/// Propagates any error returned by `pushdown_and_assign` for an input.
#[allow(clippy::too_many_arguments)]
pub(super) fn process_hconcat(
    proj_pd: &mut ProjectionPushDown,
    inputs: Vec<Node>,
    schema: SchemaRef,
    options: HConcatOptions,
    acc_projections: Vec<Node>,
    projections_seen: usize,
    lp_arena: &mut Arena<ALogicalPlan>,
    expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<ALogicalPlan> {
    // When applying projection pushdown to horizontal concatenation,
    // we apply pushdown to all of the inputs using the subset of accumulated projections
    // relevant to each input, then rebuild the concatenated schema.

    let schema = if acc_projections.is_empty() {
        schema
    } else {
        let mut remaining_projections: PlHashSet<Node> = acc_projections.into_iter().collect();

        for input in inputs.iter() {
            // The input's schema does not depend on the projection being tested, so it is
            // looked up once per input. The lookup is scoped so that the shared borrow of
            // `lp_arena` has ended before the arena is handed out mutably below.
            let input_pushdown: Vec<Node> = {
                let input_schema = lp_arena.get(*input).schema(lp_arena);
                let mut matched = Vec::new();
                for proj in remaining_projections.iter() {
                    if check_input_node(*proj, input_schema.as_ref(), expr_arena) {
                        matched.push(*proj);
                    }
                }
                matched
            };

            // A projection assigned to this input is no longer offered to later inputs.
            let mut input_names = PlHashSet::new();
            for proj in &input_pushdown {
                remaining_projections.remove(proj);
                for name in aexpr_to_leaf_names(*proj, expr_arena) {
                    input_names.insert(name);
                }
            }

            // Pushdown is run on every input, including those that received no projections.
            proj_pd.pushdown_and_assign(
                *input,
                input_pushdown,
                input_names,
                projections_seen,
                lp_arena,
                expr_arena,
            )?;
        }

        // Rebuild the concatenated schema from the inputs' schemas after pushdown.
        let schema_size = inputs
            .iter()
            .map(|input| lp_arena.get(*input).schema(lp_arena).len())
            .sum();
        let mut new_schema = Schema::with_capacity(schema_size);
        for input in inputs.iter() {
            let schema = lp_arena.get(*input).schema(lp_arena);
            schema.as_ref().iter().for_each(|(name, dtype)| {
                new_schema.with_column(name.clone(), dtype.clone());
            });
        }

        Arc::new(new_schema)
    };

    Ok(ALogicalPlan::HConcat {
        inputs,
        schema,
        options,
    })
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
mod functions;
mod generic;
mod group_by;
#[cfg(feature = "horizontal_concat")]
mod hconcat;
mod hstack;
mod joins;
mod projection;
Expand All @@ -18,6 +20,7 @@ use crate::logical_plan::Context;
use crate::prelude::iterator::ArenaExprIter;
use crate::prelude::optimizer::projection_pushdown::generic::process_generic;
use crate::prelude::optimizer::projection_pushdown::group_by::process_group_by;
use crate::prelude::optimizer::projection_pushdown::hconcat::process_hconcat;
use crate::prelude::optimizer::projection_pushdown::hstack::process_hstack;
use crate::prelude::optimizer::projection_pushdown::joins::process_join;
use crate::prelude::optimizer::projection_pushdown::projection::process_projection;
Expand Down Expand Up @@ -646,6 +649,21 @@ impl ProjectionPushDown {
lp_arena,
expr_arena,
),
#[cfg(feature = "horizontal_concat")]
HConcat {
inputs,
schema,
options,
} => process_hconcat(
self,
inputs,
schema,
options,
acc_projections,
projections_seen,
lp_arena,
expr_arena,
),
lp @ Union { .. } => process_generic(
self,
lp,
Expand All @@ -655,8 +673,6 @@ impl ProjectionPushDown {
lp_arena,
expr_arena,
),
#[cfg(feature = "horizontal_concat")]
lp @ HConcat { .. } => Ok(lp), // TODO
// These nodes only have inputs and exprs, so we can use same logic.
lp @ Slice { .. } | lp @ Sink { .. } => process_generic(
self,
Expand Down
29 changes: 29 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import numpy as np

import polars as pl
from polars.testing import assert_frame_equal


def test_projection_on_semi_join_4789() -> None:
Expand Down Expand Up @@ -84,6 +85,34 @@ def test_unnest_projection_pushdown() -> None:
}


def test_hconcat_projection_pushdown() -> None:
    """Selecting one column from each side of a horizontal concat projects both inputs."""
    left = pl.LazyFrame({"a": [0, 1, 2], "b": [3, 4, 5]})
    right = pl.LazyFrame({"c": [6, 7, 8], "d": [9, 10, 11]})
    selected = pl.concat([left, right], how="horizontal").select(["a", "d"])

    # The plan should show a one-of-two column projection twice, once per input.
    plan = selected.explain()
    assert plan.count("PROJECT 1/2 COLUMNS") == 2

    result = selected.collect()
    assert_frame_equal(result, pl.DataFrame({"a": [0, 1, 2], "d": [9, 10, 11]}))


def test_hconcat_projection_pushdown_length_maintained() -> None:
    """An input with no selected columns must still contribute to the result length."""
    # The second frame is longer than the first. Even though none of its
    # columns are selected, it cannot be dropped entirely because it
    # determines how many rows the concatenated result has.
    short = pl.LazyFrame({"a": [0, 1], "b": [2, 3]})
    long = pl.LazyFrame({"c": [4, 5, 6, 7], "d": [8, 9, 10, 11]})
    selected = pl.concat([short, long], how="horizontal").select(["a"])

    plan = selected.explain()
    assert "PROJECT 1/2 COLUMNS" in plan

    # "a" is padded with nulls up to the length of the longer input.
    result = selected.collect()
    assert_frame_equal(result, pl.DataFrame({"a": [0, 1, None, None]}))


def test_unnest_columns_available() -> None:
df = pl.DataFrame(
{
Expand Down

0 comments on commit 7e7630a

Please sign in to comment.