Skip to content

Commit

Permalink
fix bug in projection pushdown on join suffixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 4, 2022
1 parent adf6e2e commit d097c04
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 21 deletions.
4 changes: 0 additions & 4 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,6 @@ mod test {
print_plans(&lf);
// implicitly checks logical plan == optimized logical plan
let df = lf.collect().unwrap();
println!("{:?}", df);
}

// check if optimization succeeds with selection
Expand All @@ -325,9 +324,7 @@ mod test {
.left_join(right.clone().lazy(), col("days"), col("days"))
.select(&[col("temp")]);

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}

// check if optimization succeeds with selection of a renamed column due to the join
Expand All @@ -339,7 +336,6 @@ mod test {

print_plans(&lf);
let df = lf.collect().unwrap();
println!("{:?}", df);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{aexpr_to_root_names, aexpr_to_root_nodes, check_input_node, has_aexpr};
use crate::utils::{
aexpr_assign_renamed_root, aexpr_to_root_names, aexpr_to_root_nodes, check_input_node,
has_aexpr,
};
use polars_core::{datatypes::PlHashSet, prelude::*};

fn init_vec() -> Vec<Node> {
Expand Down Expand Up @@ -246,15 +249,15 @@ impl ProjectionPushDown {
// the projections should all be done at the latest projection node to keep the same schema order
if projections_seen == 0 {
let schema = lp.schema(lp_arena);
for expr in expr {
for node in expr {
// Due to the pushdown, a lot of projections cannot be done anymore at the final
// node and should be skipped
if expr_arena
.get(expr)
.get(node)
.to_field(schema, Context::Default, expr_arena)
.is_ok()
{
local_projection.push(expr);
local_projection.push(node);
}
}
// only aliases should be projected locally in the rest of the projections.
Expand Down Expand Up @@ -718,13 +721,7 @@ impl ProjectionPushDown {
if names_right.insert(Arc::from(downwards_name)) {
pushdown_right.push(downwards_name_column);
}

// locally we project and alias
let projection = expr_arena.add(AExpr::Alias(
downwards_name_column,
Arc::from(format!("{}{}", downwards_name, suffix)),
));
local_projection.push(projection);
local_projection.push(proj);
}
} else if add_local {
// always also do the projection locally, because the join columns may not be
Expand Down Expand Up @@ -758,12 +755,43 @@ impl ProjectionPushDown {
expr_arena,
)?;

let builder = ALogicalPlanBuilder::new(input_left, expr_arena, lp_arena).join(
input_right,
left_on,
right_on,
options,
);
// Because we do a projection pushdown
// We may influence the suffixes.
// For instance if a join would have created a schema
//
// "foo", "foo_right"
//
// but we only project the "foo_right" column, the join will not produce
// a "name_right" because we did not project its left name duplicate "foo"
//
// The code below checks if can do the suffixed projections on the schema that
// we have after the join. If we cannot then we modify the projection:
//
// col("foo_right") to col("foo").alias("foo_right")
let suffix = options.suffix.clone();

let alp = ALogicalPlanBuilder::new(input_left, expr_arena, lp_arena)
.join(input_right, left_on, right_on, options)
.build();
let schema_after_join = alp.schema(lp_arena);

for proj in &mut local_projection {
for name in aexpr_to_root_names(*proj, expr_arena) {
if name.contains(suffix.as_ref())
&& schema_after_join.column_with_name(&*name).is_none()
{
let new_name = &name.as_ref()[..name.len() - suffix.len()];

let renamed =
aexpr_assign_renamed_root(*proj, expr_arena, &*name, new_name);

let aliased = expr_arena.add(AExpr::Alias(renamed, name));
*proj = aliased;
}
}
}
let root = lp_arena.add(alp);
let builder = ALogicalPlanBuilder::new(root, expr_arena, lp_arena);
Ok(self.finish_node(local_projection, builder))
}
HStack { input, exprs, .. } => {
Expand Down
29 changes: 29 additions & 0 deletions polars/polars-lazy/src/tests/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,32 @@ fn test_join_suffix_and_drop() -> Result<()> {

Ok(())
}

#[test]
fn test_cross_join_pd() -> Result<()> {
let food = df![
"name"=> ["Omelette", "Fried Egg"],
"price" => [8, 5]
]?;

let drink = df![
"name" => ["Orange Juice", "Tea"],
"price" => [5, 4]
]?;

let q = food.lazy().cross_join(drink.lazy()).select([
col("name").alias("food"),
col("name_right").alias("beverage"),
(col("price") + col("price_right")).alias("total"),
]);

let out = q.collect()?;
let expected = df![
"food" => ["Omelette", "Omelette", "Fried Egg", "Fried Egg"],
"beverage" => ["Orange Juice", "Tea", "Orange Juice", "Tea"],
"total" => [13, 12, 10, 9]
]?;

assert!(out.frame_equal(&expected));
Ok(())
}
21 changes: 21 additions & 0 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,27 @@ pub(crate) fn rename_aexpr_root_names(node: Node, arena: &mut Arena<AExpr>, new_
}
}

/// Rename the root of the expression from `current` to `new` and assign to new node in arena.
/// Returns `Node` on first sucessful rename.
pub(crate) fn aexpr_assign_renamed_root(
node: Node,
arena: &mut Arena<AExpr>,
current: &str,
new_name: &str,
) -> Node {
let roots = aexpr_to_root_nodes(node, arena);

for node in roots {
match arena.get(node) {
AExpr::Column(name) if &**name == current => {
return arena.add(AExpr::Column(Arc::from(new_name)))
}
_ => {}
}
}
panic!("should be a root column that is renamed");
}

/// Get all root column expressions in the expression tree.
pub(crate) fn expr_to_root_column_exprs(expr: &Expr) -> Vec<Expr> {
let mut out = vec![];
Expand Down

0 comments on commit d097c04

Please sign in to comment.