Skip to content

Commit

Permalink
fix bug in predicate pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 16, 2022
1 parent 1727b2b commit aff81cd
Show file tree
Hide file tree
Showing 14 changed files with 224 additions and 178 deletions.
1 change: 1 addition & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ name = "polars"
version = "0.18.0"
authors = ["ritchie46 <ritchie46@gmail.com>"]
edition = "2018"
keywords = ["dataframe", "query-engine", "arrow"]
license = "MIT"
readme = "../README.md"
repository = "https://github.com/pola-rs/polars"
Expand Down
5 changes: 5 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ impl DataFrame {
&self.columns
}

#[cfg(feature = "private")]
pub fn get_columns_mut(&mut self) -> &mut Vec<Series> {
&mut self.columns
}

/// Iterator over the columns as `Series`.
///
/// # Example
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -866,7 +866,7 @@ impl Expr {
collect_groups: ApplyOptions::ApplyFlat,
input_wildcard_expansion: false,
auto_explode: false,
fmt_str: "",
fmt_str: "map",
},
}
}
Expand Down
44 changes: 26 additions & 18 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,25 +280,12 @@ impl LazyFrame {
}

/// Rename a column in the DataFrame
#[deprecated(note = "use rename")]
pub fn with_column_renamed(self, existing_name: &str, new_name: &str) -> Self {
let schema = self.logical_plan.schema();
let schema = schema
.rename(&[existing_name], &[new_name])
.expect("cannot rename non existing column");

// first make sure that the column is projected, then we
let init = self.with_column(col(existing_name));

let existing_name = existing_name.to_string();
let new_name = new_name.to_string();
let f = move |mut df: DataFrame| {
df.rename(&existing_name, &new_name)?;
Ok(df)
};
init.map(f, Some(AllowedOptimizations::default()), Some(schema))
self.rename([existing_name], [new_name])
}

/// Rename columns in the DataFrame. This does not preserve ordering.
/// Rename columns in the DataFrame.
pub fn rename<I, J, T, S>(self, existing: I, new: J) -> Self
where
I: IntoIterator<Item = T> + Clone,
Expand All @@ -311,14 +298,35 @@ impl LazyFrame {
.map(|name| name.as_ref().to_string())
.collect();

let new: Vec<String> = new
.into_iter()
.map(|name| name.as_ref().to_string())
.collect();

self.with_columns(
existing
.iter()
.zip(new)
.zip(&new)
.map(|(old, new)| col(old).alias(new.as_ref()))
.collect::<Vec<_>>(),
)
.drop_columns_impl(&existing)
.map(
move |mut df: DataFrame| {
let cols = df.get_columns_mut();
for (existing, new) in existing.iter().zip(new.iter()) {
let idx_a = cols
.iter()
.position(|s| s.name() == existing.as_str())
.unwrap();
let idx_b = cols.iter().position(|s| s.name() == new.as_str()).unwrap();
cols.swap(idx_a, idx_b);
}
cols.truncate(cols.len() - existing.len());
Ok(df)
},
None,
None,
)
}

/// Removes columns from the DataFrame.
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ impl LogicalPlanBuilder {

let right_names: HashSet<_, RandomState> = right_on
.iter()
.map(|e| utils::output_name(e).expect("could not find name"))
.map(|e| utils::expr_output_name(e).expect("could not find name"))
.collect();

for f in schema_right.fields() {
Expand Down
11 changes: 8 additions & 3 deletions polars/polars-lazy/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::prelude::*;
use std::borrow::Cow;
use std::fmt;
use std::fmt::{Debug, Formatter};

Expand Down Expand Up @@ -91,11 +92,15 @@ impl fmt::Debug for LogicalPlan {
if let Some(columns) = projection {
n_columns = format!("{}", columns.len());
}
let selection = match selection {
Some(s) => Cow::Owned(format!("{:?}", s)),
None => Cow::Borrowed("None"),
};

write!(
f,
"TABLE: {:?}; PROJECT {}/{} COLUMNS; SELECTION: {:?}\\n
PROJECTION: {:?}",
"MEMTABLE: {:?};\n\tproject {}/{} columns\t|\tdetails: {:?};\n\
\tselection: {:?}\n\n",
schema
.fields()
.iter()
Expand All @@ -104,8 +109,8 @@ impl fmt::Debug for LogicalPlan {
.collect::<Vec<_>>(),
n_columns,
total_columns,
projection,
selection,
projection
)
}
Projection { expr, input, .. } => {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
mod utils;

use crate::logical_plan::optimizer::ALogicalPlanBuilder;
use crate::logical_plan::{optimizer, Context};
use crate::prelude::*;
use crate::utils::{aexpr_to_root_names, aexprs_to_schema, check_input_node, has_aexpr};
Expand Down Expand Up @@ -46,59 +45,73 @@ impl PredicatePushDown {
fn pushdown_and_continue(
&self,
lp: ALogicalPlan,
acc_predicates: PlHashMap<Arc<str>, Node>,
mut acc_predicates: PlHashMap<Arc<str>, Node>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
has_projections: bool,
) -> Result<ALogicalPlan> {
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

// we should get pass these projections
if has_projections
&& exprs
if has_projections {
// we should not pass these projections
if exprs
.iter()
.any(|e_n| is_pushdown_boundary(*e_n, expr_arena))
{
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
}
{
return self.no_pushdown_restart_opt(lp, acc_predicates, lp_arena, expr_arena);
}

let mut local_predicates = Vec::with_capacity(acc_predicates.len());
// projections should only have a single input.
assert_eq!(inputs.len(), 1);
let input = inputs[0];
let (local_predicates, projections) =
rewrite_projection_node(expr_arena, lp_arena, &mut acc_predicates, exprs, input);

// determine new inputs by pushing down predicates
let new_inputs = inputs
.iter()
.map(|&node| {
// first we check if we are able to push down the predicate pass this node
// it could be that this node just added the column where we base the predicate on
let input_schema = lp_arena.get(node).schema(lp_arena);
let mut pushdown_predicates = optimizer::init_hashmap();
for &predicate in acc_predicates.values() {
// we can pushdown the predicate
if check_input_node(predicate, input_schema, expr_arena) {
let name = get_insertion_name(expr_arena, predicate, input_schema);
insert_and_combine_predicate(
&mut pushdown_predicates,
name,
predicate,
expr_arena,
)
}
// we cannot pushdown the predicate we do it here
else {
local_predicates.push(predicate);
let alp = lp_arena.take(input);
let alp = self.push_down(alp, acc_predicates, lp_arena, expr_arena)?;
lp_arena.replace(input, alp);

let lp = lp.from_exprs_and_input(projections, inputs);
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
} else {
let mut local_predicates = Vec::with_capacity(acc_predicates.len());

// determine new inputs by pushing down predicates
let new_inputs = inputs
.iter()
.map(|&node| {
// first we check if we are able to push down the predicate pass this node
// it could be that this node just added the column where we base the predicate on
let input_schema = lp_arena.get(node).schema(lp_arena);
let mut pushdown_predicates = optimizer::init_hashmap();
for &predicate in acc_predicates.values() {
// we can pushdown the predicate
if check_input_node(predicate, input_schema, expr_arena) {
let name = get_insertion_name(expr_arena, predicate, input_schema);
insert_and_combine_predicate(
&mut pushdown_predicates,
name,
predicate,
expr_arena,
)
}
// we cannot pushdown the predicate we do it here
else {
local_predicates.push(predicate);
}
}
}

let alp = lp_arena.take(node);
let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;
let alp = lp_arena.take(node);
let alp = self.push_down(alp, pushdown_predicates, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;

let lp = lp.from_exprs_and_input(exprs, new_inputs);
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
let lp = lp.from_exprs_and_input(exprs, new_inputs);
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
}

/// Filter will be done at this node, but we continue optimization
Expand Down Expand Up @@ -155,39 +168,6 @@ impl PredicatePushDown {
let alp = lp_arena.take(input);
self.push_down(alp, acc_predicates, lp_arena, expr_arena)
}

Projection {
expr,
input,
schema,
} => {
for node in &expr {
if is_pushdown_boundary(*node, expr_arena) {
let lp = ALogicalPlanBuilder::new(input, expr_arena, lp_arena)
.project(expr)
.build();
// do all predicates here
let local_predicates = acc_predicates.into_iter().map(|(_, v)| v).collect();
return Ok(self.optional_apply_predicate(
lp,
local_predicates,
lp_arena,
expr_arena,
));
}
}

let (local_predicates, expr) =
rewrite_projection_node(expr_arena, lp_arena, &mut acc_predicates, expr, input);
self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;

let lp = ALogicalPlan::Projection {
expr,
input,
schema,
};
Ok(self.optional_apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
DataFrameScan {
df,
schema,
Expand Down Expand Up @@ -444,7 +424,7 @@ impl PredicatePushDown {
predicate_pd: true, ..
} = lp
{
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
} else {
Ok(lp)
}
Expand All @@ -453,7 +433,7 @@ impl PredicatePushDown {
lp @ Cache { .. } | lp @ Union { .. } | lp @ Sort { .. } => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, false)
}
lp @ HStack {..} => {
lp @ HStack {..} | lp @ Projection {..} => {
self.pushdown_and_continue(lp, acc_predicates, lp_arena, expr_arena, true)
}
// NOT Pushed down passed these nodes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ pub(super) fn rewrite_projection_node(
expr_arena: &mut Arena<AExpr>,
lp_arena: &mut Arena<ALogicalPlan>,
acc_predicates: &mut PlHashMap<Arc<str>, Node>,
expr: Vec<Node>,
projections: Vec<Node>,
input: Node,
) -> (Vec<Node>, Vec<Node>)
where
Expand All @@ -150,9 +150,9 @@ where
// maybe update predicate name if a projection is an alias
// aliases change the column names and because we push the predicates downwards
// this may be problematic as the aliased column may not yet exist.
for node in &expr {
for projection_node in &projections {
{
let e = expr_arena.get(*node);
let e = expr_arena.get(*projection_node);
if let AExpr::Alias(e, name) = e {
// if this alias refers to one of the predicates in the upper nodes
// we rename the column of the predicate before we push it downwards.
Expand All @@ -178,14 +178,14 @@ where
}
}

let e = expr_arena.get(*node);
let e = expr_arena.get(*projection_node);
let input_schema = lp_arena.get(input).schema(lp_arena);

// we check if predicates can be done on the input above
// with the following conditions:

// 1. predicate based on current column may only pushed down if simple projection, e.g. col() / col().alias()
let expr_depth = (&*expr_arena).iter(*node).count();
let expr_depth = (&*expr_arena).iter(*projection_node).count();
let is_computation = if let AExpr::Alias(_, _) = e {
expr_depth > 2
} else {
Expand Down Expand Up @@ -216,14 +216,14 @@ where

// remove predicates that are based on column modifications
no_pushdown_preds(
*node,
*projection_node,
expr_arena,
|e| matches!(e, AExpr::Explode(_)) || matches!(e, AExpr::Ternary { .. }),
&mut local_predicates,
acc_predicates,
);
}
(local_predicates, expr)
(local_predicates, projections)
}

pub(super) fn no_pushdown_preds<F>(
Expand Down

0 comments on commit aff81cd

Please sign in to comment.