Skip to content

Commit

Permalink
[LAZY] do not let predicate pushdown pass slice node
Browse files Browse the repository at this point in the history
Pushing a predicate down past a slice node would make the resulting
slice size depend on the selectivity of the predicate.
  • Loading branch information
ritchie46 committed Jan 12, 2022
1 parent 663b5a1 commit ddaf6f3
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,23 +29,6 @@ impl PredicatePushDown {
}
}

/// Finish predicate handling at a node where the accumulated predicates
/// are not pushed any further.
///
/// Returns `lp` untouched when `acc_predicates` is empty. Otherwise every
/// accumulated predicate node is collected and handed, together with `lp`,
/// to `optional_apply_predicate`.
fn finish_at_leaf(
    &self,
    lp: ALogicalPlan,
    acc_predicates: PlHashMap<Arc<str>, Node>,
    lp_arena: &mut Arena<ALogicalPlan>,
    expr_arena: &mut Arena<AExpr>,
) -> ALogicalPlan {
    // No filter in the logical plan: nothing to apply.
    if acc_predicates.is_empty() {
        return lp;
    }

    // Only the predicate nodes matter here; the map keys are dropped.
    let local_predicates = acc_predicates
        .into_iter()
        .map(|(_, predicate)| predicate)
        .collect();
    self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)
}

fn pushdown_and_assign(
&self,
input: Node,
Expand All @@ -66,10 +49,20 @@ impl PredicatePushDown {
acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
has_projections: bool,
) -> Result<ALogicalPlan> {
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

// we should not push predicates past these projections
if has_projections
&& exprs
.iter()
.any(|e_n| is_pushdown_boundary(*e_n, expr_arena))
{
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
}

let mut local_predicates = Vec::with_capacity(acc_predicates.len());

// determine new inputs by pushing down predicates
Expand All @@ -80,7 +73,8 @@ impl PredicatePushDown {
// it could be that this node just added the column where we base the predicate on
let input_schema = lp_arena.get(node).schema(lp_arena);
let mut pushdown_predicates = optimizer::init_hashmap();
for (_, &predicate) in &acc_predicates {
for &predicate in acc_predicates.values() {
// we can pushdown the predicate
if check_input_node(predicate, input_schema, expr_arena) {
let name = get_insertion_name(expr_arena, predicate, input_schema);
insert_and_combine_predicate(
Expand All @@ -89,7 +83,9 @@ impl PredicatePushDown {
predicate,
expr_arena,
)
} else {
}
// we cannot push down the predicate, so we apply it here
else {
local_predicates.push(predicate);
}
}
Expand All @@ -105,6 +101,33 @@ impl PredicatePushDown {
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}

/// Apply every accumulated predicate at this node instead of pushing it
/// further down, while still optimizing the inputs of this node.
///
/// Each input is taken out of `lp_arena`, run through `push_down` with a
/// fresh predicate map from `init_hashmap()`, and put back under the same
/// node. The plan is then rebuilt from its expressions and those inputs,
/// and all predicates in `acc_predicates` are passed to
/// `optional_apply_predicate`.
///
/// # Errors
///
/// Returns the first error produced by `push_down` on any input.
fn no_pushdown_restart_opt(
    &self,
    lp: ALogicalPlan,
    acc_predicates: PlHashMap<Arc<str>, Node>,
    lp_arena: &mut Arena<ALogicalPlan>,
    expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
    let inputs = lp.get_inputs();
    let exprs = lp.get_exprs();

    // Restart the optimization below this node with an empty set of predicates.
    let mut new_inputs = Vec::with_capacity(inputs.len());
    for &node in inputs.iter() {
        let sub_plan = lp_arena.take(node);
        let optimized = self.push_down(sub_plan, init_hashmap(), lp_arena, expr_arena)?;
        lp_arena.replace(node, optimized);
        new_inputs.push(node);
    }
    let rebuilt = lp.from_exprs_and_input(exprs, new_inputs);

    // all predicates are done locally
    let local_predicates: Vec<_> = acc_predicates.values().copied().collect();
    let filtered = self.optional_apply_predicate(rebuilt, local_predicates, lp_arena, expr_arena);
    Ok(filtered)
}

/// Predicate pushdown optimizer
///
/// # Arguments
Expand Down Expand Up @@ -328,30 +351,6 @@ impl PredicatePushDown {
};
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Aggregate {
input,
keys,
aggs,
schema,
apply,
maintain_order,
dynamic_options,
} => {
// start with a new empty predicate aggregator
self.pushdown_and_assign(input, optimizer::init_hashmap(), lp_arena, expr_arena)?;

// dont push down predicates. An aggregation needs all rows
let lp = Aggregate {
input,
keys,
aggs,
schema,
apply,
maintain_order,
dynamic_options,
};
Ok(self.finish_at_leaf(lp, acc_predicates, lp_arena, expr_arena))
}
Join {
input_left,
input_right,
Expand Down Expand Up @@ -445,17 +444,24 @@ impl PredicatePushDown {
predicate_pd: true, ..
} = lp
{
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena)
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
} else {
Ok(lp)
}
}
// Predicates are pushed down past these nodes
lp @ Cache { .. } | lp @ Union { .. } | lp @ Sort { .. } => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
}
lp @ HStack {..} => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
}
// Predicates are NOT pushed down past these nodes
// predicates influence slice sizes
lp @ Slice { .. }
| lp @ HStack { .. }
| lp @ Cache { .. }
| lp @ Union { .. }
| lp @ Sort { .. } => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena)
// dont push down predicates. An aggregation needs all rows
| lp @ Aggregate {..} => {
self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena)
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,23 @@ fn test_pred_pd_1() -> Result<()> {

// check if we pass hstack
let lp = df
.clone()
.lazy()
.with_columns([col("A").alias("C"), col("B")])
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(projection_at_scan(&lp_arena, lp));

// check if we do not pass slice
let lp = df
.lazy()
.limit(10)
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(!projection_at_scan(&lp_arena, lp));

Ok(())
}

Expand Down

0 comments on commit ddaf6f3

Please sign in to comment.