Skip to content

Commit

Permalink
perf(rust): Aggregate projection pushdown (#5556)
Browse files Browse the repository at this point in the history
  • Loading branch information
Kuhlwein committed Nov 23, 2022
1 parent f540c94 commit ec79597
Show file tree
Hide file tree
Showing 3 changed files with 68 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::prelude::iterator::ArenaExprIter;
use crate::prelude::*;
use crate::utils::{
aexpr_assign_renamed_root, aexpr_to_leaf_names, aexpr_to_leaf_nodes, check_input_node,
expr_is_projected_upstream,
};

fn init_vec() -> Vec<Node> {
Expand Down Expand Up @@ -295,18 +296,16 @@ impl ProjectionPushDown {
for e in &expr {
if has_pushed_down {
// remove projections that are not used upstream
if projections_seen > 0 {
let input_schema = lp_arena.get(input).schema(lp_arena);
// don't do projection that is not used in upstream selection
let output_field = expr_arena
.get(*e)
.to_field(input_schema.as_ref(), Context::Default, expr_arena)
.unwrap();
let output_name = output_field.name();
let is_used_upstream = projected_names.contains(output_name.as_str());
if !is_used_upstream {
continue;
}
if projections_seen > 0
&& !expr_is_projected_upstream(
e,
input,
lp_arena,
expr_arena,
&projected_names,
)
{
continue;
}

// in this branch we check a double projection case
Expand Down Expand Up @@ -740,6 +739,8 @@ impl ProjectionPushDown {
let builder = ALogicalPlanBuilder::new(input, expr_arena, lp_arena);
Ok(self.finish_node(acc_projections, builder))
} else {
let has_pushed_down = !acc_projections.is_empty();

// todo! remove unnecessary vec alloc.
let (mut acc_projections, _local_projections, mut names) =
split_acc_projections(
Expand All @@ -748,8 +749,25 @@ impl ProjectionPushDown {
expr_arena,
);

// add the columns used in the aggregations to the projection
for agg in &aggs {
// add the columns used in the aggregations to the projection only if they are used upstream
let projected_aggs: Vec<Node> = aggs
.into_iter()
.filter(|agg| {
if has_pushed_down && projections_seen > 0 {
expr_is_projected_upstream(
agg,
input,
lp_arena,
expr_arena,
&projected_names,
)
} else {
true
}
})
.collect();

for agg in &projected_aggs {
add_expr_to_accumulated(*agg, &mut acc_projections, &mut names, expr_arena);
}

Expand Down Expand Up @@ -784,7 +802,7 @@ impl ProjectionPushDown {

let builder = ALogicalPlanBuilder::new(input, expr_arena, lp_arena).groupby(
keys,
aggs,
projected_aggs,
apply,
maintain_order,
options,
Expand Down
17 changes: 17 additions & 0 deletions polars/polars-lazy/polars-plan/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,20 @@ where
}
single_pred.expect("an empty iterator was passed")
}

/// Reports whether the output column produced by expression `e` is among the
/// column names requested by an upstream projection.
///
/// The expression's output field is resolved against the schema of the
/// logical-plan node `input`, and the resulting field name is looked up in
/// `projected_names`.
///
/// Returns `true` when the name is present, so callers can skip expressions
/// for which this returns `false`.
///
/// # Panics
///
/// Panics if the expression cannot be resolved to a field against the input
/// schema (the `to_field` result is unwrapped).
pub fn expr_is_projected_upstream(
    e: &Node,
    input: Node,
    lp_arena: &mut Arena<ALogicalPlan>,
    expr_arena: &Arena<AExpr>,
    projected_names: &PlHashSet<Arc<str>>,
) -> bool {
    // Schema of the node feeding this expression; needed to resolve the
    // expression's output field.
    let schema = lp_arena.get(input).schema(lp_arena);

    let field = expr_arena
        .get(*e)
        .to_field(schema.as_ref(), Context::Default, expr_arena)
        .unwrap();

    // An expression whose output name nobody selects upstream is not needed.
    projected_names.contains(field.name().as_str())
}
18 changes: 18 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,24 @@ def test_double_projection_pushdown() -> None:
)


def test_groupby_projection_pushdown() -> None:
    """Check that projection pushdown reaches through a groupby aggregation.

    Three columns are created, two aggregations are defined, and only one
    aggregated column (``sum(c1)``) is selected afterwards. The optimized plan
    must then report that only 2 of the 3 source columns are projected.
    """
    lf = pl.DataFrame({"c0": [], "c1": [], "c2": []}).lazy()

    aggregated = lf.groupby("c0").agg(
        [
            pl.col("c1").sum().alias("sum(c1)"),
            pl.col("c2").mean().alias("mean(c2)"),
        ]
    )

    # Only one of the two aggregations is requested downstream.
    plan = aggregated.select(["sum(c1)"]).describe_optimized_plan()

    assert "PROJECT 2/3 COLUMNS" in plan


def test_unnest_projection_pushdown() -> None:
lf = pl.DataFrame({"x|y|z": [1, 2], "a|b|c": [2, 3]}).lazy()

Expand Down

0 comments on commit ec79597

Please sign in to comment.