Skip to content

Commit

Permalink
fix bug in predicate pushdown when multiple predicates have the same root column
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 12, 2022
1 parent dfe86ef commit ddc0d3f
Show file tree
Hide file tree
Showing 9 changed files with 65 additions and 44 deletions.
1 change: 0 additions & 1 deletion polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ impl fmt::Debug for LogicalPlan {
if let Some(columns) = &options.with_columns {
n_columns = format!("{}", columns.len());
}
dbg!(options);
write!(
f,
"PARQUET SCAN {}; PROJECT {}/{} COLUMNS; SELECTION: {:?}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,20 @@ use std::sync::Arc;
fn process_with_columns(
path: &Path,
with_columns: &Option<Vec<String>>,
columns: &mut PlHashMap<PathBuf, PlHashSet<String>>,
columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
) {
if let Some(with_columns) = &with_columns {
let cols = columns
.entry(path.to_owned())
.or_insert_with(PlHashSet::new);
cols.extend(with_columns.iter().cloned());
cols.extend(with_columns.iter().enumerate().map(|t| (t.0, t.1.clone())));
}
}

/// Aggregate all the projections in an LP
pub(crate) fn agg_projection(
root: Node,
columns: &mut PlHashMap<PathBuf, PlHashSet<String>>,
columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
lp_arena: &Arena<ALogicalPlan>,
) {
use ALogicalPlan::*;
Expand Down Expand Up @@ -51,7 +51,7 @@ pub(crate) fn agg_projection(
/// Due to self joins there can be multiple Scans of the same file in a LP. We already cache the scans
/// in the PhysicalPlan, but we need to make sure that the first scan has all the columns needed.
pub struct AggScanProjection {
pub columns: PlHashMap<PathBuf, PlHashSet<String>>,
pub columns: PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
}

impl AggScanProjection {
Expand Down Expand Up @@ -107,8 +107,8 @@ impl OptimizationRule for AggScanProjection {
let with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|name| schema.index_of(name).ok());
columns
columns.sort_unstable_by_key(|k| k.0);
columns.into_iter().map(|k| k.1).collect()
});
// prevent infinite loop
if options.with_columns == with_columns {
Expand Down Expand Up @@ -153,8 +153,8 @@ impl OptimizationRule for AggScanProjection {
let mut with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|name| schema.index_of(name).ok());
columns
columns.sort_unstable_by_key(|k| k.0);
columns.into_iter().map(|k| k.1).collect()
});
// prevent infinite loop
if options.with_columns == with_columns {
Expand Down Expand Up @@ -199,8 +199,8 @@ impl OptimizationRule for AggScanProjection {
let with_columns = self.columns.get(&path).map(|agg| {
let mut columns = agg.iter().cloned().collect::<Vec<_>>();
// make sure that the columns are sorted because they come from a hashmap
columns.sort_unstable_by_key(|name| schema.index_of(name).ok());
columns
columns.sort_unstable_by_key(|k| k.0);
columns.into_iter().map(|k| k.1).collect()
});
if options.with_columns == with_columns {
let lp = ALogicalPlan::CsvScan {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{
aexpr_to_root_column_name, aexpr_to_root_names, check_input_node, has_aexpr,
rename_aexpr_root_name,
rename_aexpr_root_names,
};
use polars_core::datatypes::PlHashMap;
use polars_core::prelude::*;
Expand Down Expand Up @@ -161,8 +161,7 @@ where
// we were able to rename the alias column with the root column name
// before pushing down the predicate
Ok(new_name) => {
rename_aexpr_root_name(predicate, expr_arena, new_name.clone())
.unwrap();
rename_aexpr_root_names(predicate, expr_arena, new_name.clone());

insert_and_combine_predicate(
acc_predicates,
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::*;
use crate::logical_plan::Context;
use crate::prelude::utils::as_aggregated;
use crate::utils::rename_aexpr_root_name;
use crate::utils::rename_aexpr_root_names;
use polars_core::utils::{accumulate_dataframes_vertical, split_df};
use polars_core::POOL;
use rayon::prelude::*;
Expand Down Expand Up @@ -190,7 +190,7 @@ fn get_outer_agg_exprs(
let out_field = e.to_field(&schema, Context::Aggregation)?;
let out_name: Arc<str> = Arc::from(out_field.name().as_str());
let node = to_aexpr(e.clone(), &mut expr_arena);
rename_aexpr_root_name(node, &mut expr_arena, out_name.clone())?;
rename_aexpr_root_names(node, &mut expr_arena, out_name.clone());
Ok((node, out_name))
})
.collect::<Result<Vec<_>>>()?;
Expand Down
7 changes: 7 additions & 0 deletions polars/polars-lazy/src/tests/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
#[cfg(feature = "parquet")]
mod io;
mod optimization_checks;
mod predicate_queries;
mod projection_queries;
mod queries;

use optimization_checks::*;

use polars_core::prelude::*;
use polars_io::prelude::*;
use std::io::Cursor;
Expand All @@ -19,6 +22,10 @@ use polars_core::df;
use polars_core::export::chrono::{NaiveDate, NaiveDateTime, NaiveTime};
use std::iter::FromIterator;

static GLOB_PARQUET: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.parquet";
static GLOB_CSV: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.csv";
static GLOB_IPC: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.ipc";

fn scan_foods_csv() -> LazyFrame {
let path = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv";
LazyCsvReader::new(path.to_string()).finish().unwrap()
Expand Down
10 changes: 5 additions & 5 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::*;

fn projection_at_scan(lp_arena: &Arena<ALogicalPlan>, lp: Node) -> bool {
pub(crate) fn predicate_at_scan(lp_arena: &Arena<ALogicalPlan>, lp: Node) -> bool {
(&lp_arena).iter(lp).any(|(_, lp)| {
use ALogicalPlan::*;
match lp {
Expand Down Expand Up @@ -46,7 +46,7 @@ fn test_pred_pd_1() -> Result<()> {
.filter(col("A").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(projection_at_scan(&lp_arena, lp));
assert!(predicate_at_scan(&lp_arena, lp));

// check if we understand that we can unwrap the alias
let lp = df
Expand All @@ -56,7 +56,7 @@ fn test_pred_pd_1() -> Result<()> {
.filter(col("C").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(projection_at_scan(&lp_arena, lp));
assert!(predicate_at_scan(&lp_arena, lp));

// check if we pass hstack
let lp = df
Expand All @@ -66,7 +66,7 @@ fn test_pred_pd_1() -> Result<()> {
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(projection_at_scan(&lp_arena, lp));
assert!(predicate_at_scan(&lp_arena, lp));

// check if we do not pass slice
let lp = df
Expand All @@ -75,7 +75,7 @@ fn test_pred_pd_1() -> Result<()> {
.filter(col("B").gt(lit(1)))
.optimize(&mut lp_arena, &mut expr_arena)?;

assert!(!projection_at_scan(&lp_arena, lp));
assert!(!predicate_at_scan(&lp_arena, lp));

Ok(())
}
Expand Down
24 changes: 24 additions & 0 deletions polars/polars-lazy/src/tests/predicate_queries.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use super::*;

#[test]
fn test_foo() -> Result<()> {
    // Regression test: predicate pushdown must cope with a combined predicate
    // whose expression tree contains multiple root column nodes that all refer
    // to the same root column.
    let mut expr_arena = Arena::with_capacity(16);
    let mut lp_arena = Arena::with_capacity(8);

    let lf = scan_foods_parquet(false).select([col("calories").alias("bar")]);

    // These two filters on the aliased column get combined into a single
    // predicate with two root column nodes; this tests whether we can
    // deal with multiple roots.
    let lf = lf.filter(col("bar").gt(lit(45i32)));
    let lf = lf.filter(col("bar").lt(lit(110i32)));

    // also check that all predicates are combined and pushed down to the scan
    let root = lf.optimize(&mut lp_arena, &mut expr_arena)?;
    // `predicate_at_scan` only reads the arena; pass a shared borrow instead of
    // relying on the implicit `&mut T -> &T` coercion.
    assert!(predicate_at_scan(&lp_arena, root));
    // and that no Selection (filter) node remains in the optimized plan
    assert!(!(&lp_arena)
        .iter(root)
        .any(|(_, lp)| matches!(lp, ALogicalPlan::Selection { .. })));

    Ok(())
}
9 changes: 3 additions & 6 deletions polars/polars-lazy/src/tests/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,9 @@ fn test_union_and_agg_projections() -> Result<()> {
init_files();
// a union vstacks columns and aggscan optimization determines columns to aggregate in a
// hashmap, if that doesn't set them sorted the vstack will panic.
let glob = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.parquet";
let lf1 = LazyFrame::scan_parquet(glob.into(), Default::default())?;
let glob = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.ipc";
let lf2 = LazyFrame::scan_ipc(glob.into(), Default::default())?;
let glob = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.csv";
let lf3 = LazyCsvReader::new(glob.into()).finish()?;
let lf1 = LazyFrame::scan_parquet(GLOB_PARQUET.into(), Default::default())?;
let lf2 = LazyFrame::scan_ipc(GLOB_IPC.into(), Default::default())?;
let lf3 = LazyCsvReader::new(GLOB_CSV.into()).finish()?;

for lf in [lf1, lf2, lf3] {
let lf = lf.filter(col("category").eq(lit("vegetables"))).select([
Expand Down
29 changes: 12 additions & 17 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,24 +177,19 @@ pub(crate) fn aexpr_to_root_nodes(root: Node, arena: &Arena<AExpr>) -> Vec<Node>
out
}

pub(crate) fn rename_aexpr_root_name(
node: Node,
arena: &mut Arena<AExpr>,
new_name: Arc<str>,
) -> Result<()> {
/// Rename the roots of the expression to a single name.
/// Most of the times used with columns that have a single root.
/// In some cases we can have multiple roots.
/// For instance in predicate pushdown the predicates are combined by their root column
/// When combined they may be a binary expression with the same root columns
pub(crate) fn rename_aexpr_root_names(node: Node, arena: &mut Arena<AExpr>, new_name: Arc<str>) {
let roots = aexpr_to_root_nodes(node, arena);
match roots.len() {
1 => {
let node = roots[0];
arena.replace_with(node, |ae| match ae {
AExpr::Column(_) => AExpr::Column(new_name),
_ => panic!("should be only a column"),
});
Ok(())
}
_ => {
panic!("had more than one root columns");
}

for node in roots {
arena.replace_with(node, |ae| match ae {
AExpr::Column(_) => AExpr::Column(new_name.clone()),
_ => panic!("should be only a column"),
});
}
}

Expand Down

0 comments on commit ddc0d3f

Please sign in to comment.