Skip to content

Commit

Permalink
fix bug in predicate pushdown (#2529)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 3, 2022
1 parent 43865fc commit 7220b8e
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 26 deletions.
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ default = ["compile"]
parquet = ["polars-core/parquet", "polars-io/parquet"]
ipc = ["polars-io/ipc"]
csv-file = ["polars-io/csv-file"]
temporal = ["polars-core/temporal", "polars-time"]
temporal = ["polars-core/temporal", "polars-time", "dtype-datetime"]
# debugging purposesses
fmt = ["polars-core/plain_fmt"]
strings = ["polars-core/strings"]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{
aexpr_to_root_column_name, aexpr_to_root_names, check_input_node, has_aexpr,
rename_aexpr_root_names,
};
use crate::utils::{aexpr_to_root_names, check_input_node, has_aexpr, rename_aexpr_root_names};
use polars_core::datatypes::PlHashMap;
use polars_core::prelude::*;

Expand Down Expand Up @@ -178,33 +175,34 @@ where
// aliases change the column names and because we push the predicates downwards
// this may be problematic as the aliased column may not yet exist.
for projection_node in &projections {
let projection_is_boundary =
predicate_column_is_pushdown_boundary(*projection_node, expr_arena);
let projection_roots = aexpr_to_root_names(*projection_node, expr_arena);
{
let projection_aexpr = expr_arena.get(*projection_node);
if let AExpr::Alias(projection_node, name) = projection_aexpr {
if let AExpr::Alias(_, name) = projection_aexpr {
// if this alias refers to one of the predicates in the upper nodes
// we rename the column of the predicate before we push it downwards.
if let Some(predicate) = acc_predicates.remove(&*name) {
if predicate_column_is_pushdown_boundary(*projection_node, expr_arena) {
if projection_is_boundary {
local_predicates.push(predicate);
continue;
}

match aexpr_to_root_column_name(*projection_node, &*expr_arena) {
if projection_roots.len() == 1 {
// we were able to rename the alias column with the root column name
// before pushing down the predicate
Ok(new_name) => {
rename_aexpr_root_names(predicate, expr_arena, new_name.clone());
rename_aexpr_root_names(predicate, expr_arena, projection_roots[0].clone());

insert_and_combine_predicate(
acc_predicates,
new_name,
predicate,
expr_arena,
);
}
insert_and_combine_predicate(
acc_predicates,
projection_roots[0].clone(),
predicate,
expr_arena,
);
} else {
// this may be a complex binary function. The predicate may only be valid
// on this projected column so we do filter locally.
Err(_) => local_predicates.push(predicate),
local_predicates.push(predicate)
}
}
}
Expand All @@ -219,16 +217,24 @@ where
// remove predicates that cannot be done on the input above
let to_local = acc_predicates
.iter()
.filter_map(|kv| {
// if they can be executed on input node above its ok
if check_input_node(*kv.1, input_schema, expr_arena)
// if this predicate not equals a column that is a computation
// it is ok
&& !is_boundary
.filter_map(|(name, predicate)| {
// there are some conditions we need to check for every predicate we try to push down
// 1. does the column exist on the node above
// 2. if the projection is a computation/transformation and the predicate is based on that column
// we must block because the predicate would be incorrect.
// 3. if applying the predicate earlier does not influence the result of this projection
// this is the case for instance with a sum operation (filtering out rows influences the result)

// checks 1.
if check_input_node(*predicate, input_schema, expr_arena)
// checks 2.
&& !(projection_roots.contains(name) && projection_is_boundary)
// checks 3.
&& !is_boundary
{
None
} else {
Some(kv.0.clone())
Some(name.clone())
}
})
.collect::<Vec<_>>();
Expand Down
25 changes: 25 additions & 0 deletions polars/polars-lazy/src/tests/predicate_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,28 @@ fn filter_blocked_by_map() -> Result<()> {

Ok(())
}

#[test]
#[cfg(all(feature = "temporal", feature = "strings"))]
fn test_strptime_block_predicate() -> Result<()> {
let df = df![
"date" => ["2021-01-01", "2021-01-02"]
]?;

let q = df
.lazy()
.with_column(col("date").str().strptime(StrpTimeOptions {
date_dtype: DataType::Date,
..Default::default()
}))
.filter(col("date").gt(Expr::Literal(LiteralValue::DateTime(
NaiveDate::from_ymd(2021, 1, 1).and_hms(0, 0, 0),
TimeUnit::Milliseconds,
))));

assert!(!predicate_at_scan(q.clone()));
let df = q.collect()?;
assert_eq!(df.shape(), (1, 1));

Ok(())
}

0 comments on commit 7220b8e

Please sign in to comment.