Skip to content

Commit

Permalink
fix invalid ReplaceDropNulls optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 20, 2021
1 parent ea1d5de commit 8eb817d
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 46 deletions.
117 changes: 109 additions & 8 deletions polars/polars-lazy/src/logical_plan/optimizer/drop_nulls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,45 @@ impl OptimizationRule for ReplaceDropNulls {
use ALogicalPlan::*;
match lp {
Selection { input, predicate } => {
if (&*expr_arena).iter(*predicate).all(|(_, e)| {
// We want to make sure we find this pattern:
// A != null AND B != null AND C != null .. etc.
// The outer expression is always a binary AND operation and the inner
// expressions are `IsNotNull` checks on columns.
let iter = (&*expr_arena).iter(*predicate);
let is_binary_and = |e: &AExpr| {
matches!(
e,
AExpr::IsNotNull(_)
| AExpr::BinaryExpr {
op: Operator::And,
..
}
| AExpr::Column(_)
&AExpr::BinaryExpr {
op: Operator::And,
..
}
)
}) {
};
let is_not_null = |e: &AExpr| matches!(e, &AExpr::IsNotNull(_));
let is_column = |e: &AExpr| matches!(e, &AExpr::Column(_));
let is_lit_true =
|e: &AExpr| matches!(e, &AExpr::Literal(LiteralValue::Boolean(true)));

let mut binary_and_count = 0;
let mut not_null_count = 0;
let mut column_count = 0;
let mut other_count = 0;
for (_, e) in iter {
if is_binary_and(e) {
binary_and_count += 1;
} else if is_column(e) {
column_count += 1;
} else if is_not_null(e) {
not_null_count += 1;
} else if is_lit_true(e) {
// do nothing
} else {
other_count += 1;
}
}
if not_null_count == column_count
&& binary_and_count < column_count
&& other_count == 0
{
let subset = aexpr_to_root_names(*predicate, expr_arena)
.iter()
.map(|s| s.to_string())
Expand All @@ -56,3 +84,76 @@ impl OptimizationRule for ReplaceDropNulls {
}
}
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::prelude::stack_opt::OptimizationRule;
    use crate::test::fruits_cars;
    use crate::utils::test::optimize_lp;

    /// A `drop_nulls` plan, with or without a column subset, must be rewritten
    /// by `ReplaceDropNulls` into a `LogicalPlan::Udf` node.
    #[test]
    fn test_drop_nulls_optimization() -> Result<()> {
        let mut rules: Vec<Box<dyn OptimizationRule>> = vec![Box::new(ReplaceDropNulls {})];
        let df = fruits_cars();

        // One, two and three explicit columns, and finally "all columns".
        let subsets = [
            Some(vec![col("fruits")]),
            Some(vec![col("fruits"), col("cars")]),
            Some(vec![col("fruits"), col("cars"), col("A")]),
            None,
        ];
        for subset in subsets {
            let plan = df.clone().lazy().drop_nulls(subset).logical_plan;
            let optimized = optimize_lp(plan, &mut rules);
            assert!(matches!(optimized, LogicalPlan::Udf { .. }));
        }
        Ok(())
    }

    /// This tests that a plain boolean filter is not accidentally optimized
    /// by `ReplaceDropNulls`.
    #[test]
    fn test_filter() -> Result<()> {
        #[rustfmt::skip]
        let data = vec![
            None, None, None, None,
            Some(false), Some(false),
            Some(true), Some(false),
            Some(true), Some(false),
            Some(true), Some(false),
            Some(true), Some(false),
            Some(true), Some(false),
            Some(false), None,
        ];
        let df = DataFrame::new(vec![Series::new("data", data)])?;

        let name = "data";
        let lt_after_shift_1 = col(name).shift_and_fill(1, lit(true)).lt(col(name));
        let lt_after_shift_neg_1 = col(name).shift(-1).lt(col(name));

        let out = df
            .lazy()
            .with_columns(vec![
                lt_after_shift_1.alias("shift_1"),
                lt_after_shift_neg_1.alias("shift_neg_1"),
            ])
            .with_column(col("shift_1").and(col("shift_neg_1")).alias("diff"))
            .filter(col("diff"))
            .collect()?;

        // Only the rows where `diff` is true may survive the filter; the frame
        // carries the original column plus the three derived ones.
        assert_eq!(out.shape(), (5, 4));

        Ok(())
    }
}
42 changes: 5 additions & 37 deletions polars/polars-lazy/src/logical_plan/optimizer/type_coercion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,55 +205,23 @@ impl OptimizationRule for TypeCoercionRule {

#[cfg(test)]
mod test {
use crate::logical_plan::optimizer::stack_opt::{OptimizationRule, StackOptimizer};
use crate::logical_plan::optimizer::stack_opt::OptimizationRule;
use crate::prelude::*;
use crate::utils::expr_to_root_column_names;
use crate::utils::test::optimize_expr;
use polars_core::prelude::*;

/// Runs `TypeCoercionRule` over a single expression and returns the
/// rewritten expression.
///
/// The expression is wrapped in a projection on top of a dummy projection
/// that carries `schema`, optimized with the `StackOptimizer`, and then
/// popped back out of the resulting projection.
fn optimize_expr(expr: Expr, schema: Schema) -> Expr {
// initialize the arenas
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(32);
let schema = Arc::new(schema);

// dummy input that only exists to carry the schema
let input = Box::new(LogicalPlan::Projection {
expr: vec![],
input: Box::new(Default::default()),
schema: schema.clone(),
});

let lp = LogicalPlan::Projection {
expr: vec![expr],
input,
schema,
};

let root = to_alp(lp, &mut expr_arena, &mut lp_arena);
let mut rules: Vec<Box<dyn OptimizationRule>> = vec![Box::new(TypeCoercionRule {})];

let opt = StackOptimizer {};
let lp_top = opt.optimize_loop(&mut rules, &mut expr_arena, &mut lp_arena, root);
// the plan was built as a projection holding exactly one expression, so
// anything else coming back is treated as impossible
if let LogicalPlan::Projection { mut expr, .. } =
node_to_lp(lp_top, &mut expr_arena, &mut lp_arena)
{
expr.pop().unwrap()
} else {
unreachable!()
}
}

/// Checks type coercion between a categorical column and a string literal:
/// an equality comparison must leave the expression untouched, while an
/// addition must cast the column to `Utf8`.
#[test]
fn test_categorical_utf8() {
let mut rules: Vec<Box<dyn OptimizationRule>> = vec![Box::new(TypeCoercionRule {})];
let schema = Schema::new(vec![Field::new("fruits", DataType::Categorical)]);

let expr = col("fruits").eq(lit("somestr"));
let out = optimize_expr(expr.clone(), schema.clone());
let out = optimize_expr(expr.clone(), schema.clone(), &mut rules);
// we test that the fruits column is not cast to utf8 for the comparison
assert_eq!(out, expr);

let expr = col("fruits") + (lit("somestr"));
let out = optimize_expr(expr.clone(), schema);
let out = optimize_expr(expr.clone(), schema, &mut rules);
// for the addition the categorical column is expected to be cast to utf8
let expected = col("fruits").cast(DataType::Utf8) + lit("somestr");
assert_eq!(out, expected);
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn scan_foods_csv() -> LazyFrame {
LazyCsvReader::new(path.to_string()).finish()
}

fn fruits_cars() -> DataFrame {
pub(crate) fn fruits_cars() -> DataFrame {
df!(
"A"=> [1, 2, 3, 4, 5],
"fruits"=> ["banana", "banana", "apple", "apple", "banana"],
Expand Down
54 changes: 54 additions & 0 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,3 +334,57 @@ where
}
single_pred.expect("an empty iterator was passed")
}

/// Test-only helpers that run a set of optimization rules over a logical
/// plan or over a single expression.
#[cfg(test)]
pub(crate) mod test {
    use crate::prelude::stack_opt::{OptimizationRule, StackOptimizer};
    use crate::prelude::*;
    use polars_core::prelude::*;

    /// Runs `rules` over `lp` with the `StackOptimizer` and returns the
    /// resulting plan.
    ///
    /// The plan is converted into freshly created arenas with `to_alp`,
    /// optimized there, and converted back with `node_to_lp`.
    pub fn optimize_lp(lp: LogicalPlan, rules: &mut [Box<dyn OptimizationRule>]) -> LogicalPlan {
        // initialize the arenas
        let mut expr_arena = Arena::with_capacity(64);
        let mut lp_arena = Arena::with_capacity(32);
        let root = to_alp(lp, &mut expr_arena, &mut lp_arena);

        let opt = StackOptimizer {};
        let lp_top = opt.optimize_loop(rules, &mut expr_arena, &mut lp_arena, root);
        node_to_lp(lp_top, &mut expr_arena, &mut lp_arena)
    }

    /// Runs `rules` over a single expression and returns the rewritten
    /// expression.
    ///
    /// The expression is wrapped in a projection on top of a dummy projection
    /// that carries `schema`; the wrapped plan is optimized with
    /// [`optimize_lp`] and the expression is taken back out of the result.
    ///
    /// # Panics
    ///
    /// Panics if the optimized plan is no longer a projection, or if that
    /// projection no longer holds an expression.
    pub fn optimize_expr(
        expr: Expr,
        schema: Schema,
        rules: &mut [Box<dyn OptimizationRule>],
    ) -> Expr {
        let schema = Arc::new(schema);

        // dummy input that only exists to carry the schema
        let input = Box::new(LogicalPlan::Projection {
            expr: vec![],
            input: Box::new(Default::default()),
            schema: schema.clone(),
        });

        let lp = LogicalPlan::Projection {
            expr: vec![expr],
            input,
            schema,
        };

        // Reuse the plan-level helper instead of repeating the arena setup
        // and the optimizer loop here.
        match optimize_lp(lp, rules) {
            LogicalPlan::Projection { mut expr, .. } => expr
                .pop()
                .expect("the projection was built with exactly one expression"),
            _ => unreachable!("the plan was built with a projection as its top node"),
        }
    }
}

0 comments on commit 8eb817d

Please sign in to comment.