Skip to content

Commit

Permalink
make predicate pushdown more DRY
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 12, 2022
1 parent eaedab6 commit 663b5a1
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 103 deletions.
173 changes: 70 additions & 103 deletions polars/polars-lazy/src/logical_plan/optimizer/predicate_pushdown/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use utils::*;
pub(crate) struct PredicatePushDown {}

impl PredicatePushDown {
fn apply_predicate(
fn optional_apply_predicate(
&self,
lp: ALogicalPlan,
local_predicates: Vec<Node>,
Expand Down Expand Up @@ -41,7 +41,7 @@ impl PredicatePushDown {
0 => lp,
_ => {
let local_predicates = acc_predicates.into_iter().map(|t| t.1).collect();
self.apply_predicate(lp, local_predicates, lp_arena, expr_arena)
self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena)
}
}
}
Expand All @@ -59,6 +59,52 @@ impl PredicatePushDown {
Ok(())
}

/// Filter will be pushed down.
fn pushdown_and_continue(
&self,
lp: ALogicalPlan,
acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

let mut local_predicates = Vec::with_capacity(acc_predicates.len());

// determine new inputs by pushing down predicates
let new_inputs = inputs
.iter()
.map(|&node| {
// first we check if we are able to push down the predicate pass this node
// it could be that this node just added the column where we base the predicate on
let input_schema = lp_arena.get(node).schema(lp_arena);
let mut pushdown_predicates = optimizer::init_hashmap();
for (_, &predicate) in &acc_predicates {
if check_input_node(predicate, input_schema, expr_arena) {
let name = get_insertion_name(expr_arena, predicate, input_schema);
insert_and_combine_predicate(
&mut pushdown_predicates,
name,
predicate,
expr_arena,
)
} else {
local_predicates.push(predicate);
}
}

let alp = lp_arena.take(node);
let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;

let lp = lp.from_exprs_and_input(exprs, new_inputs);
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}

/// Predicate pushdown optimizer
///
/// # Arguments
Expand All @@ -72,14 +118,14 @@ impl PredicatePushDown {
/// * `expr_arena` - The local memory arena for the expressions.
fn push_down(
&self,
logical_plan: ALogicalPlan,
lp: ALogicalPlan,
mut acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
use ALogicalPlan::*;

match logical_plan {
match lp {
Selection { predicate, input } => {
let name = roots_to_key(&aexpr_to_root_names(predicate, expr_arena));
insert_and_combine_predicate(&mut acc_predicates, name, predicate, expr_arena);
Expand All @@ -99,7 +145,7 @@ impl PredicatePushDown {
.build();
// do all predicates here
let local_predicates = acc_predicates.into_iter().map(|(_, v)| v).collect();
return Ok(self.apply_predicate(
return Ok(self.optional_apply_predicate(
lp,
local_predicates,
lp_arena,
Expand All @@ -117,7 +163,7 @@ impl PredicatePushDown {
input,
schema,
};
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
DataFrameScan {
df,
Expand Down Expand Up @@ -159,7 +205,7 @@ impl PredicatePushDown {
value_vars,
schema,
};
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
LocalProjection { expr, input, .. } => {
self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
Expand Down Expand Up @@ -248,7 +294,7 @@ impl PredicatePushDown {

self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = Explode { input, columns };
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Distinct {
input,
Expand Down Expand Up @@ -280,7 +326,7 @@ impl PredicatePushDown {
maintain_order,
subset,
};
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Aggregate {
input,
Expand Down Expand Up @@ -391,104 +437,25 @@ impl PredicatePushDown {
schema,
options,
};
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
HStack { input, exprs, .. } => {
// First we get all names of added columns in this HStack operation
// and then we remove the predicates from the eligible container if they are
// dependent on data we've added in this node.

for node in &exprs {
if is_pushdown_boundary(*node, expr_arena) {
let lp = ALogicalPlanBuilder::new(input, expr_arena, lp_arena)
.with_columns(exprs)
.build();
// do all predicates here
let local_predicates = acc_predicates.into_iter().map(|(_, v)| v).collect();
return Ok(self.apply_predicate(
lp,
local_predicates,
lp_arena,
expr_arena,
));
}
lp @ Udf { .. } => {
if let ALogicalPlan::Udf {
predicate_pd: true, ..
} = lp
{
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena)
} else {
Ok(lp)
}

let (local_predicates, exprs) = rewrite_projection_node(
expr_arena,
lp_arena,
&mut acc_predicates,
exprs,
input,
);

self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = ALogicalPlanBuilder::new(input, expr_arena, lp_arena)
.with_columns(exprs)
.build();

Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}

Udf {
input,
function,
predicate_pd,
projection_pd,
schema,
} => {
if predicate_pd {
let input_schema = lp_arena.get(input).schema(lp_arena);
let mut pushdown_predicates = optimizer::init_hashmap();
let mut local_predicates = Vec::with_capacity(acc_predicates.len());
for (_, predicate) in acc_predicates {
if check_input_node(predicate, input_schema, expr_arena) {
let name = get_insertion_name(expr_arena, predicate, input_schema);
insert_and_combine_predicate(
&mut pushdown_predicates,
name,
predicate,
expr_arena,
)
} else {
local_predicates.push(predicate);
}
}
self.pushdown_and_assign(input, pushdown_predicates, lp_arena, expr_arena)?;
let lp = Udf {
input,
function,
predicate_pd,
projection_pd,
schema,
};

return Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena));
}
Ok(Udf {
input,
function,
predicate_pd,
projection_pd,
schema,
})
}
lp @ Slice { .. } | lp @ Cache { .. } | lp @ Union { .. } | lp @ Sort { .. } => {
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

let new_inputs = inputs
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
let alp =
self.push_down(alp, acc_predicates.clone(), lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;

Ok(lp.from_exprs_and_input(exprs, new_inputs))
lp @ Slice { .. }
| lp @ HStack { .. }
| lp @ Cache { .. }
| lp @ Union { .. }
| lp @ Sort { .. } => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena)
}
}
}
Expand Down
10 changes: 10 additions & 0 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,23 @@ fn test_pred_pd_1() -> Result<()> {

// check if we understand that we can unwrap the alias
let lp = df
.clone()
.lazy()
.select([col("A").alias("C"), col("B")])
.filter(col("C").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(projection_at_scan(&lp_arena, lp));

// check if we pass hstack
let lp = df
.lazy()
.with_columns([col("A").alias("C"), col("B")])
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(projection_at_scan(&lp_arena, lp));

Ok(())
}

Expand Down

0 comments on commit 663b5a1

Please sign in to comment.