Skip to content

Commit

Permalink
cleanup predicate pushdown
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 17, 2021
1 parent 065f9aa commit 1020626
Showing 1 changed file with 74 additions and 113 deletions.
187 changes: 74 additions & 113 deletions polars/polars-lazy/src/logical_plan/optimizer/predicate_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,17 @@ fn roots_to_key(roots: &[Arc<String>]) -> Arc<String> {
}
}

/// Compute the name under which `predicate` is inserted into a pushdown container.
///
/// The expression stored at `predicate` in `expr_arena` is resolved to a field
/// against `schema` (with `Context::Default`); that field's name is cloned and
/// returned in a fresh `Arc`.
///
/// # Panics
///
/// Panics if `to_field` returns an error for this expression/schema pair.
fn get_insertion_name(expr_arena: &Arena<AExpr>, predicate: Node, schema: &Schema) -> Arc<String> {
    let field = expr_arena
        .get(predicate)
        .to_field(schema, Context::Default, expr_arena)
        .unwrap();
    Arc::new(field.name().clone())
}

/// Predicate pushdown optimizer type. It declares no fields.
pub(crate) struct PredicatePushDown {}

impl Default for PredicatePushDown {
Expand All @@ -116,25 +127,38 @@ fn no_pushdown_preds<F>(
let columns = aexpr_to_root_names(node, &arena);
debug_assert_eq!(columns.len(), 1);

// keep track of the predicates that should be removed from pushed down predicates
// these predicates will be added to local predicates
let mut remove_keys = Vec::with_capacity(acc_predicates.len());

for (key, predicate) in &*acc_predicates {
let root_names = aexpr_to_root_names(*predicate, arena);
let condition = |name: Arc<String>| columns.contains(&name);
local_predicates.extend(transfer_to_local(arena, acc_predicates, condition));
}
}

for name in root_names {
if columns.contains(&name) {
remove_keys.push(key.clone());
continue;
}
/// Transfer predicates from `acc_predicates` (the predicates that would be pushed
/// down) into a vec of local predicates, based on a condition.
///
/// `condition` is called once for every root column name of every accumulated
/// predicate, in the map's iteration order. A predicate is transferred when the
/// condition returns `true` for at least one of its root names.
///
/// Returns the transferred predicate nodes. Each returned node has been removed
/// from `acc_predicates`; predicates that did not match are left in place.
fn transfer_to_local<F>(
    expr_arena: &Arena<AExpr>,
    acc_predicates: &mut HashMap<Arc<String>, Node, RandomState>,
    mut condition: F,
) -> Vec<Node>
where
    F: FnMut(Arc<String>) -> bool,
{
    let mut remove_keys = Vec::with_capacity(acc_predicates.len());

    for (key, predicate) in &*acc_predicates {
        // `condition` is `FnMut` and may carry state between calls, so it is
        // invoked for every root name rather than short-circuiting on a match.
        let mut matched = false;
        for name in aexpr_to_root_names(*predicate, expr_arena) {
            if condition(name) {
                matched = true;
            }
        }
        // Record the key at most once. Pushing it once per matching root name
        // would make the second `remove` below fail for predicates that have
        // several matching roots.
        if matched {
            remove_keys.push(key.clone());
        }
    }

    remove_keys
        .into_iter()
        .map(|key| {
            acc_predicates
                .remove(&*key)
                .expect("key was collected from this map and is recorded only once")
        })
        .collect()
}

impl PredicatePushDown {
Expand Down Expand Up @@ -283,24 +307,12 @@ impl PredicatePushDown {
schema,
} => {
// predicates that will be done at this level
let mut remove_keys = Vec::with_capacity(acc_predicates.len());

for (key, predicate) in &acc_predicates {
let root_names = aexpr_to_root_names(*predicate, expr_arena);
for name in root_names {
if (&*name == "variable")
|| (&*name == "value")
|| value_vars.contains(&*name)
{
remove_keys.push(key.clone());
}
}
}
let mut local_predicates = Vec::with_capacity(remove_keys.len());
for key in remove_keys {
let pred = acc_predicates.remove(&*key).unwrap();
local_predicates.push(pred)
}
let condition = |name: Arc<String>| {
let name = &*name;
name == "variable" || name == "value" || value_vars.contains(name)
};
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);

self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;

Expand Down Expand Up @@ -383,32 +395,14 @@ impl PredicatePushDown {
Ok(lp)
}
Explode { input, columns } => {
// we remove predicates that are done in one of the exploded columns.
let mut remove_keys = Vec::with_capacity(acc_predicates.len());

for (key, predicate) in &acc_predicates {
let root_names = aexpr_to_root_names(*predicate, expr_arena);
for name in root_names {
if columns.contains(&*name) {
remove_keys.push(key.clone());
continue;
}
}
}
let mut local_predicates = Vec::with_capacity(remove_keys.len());
for key in remove_keys {
let pred = acc_predicates.remove(&*key).unwrap();
local_predicates.push(pred)
}
let condition = |name: Arc<String>| columns.contains(&*name);
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);

self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = Explode { input, columns };
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
Cache { input } => {
self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
Ok(Cache { input })
}
Distinct {
input,
subset,
Expand All @@ -418,19 +412,22 @@ impl PredicatePushDown {
// this may have influence on the pushed down predicates. If the pushed down predicates
// contain a binary expression (thus depending on values in multiple columns)
// the final result may differ if it is pushed down.
let mut local_predicates = Vec::with_capacity(acc_predicates.len());
let mut new_acc_predicates = optimizer::init_hashmap();

for (name, predicate) in acc_predicates {
let matches = |e: &AExpr| matches!(e, AExpr::BinaryExpr { .. });
if has_aexpr(predicate, expr_arena, matches) {
local_predicates.push(predicate)
let mut root_count = 0;

// if this condition is called more than once, it's a binary or ternary operation.
let condition = |_| {
if root_count == 0 {
root_count += 1;
false
} else {
new_acc_predicates.insert(name, predicate);
true
}
}
};
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);

self.pushdown_and_assign(input, new_acc_predicates, lp_arena, expr_arena)?;
self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = Distinct {
input,
maintain_order,
Expand Down Expand Up @@ -487,14 +484,7 @@ impl PredicatePushDown {

// no else if. predicate can be in both tables.
if check_down_node(predicate, schema_left, expr_arena) {
let name = Arc::new(
expr_arena
.get(predicate)
.to_field(schema_left, Context::Default, expr_arena)
.unwrap()
.name()
.clone(),
);
let name = get_insertion_name(expr_arena, predicate, schema_left);
insert_and_combine_predicate(
&mut pushdown_left,
name,
Expand All @@ -504,14 +494,7 @@ impl PredicatePushDown {
filter_left = true;
}
if check_down_node(predicate, schema_right, expr_arena) {
let name = Arc::new(
expr_arena
.get(predicate)
.to_field(schema_right, Context::Default, expr_arena)
.unwrap()
.name()
.clone(),
);
let name = get_insertion_name(expr_arena, predicate, schema_right);
insert_and_combine_predicate(
&mut pushdown_right,
name,
Expand Down Expand Up @@ -551,21 +534,6 @@ impl PredicatePushDown {
Ok(self.apply_predicate(lp, local_predicates, lp_arena, expr_arena))
}
HStack { input, exprs, .. } => {
// local predicates will be executed in this node of the LP
let len = acc_predicates.len();
let mut local_predicates = Vec::with_capacity(len);
let mut local_keys = Vec::with_capacity(len);

for (key, predicate) in &acc_predicates {
if !check_down_node(
*predicate,
lp_arena.get(input).schema(lp_arena),
expr_arena,
) {
local_keys.push(key.clone());
}
}

// First we get all names of added columns in this HStack operation
// and then we remove the predicates from the eligible container if they are
// dependent on data we've added in this node.
Expand Down Expand Up @@ -593,18 +561,18 @@ impl PredicatePushDown {
added_cols.push(name);
}
}
// remove predicates that are dependent on columns added in this HStack.
for key in acc_predicates.keys() {
if added_cols.contains(key) {
local_keys.push(key.clone())
}
}

for key in local_keys {
if let Some(val) = acc_predicates.remove(&key) {
local_predicates.push(val);
}
}
let condition = |name: Arc<String>| {
// remove predicates that are dependent on columns added in this HStack.
added_cols.contains(&name)
|| lp_arena
.get(input)
.schema(lp_arena)
.field_with_name(&*name)
.is_err()
};
let local_predicates =
transfer_to_local(expr_arena, &mut acc_predicates, condition);

self.pushdown_and_assign(input, acc_predicates, lp_arena, expr_arena)?;
let lp = ALogicalPlanBuilder::new(input, expr_arena, lp_arena)
Expand All @@ -627,14 +595,7 @@ impl PredicatePushDown {
let mut local_predicates = Vec::with_capacity(acc_predicates.len());
for (_, predicate) in acc_predicates {
if check_down_node(predicate, input_schema, expr_arena) {
let name = Arc::new(
expr_arena
.get(predicate)
.to_field(input_schema, Context::Default, expr_arena)
.unwrap()
.name()
.clone(),
);
let name = get_insertion_name(expr_arena, predicate, input_schema);
insert_and_combine_predicate(
&mut pushdown_predicates,
name,
Expand Down

0 comments on commit 1020626

Please sign in to comment.