Skip to content

Commit

Permalink
make sure that projection pushdown checks input schema of black box udf
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 9, 2022
1 parent f73e8fe commit f7c5062
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 9 deletions.
13 changes: 13 additions & 0 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,19 @@ impl<'a> ALogicalPlanBuilder<'a> {
}
}

pub(crate) fn from_lp(
lp: ALogicalPlan,
expr_arena: &'a mut Arena<AExpr>,
lp_arena: &'a mut Arena<ALogicalPlan>,
) -> Self {
let root = lp_arena.add(lp);
ALogicalPlanBuilder {
root,
expr_arena,
lp_arena,
}
}

pub fn melt(self, id_vars: Arc<Vec<String>>, value_vars: Arc<Vec<String>>) -> Self {
let schema = det_melt_schema(&value_vars, self.schema());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl ProjectionPushDown {
pushed_at_least_one
}

/// Helper method. This pushes down current node and assigns the result to this node.
/// This pushes down current node and assigns the result to this node.
fn pushdown_and_assign(
&self,
input: Node,
Expand All @@ -176,6 +176,34 @@ impl ProjectionPushDown {
Ok(())
}

/// This pushes down the projection that are validated
/// that they can be done successful at the schema above
/// The result is assigned to this node.
fn pushdown_and_assign_check_schema(
&self,
input: Node,
acc_projections: Vec<Node>,
projections_seen: usize,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<Vec<Node>> {
let alp = lp_arena.take(input);
let down_schema = alp.schema(lp_arena);
let (acc_projections, local_projections, names) =
split_acc_projections(acc_projections, down_schema, expr_arena);

let lp = self.push_down(
alp,
acc_projections,
names,
projections_seen,
lp_arena,
expr_arena,
)?;
lp_arena.replace(input, lp);
Ok(local_projections)
}

/// Projection pushdown optimizer
///
/// # Arguments
Expand Down Expand Up @@ -833,22 +861,30 @@ impl ProjectionPushDown {
options,
schema,
} => {
let lp = Udf {
input,
function,
options,
schema,
};
if options.projection_pd {
self.pushdown_and_assign(
let local_projections = self.pushdown_and_assign_check_schema(
input,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)?;
if local_projections.is_empty() {
Ok(lp)
} else {
Ok(ALogicalPlanBuilder::from_lp(lp, expr_arena, lp_arena)
.project(local_projections)
.build())
}
} else {
Ok(lp)
}
Ok(Udf {
input,
function,
options,
schema,
})
}
// Slice and Cache have only inputs and exprs, so we can use same logic.
lp @ Slice { .. } | lp @ Cache { .. } | lp @ Union { .. } => {
Expand Down
18 changes: 18 additions & 0 deletions polars/polars-lazy/src/tests/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,21 @@ fn test_cross_join_pd() -> Result<()> {
assert!(out.frame_equal(&expected));
Ok(())
}

#[test]
fn test_row_count_pd() -> Result<()> {
let df = df![
"x" => [1, 2, 3],
"y" => [3, 2, 1],
]?;

let df = df
.lazy()
.with_row_count("row_count", None)
.select([col("row_count"), col("x") * lit(3i32)])
.collect()?;

dbg!(df);

Ok(())
}

0 comments on commit f7c5062

Please sign in to comment.