Skip to content

Commit

Permalink
performance[rust]: don't execute unused with_column expressions (#5026)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 29, 2022
1 parent 4ea2f78 commit 507ddd1
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1038,10 +1038,45 @@ impl ProjectionPushDown {
let builder = ALogicalPlanBuilder::new(root, expr_arena, lp_arena);
Ok(self.finish_node(local_projection, builder))
}
HStack { input, exprs, .. } => {
// Make sure that columns selected with_columns are available
// only if not empty. If empty we already select everything.
HStack {
input,
mut exprs,
schema,
} => {
if !acc_projections.is_empty() {
let mut pruned_with_cols = Vec::with_capacity(exprs.len());

// Check if output names are used upstream
// if not, we can prune the `with_column` expression
// as it is not used in the output.
for node in &exprs {
let output_field = expr_arena
.get(*node)
.to_field(schema.as_ref(), Context::Default, expr_arena)
.unwrap();
let output_name = output_field.name();

let is_used_upstream = projected_names.contains(output_name.as_str());

if is_used_upstream {
pruned_with_cols.push(*node);
}
}

if pruned_with_cols.is_empty() {
self.pushdown_and_assign(
input,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)?;
return Ok(lp_arena.take(input));
}

// Make sure that columns selected with_columns are available
// only if not empty. If empty we already select everything.
for expression in &exprs {
add_expr_to_accumulated(
*expression,
Expand All @@ -1050,8 +1085,18 @@ impl ProjectionPushDown {
expr_arena,
);
}
}

exprs = pruned_with_cols
}
// projections that select columns added by
// this `with_column` operation can be dropped
// For instance in:
//
// q
// .with_column(col("a").alias("b"))
// .select(["a", "b"])
//
// we can drop the "b" projection at this level
let (acc_projections, _, names) = split_acc_projections(
acc_projections,
&lp_arena.get(input).schema(lp_arena),
Expand Down
22 changes: 15 additions & 7 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,15 +352,23 @@ impl LazyFrame {
.map(
move |mut df: DataFrame| {
let cols = df.get_columns_mut();
let mut removed_count = 0;
for (existing, new) in existing.iter().zip(new.iter()) {
let idx_a = cols
.iter()
.position(|s| s.name() == existing.as_str())
.unwrap();
let idx_b = cols.iter().position(|s| s.name() == new.as_str()).unwrap();
cols.swap(idx_a, idx_b);
let idx_a = cols.iter().position(|s| s.name() == existing.as_str());
let idx_b = cols.iter().position(|s| s.name() == new.as_str());

match (idx_a, idx_b) {
(Some(idx_a), Some(idx_b)) => {
cols.swap(idx_a, idx_b);
}
// renamed columns may already have been removed by projection pushdown
_ => {
removed_count += 1;
continue;
}
}
}
cols.truncate(cols.len() - existing.len());
cols.truncate(cols.len() - (existing.len() - removed_count));
Ok(df)
},
None,
Expand Down
32 changes: 32 additions & 0 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -453,3 +453,35 @@ fn test_string_addition_to_concat_str() -> PolarsResult<()> {

Ok(())
}
#[test]
fn test_with_column_prune() -> PolarsResult<()> {
    // A `with_column` whose output is never selected afterwards should not
    // survive optimization.
    let frame = df![
        "c0" => [0],
        "c1" => [0],
        "c2" => [0],
    ]?;

    // `with_column(col("c0"))` is followed by a projection that only keeps "c1".
    let q = frame.lazy().with_column(col("c0")).select([col("c1")]);

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = q.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // The optimized plan may only consist of a fast projection on top of the
    // in-memory scan; any other node means the `with_column` was not pruned.
    let only_projection_and_scan = (&lp_arena).iter(root).all(|(_, plan)| {
        matches!(
            plan,
            ALogicalPlan::MapFunction {
                function: FunctionNode::FastProjection { .. },
                ..
            } | ALogicalPlan::DataFrameScan { .. }
        )
    });
    assert!(only_projection_and_scan);

    // The resulting schema holds just the selected column.
    let expected_schema = Schema::from([Field::new("c1", DataType::Int32)]);
    assert_eq!(q.schema().unwrap().as_ref(), &expected_schema);
    Ok(())
}

0 comments on commit 507ddd1

Please sign in to comment.