Skip to content

Commit

Permalink
fix(rust): double projections should be checked on input schema (#5058)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 1, 2022
1 parent 716e3e4 commit 821fef2
Showing 1 changed file with 28 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -281,25 +281,39 @@ impl ProjectionPushDown {
use ALogicalPlan::*;

match logical_plan {
Projection {
expr,
input,
schema,
} => {
Projection { expr, input, .. } => {
// A projection can consist of a chain of expressions followed by an alias.
// We want to do the chain locally because it can have complicated side effects.
// The only thing we push down is the root name of the projection.
// So we:
// - add the root of the projections to accumulation,
// - also do the complete projection locally to keep the schema (column order) and the alias.
for e in &expr {
// in this branch we check a double projection case
// df
// .select(col("foo").alias("bar"))
// .select(col("bar"))
//
// In this query, bar cannot pass this projection, as it would not exist in DF.
if !acc_projections.is_empty() {
// remove projections that are not used upstream
if projections_seen > 0 {
let input_schema = lp_arena.get(input).schema(lp_arena);
// don't do projection that is not used in upstream selection
let output_field = expr_arena
.get(*e)
.to_field(input_schema.as_ref(), Context::Default, expr_arena)
.unwrap();
let output_name = output_field.name();
let is_used_upstream = projected_names.contains(output_name.as_str());
if !is_used_upstream {
continue;
}
}

// in this branch we check a double projection case
// df
// .select(col("foo").alias("bar"))
// .select(col("bar"))
//
// In this query, bar cannot pass this projection, as it would not exist in DF.
// THE ORDER IS IMPORTANT HERE!
// this removes projection names, so any checks on upstream names should
// be done before this branch.
for (_, ae) in (&*expr_arena).iter(*e) {
if let AExpr::Alias(_, name) = ae {
if projected_names.remove(name) {
Expand All @@ -309,26 +323,6 @@ impl ProjectionPushDown {
}
}
}

// don't do projection that is not used in upstream selection
if projections_seen > 0 {
// TODO! investigate why this can fail
// TODO! make it return an option.
let output_field = expr_arena.get(*e).to_field(
schema.as_ref(),
Context::Default,
expr_arena,
);

if let Ok(output_field) = output_field {
let output_name = output_field.name();
let is_used_upstream =
projected_names.contains(output_name.as_str());
if !is_used_upstream {
continue;
}
};
}
}

add_expr_to_accumulated(
Expand Down Expand Up @@ -1058,11 +1052,10 @@ impl ProjectionPushDown {
Ok(self.finish_node(local_projection, builder))
}
HStack {
input,
mut exprs,
schema,
input, mut exprs, ..
} => {
if !acc_projections.is_empty() {
let input_schema = lp_arena.get(input).schema(lp_arena);
let mut pruned_with_cols = Vec::with_capacity(exprs.len());

// Check if output names are used upstream
Expand All @@ -1071,7 +1064,7 @@ impl ProjectionPushDown {
for node in &exprs {
let output_field = expr_arena
.get(*node)
.to_field(schema.as_ref(), Context::Default, expr_arena)
.to_field(input_schema.as_ref(), Context::Default, expr_arena)
.unwrap();
let output_name = output_field.name();

Expand Down

0 comments on commit 821fef2

Please sign in to comment.