Skip to content

Commit

Permalink
loosen predicate pushdown restrictions
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 27, 2022
1 parent 6c6274b commit d9736cf
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ impl PredicatePushDown {
// we should not pass these projections
if exprs
.iter()
.any(|e_n| is_pushdown_boundary(*e_n, expr_arena))
.any(|e_n| other_column_is_pushdown_boundary(*e_n, expr_arena))
{
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use crate::logical_plan::iterator::ArenaExprIter;
use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{
Expand Down Expand Up @@ -109,15 +108,46 @@ pub(super) fn get_insertion_name(
/// For instance shifts | sorts results are influenced by a filter so we do all predicates before the shift | sort
/// The rule of thumb is any operation that changes the order of a column w/r/t other columns should be a
/// predicate pushdown blocker.
pub(super) fn is_pushdown_boundary(node: Node, expr_arena: &Arena<AExpr>) -> bool {
///
/// This checks the boundary of other columns
pub(super) fn other_column_is_pushdown_boundary(node: Node, expr_arena: &Arena<AExpr>) -> bool {
let matches = |e: &AExpr| {
matches!(
e,
AExpr::Shift { .. } | AExpr::Sort { .. } | AExpr::SortBy { .. }
| AExpr::Agg(_) // an aggregation needs all rows
| AExpr::Reverse(_)
// Functions are black boxes for us and we can never pass them
// everything that works on groups likely changes to order of elements w/r/t the other columns
| AExpr::Function {options: FunctionOptions { collect_groups: ApplyOptions::ApplyGroups, .. }, ..}
| AExpr::Function {options: FunctionOptions { collect_groups: ApplyOptions::ApplyList, .. }, ..}
| AExpr::BinaryExpr {..}
| AExpr::Cast {data_type: DataType::Float32 | DataType::Float64, ..}
// still need to investigate this one
| AExpr::Explode {..}
// A groupby needs all rows for aggregation
| AExpr::Window {..}
| AExpr::Literal(LiteralValue::Range {..})
) ||
// a series that is not a singleton would also have a different result
// if filter is applied earlier
matches!(e, AExpr::Literal(LiteralValue::Series(s)) if s.len() > 1
)
};
has_aexpr(node, expr_arena, matches)
}

/// This checks the boundary of same columns. So that means columns that are referred in the predicate
pub(super) fn predicate_column_is_pushdown_boundary(node: Node, expr_arena: &Arena<AExpr>) -> bool {
let matches = |e: &AExpr| {
matches!(
e,
AExpr::Shift { .. } | AExpr::Sort { .. } | AExpr::SortBy { .. }
| AExpr::Agg(_) // an aggregation needs all rows
| AExpr::Reverse(_)
// everything that works on groups likely changes to order of elements w/r/t the other columns
| AExpr::Function {..}
| AExpr::BinaryExpr {..}
| AExpr::Cast {data_type: DataType::Float32 | DataType::Float64, ..}
// still need to investigate this one
| AExpr::Explode {..}
// A groupby needs all rows for aggregation
Expand Down Expand Up @@ -149,12 +179,17 @@ where
// this may be problematic as the aliased column may not yet exist.
for projection_node in &projections {
{
let e = expr_arena.get(*projection_node);
if let AExpr::Alias(e, name) = e {
let projection_aexpr = expr_arena.get(*projection_node);
if let AExpr::Alias(projection_node, name) = projection_aexpr {
// if this alias refers to one of the predicates in the upper nodes
// we rename the column of the predicate before we push it downwards.
if let Some(predicate) = acc_predicates.remove(&*name) {
match aexpr_to_root_column_name(*e, &*expr_arena) {
if predicate_column_is_pushdown_boundary(*projection_node, expr_arena) {
local_predicates.push(predicate);
continue;
}

match aexpr_to_root_column_name(*projection_node, &*expr_arena) {
// we were able to rename the alias column with the root column name
// before pushing down the predicate
Ok(new_name) => {
Expand All @@ -175,19 +210,11 @@ where
}
}

let e = expr_arena.get(*projection_node);
let input_schema = lp_arena.get(input).schema(lp_arena);

// we check if predicates can be done on the input above
// with the following conditions:

// 1. predicate based on current column may only pushed down if simple projection, e.g. col() / col().alias()
let expr_depth = (&*expr_arena).iter(*projection_node).count();
let is_computation = if let AExpr::Alias(_, _) = e {
expr_depth > 2
} else {
expr_depth > 1
};
// this can only be done if the current projection is not a projection boundary
let is_boundary = other_column_is_pushdown_boundary(*projection_node, expr_arena);

// remove predicates that cannot be done on the input above
let to_local = acc_predicates
Expand All @@ -197,7 +224,7 @@ where
if check_input_node(*kv.1, input_schema, expr_arena)
// if this predicate not equals a column that is a computation
// it is ok
&& !is_computation
&& !is_boundary
{
None
} else {
Expand Down
44 changes: 17 additions & 27 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ fn get_arenas() -> (Arena<AExpr>, Arena<ALogicalPlan>) {
(expr_arena, lp_arena)
}

pub(crate) fn predicate_at_scan(lp_arena: &Arena<ALogicalPlan>, lp: Node) -> bool {
pub(crate) fn predicate_at_scan(q: LazyFrame) -> bool {
let (mut expr_arena, mut lp_arena) = get_arenas();
let lp = q.optimize(&mut lp_arena, &mut expr_arena).unwrap();

(&lp_arena).iter(lp).any(|(_, lp)| {
use ALogicalPlan::*;
match lp {
Expand Down Expand Up @@ -44,44 +47,36 @@ fn slice_at_scan(lp_arena: &Arena<ALogicalPlan>, lp: Node) -> bool {
fn test_pred_pd_1() -> Result<()> {
let df = fruits_cars();

let (mut expr_arena, mut lp_arena) = get_arenas();
let lp = df
let q = df
.clone()
.lazy()
.select([col("A"), col("B")])
.filter(col("A").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;
.filter(col("A").gt(lit(1)));

assert!(predicate_at_scan(&lp_arena, lp));
assert!(predicate_at_scan(q));

// check if we understand that we can unwrap the alias
let lp = df
let q = df
.clone()
.lazy()
.select([col("A").alias("C"), col("B")])
.filter(col("C").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;
.filter(col("C").gt(lit(1)));

assert!(predicate_at_scan(&lp_arena, lp));
assert!(predicate_at_scan(q));

// check if we pass hstack
let lp = df
let q = df
.clone()
.lazy()
.with_columns([col("A").alias("C"), col("B")])
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;
.filter(col("B").gt(lit(1)));

assert!(predicate_at_scan(&lp_arena, lp));
assert!(predicate_at_scan(q));

// check if we do not pass slice
let lp = df
.lazy()
.limit(10)
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;
let q = df.lazy().limit(10).filter(col("B").gt(lit(1)));

assert!(!predicate_at_scan(&lp_arena, lp));
assert!(!predicate_at_scan(q));

Ok(())
}
Expand Down Expand Up @@ -145,10 +140,7 @@ pub fn test_predicate_block_cast() -> Result<()> {
.filter(col("value").lt(lit(2.5f32)));

for lf in [lf1, lf2] {
// make sure that the predicate is not pushed down
let (mut expr_arena, mut lp_arena) = get_arenas();
let root = lf.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();
assert!(!predicate_at_scan(&mut lp_arena, root));
assert!(!predicate_at_scan(lf.clone()));

let out = lf.collect()?;
let s = out.column("value").unwrap();
Expand Down Expand Up @@ -183,9 +175,7 @@ fn test_lazy_filter_and_rename() {
GetOutput::from_type(DataType::Boolean),
));
// the rename function should not interfere with the predicate pushdown
let (mut expr_arena, mut lp_arena) = get_arenas();
let root = lf.clone().optimize(&mut lp_arena, &mut expr_arena).unwrap();
assert!(predicate_at_scan(&mut lp_arena, root));
assert!(predicate_at_scan(lf.clone()));

assert_eq!(lf.collect().unwrap().get_column_names(), &["x", "b", "c"]);
}
24 changes: 21 additions & 3 deletions polars/polars-lazy/src/tests/predicate_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ fn test_multiple_roots() -> Result<()> {
let lf = lf.filter(col("bar").lt(lit(110i32)));

// also check if all predicates are combined and pushed down
let root = lf.optimize(&mut lp_arena, &mut expr_arena)?;
assert!(predicate_at_scan(&mut lp_arena, root));
let root = lf.clone().optimize(&mut lp_arena, &mut expr_arena)?;
assert!(predicate_at_scan(lf));
// and that we don't have any filter node
assert!(!(&lp_arena)
.iter(root)
Expand All @@ -24,7 +24,7 @@ fn test_multiple_roots() -> Result<()> {
}

#[test]
#[cfg(feature = "is_in")]
#[cfg(all(feature = "is_in", feature = "strings"))]
fn test_issue_2472() -> Result<()> {
let df = df![
"group" => ["54360-2001-0-20020312-4-1"
Expand Down Expand Up @@ -62,3 +62,21 @@ fn test_issue_2472() -> Result<()> {

Ok(())
}

#[test]
fn test_pass_unrelated_apply() -> Result<()> {
// maps should not influence a predicate of a different column as maps should not depend on previous values
let df = fruits_cars();

let q = df
.lazy()
.with_column(col("A").map(
|s| Ok(s.is_null().into_series()),
GetOutput::from_type(DataType::Boolean),
))
.filter(col("B").gt(lit(10i32)));

assert!(predicate_at_scan(q));

Ok(())
}

0 comments on commit d9736cf

Please sign in to comment.