Skip to content

Commit

Permalink
fix[rust]: block projection pd on melt if no values-args given (#5005)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 27, 2022
1 parent 5613f48 commit 9d8ff8e
Show file tree
Hide file tree
Showing 2 changed files with 109 additions and 36 deletions.
133 changes: 97 additions & 36 deletions polars/polars-lazy/src/logical_plan/optimizer/projection_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,40 @@ fn update_scan_schema(
pub(crate) struct ProjectionPushDown {}

impl ProjectionPushDown {
/// Stop pushing projections down at this node, but keep optimizing below it.
///
/// Every input of `lp` is taken out of `lp_arena`, run through `push_down` again with
/// default-initialized projection state, and stored back under the same node. The plan
/// is then rebuilt from its original expressions and those inputs, and the projections
/// accumulated so far (`acc_projections`) are handed to `finish_node` together with the
/// rebuilt plan.
///
/// # Errors
/// Propagates the first error returned while optimizing one of the inputs.
fn no_pushdown_restart_opt(
    &self,
    lp: ALogicalPlan,
    acc_projections: Vec<Node>,
    projections_seen: usize,
    lp_arena: &mut Arena<ALogicalPlan>,
    expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<ALogicalPlan> {
    let inputs = lp.get_inputs();
    let exprs = lp.get_exprs();

    let mut optimized_inputs = Vec::with_capacity(inputs.len());
    for &input_node in inputs.iter() {
        // Move the child plan out of the arena so it can be optimized by value.
        let child = lp_arena.take(input_node);
        // Fresh (default) projection state: nothing accumulated above this node
        // is forwarded to the child.
        let optimized_child = self.push_down(
            child,
            Default::default(),
            Default::default(),
            projections_seen,
            lp_arena,
            expr_arena,
        )?;
        // Put the optimized child back under the same node id.
        lp_arena.replace(input_node, optimized_child);
        optimized_inputs.push(input_node);
    }

    let rebuilt = lp.with_exprs_and_input(exprs, optimized_inputs);
    let builder = ALogicalPlanBuilder::from_lp(rebuilt, expr_arena, lp_arena);
    Ok(self.finish_node(acc_projections, builder))
}

fn finish_node(
&self,
local_projections: Vec<Node>,
Expand Down Expand Up @@ -629,46 +663,66 @@ impl ProjectionPushDown {
)?;
Ok(Selection { predicate, input })
}
Melt { input, args, .. } => {
let (mut acc_projections, mut local_projections, names) = split_acc_projections(
acc_projections,
lp_arena.get(input).schema(lp_arena).as_ref(),
expr_arena,
);

if !local_projections.is_empty() {
local_projections.extend_from_slice(&acc_projections);
}

// make sure that the requested columns are projected
args.id_vars.iter().for_each(|name| {
add_str_to_accumulated(
name,
&mut acc_projections,
&mut projected_names,
Melt {
input,
args,
schema,
} => {
// all columns are used in melt
if args.value_vars.is_empty() {
// restart projection pushdown
self.no_pushdown_restart_opt(
Melt {
input,
args,
schema,
},
acc_projections,
projections_seen,
lp_arena,
expr_arena,
)
});
args.value_vars.iter().for_each(|name| {
add_str_to_accumulated(
name,
&mut acc_projections,
&mut projected_names,
} else {
let (mut acc_projections, mut local_projections, names) = split_acc_projections(
acc_projections,
lp_arena.get(input).schema(lp_arena).as_ref(),
expr_arena,
)
});
);

self.pushdown_and_assign(
input,
acc_projections,
names,
projections_seen,
lp_arena,
expr_arena,
)?;
if !local_projections.is_empty() {
local_projections.extend_from_slice(&acc_projections);
}

let builder = ALogicalPlanBuilder::new(input, expr_arena, lp_arena).melt(args);
Ok(self.finish_node(local_projections, builder))
// make sure that the requested columns are projected
args.id_vars.iter().for_each(|name| {
add_str_to_accumulated(
name,
&mut acc_projections,
&mut projected_names,
expr_arena,
)
});
args.value_vars.iter().for_each(|name| {
add_str_to_accumulated(
name,
&mut acc_projections,
&mut projected_names,
expr_arena,
)
});

self.pushdown_and_assign(
input,
acc_projections,
names,
projections_seen,
lp_arena,
expr_arena,
)?;

let builder = ALogicalPlanBuilder::new(input, expr_arena, lp_arena).melt(args);
Ok(self.finish_node(local_projections, builder))
}
}
Aggregate {
input,
Expand Down Expand Up @@ -1083,7 +1137,14 @@ impl ProjectionPushDown {
}
}
} else {
Ok(lp)
// restart projection pushdown
self.no_pushdown_restart_opt(
lp,
acc_projections,
projections_seen,
lp_arena,
expr_arena,
)
}
}
// Slice and Unions have only inputs and exprs, so we can use same logic.
Expand Down
12 changes: 12 additions & 0 deletions py-polars/tests/unit/test_projections.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,15 @@ def test_projection_on_semi_join_4789() -> None:
q = ab.join(intermediate_agg, on="a")

assert q.collect().to_dict(False) == {"a": [1], "p": [1], "seq": [[1]]}


def test_melt_projection_pd_block_4997() -> None:
    """Regression test for issue 4997: melt without explicit value vars.

    Melts a frame by ``row_nr`` only, groups by ``row_nr`` and collects the
    ``variable`` column, then checks that both original column names appear
    in the aggregated result.
    """
    frame = pl.DataFrame({"col1": ["a"], "col2": ["b"]}).with_row_count()
    query = (
        frame.lazy()
        .melt(id_vars="row_nr")
        .groupby("row_nr")
        .agg(pl.col("variable").alias("result"))
    )
    expected = {"row_nr": [0], "result": [["col1", "col2"]]}
    assert query.collect().to_dict(False) == expected

0 comments on commit 9d8ff8e

Please sign in to comment.