Skip to content

Commit

Permalink
fix(rust, python): deal with unnest schema expansion in projection pd (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 5, 2023
1 parent a609f25 commit 2feb833
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub(super) fn process_groupby(
acc_projections,
lp_arena.get(input).schema(lp_arena).as_ref(),
expr_arena,
false,
);

// add the columns used in the aggregations to the projection only if they are used upstream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ pub(super) fn process_hstack(
acc_projections,
&lp_arena.get(input).schema(lp_arena),
expr_arena,
false,
);

proj_pd.pushdown_and_assign(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub(super) fn process_melt(
acc_projections,
lp_arena.get(input).schema(lp_arena).as_ref(),
expr_arena,
false,
);

if !local_projections.is_empty() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,16 @@ fn get_scan_columns(
///
/// # Returns
/// accumulated_projections, local_projections, accumulated_names
///
/// - `expands_schema`. An unnest adds more columns to a schema, so we cannot use fast path
fn split_acc_projections(
acc_projections: Vec<Node>,
down_schema: &Schema,
expr_arena: &mut Arena<AExpr>,
expands_schema: bool,
) -> (Vec<Node>, Vec<Node>, PlHashSet<Arc<str>>) {
// If node above has as many columns as the projection there is nothing to pushdown.
if down_schema.len() == acc_projections.len() {
if !expands_schema && down_schema.len() == acc_projections.len() {
let local_projections = acc_projections;
(vec![], local_projections, PlHashSet::new())
} else {
Expand Down Expand Up @@ -254,11 +257,14 @@ impl ProjectionPushDown {
projections_seen: usize,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
// an unnest changes/expands the schema
expands_schema: bool,
) -> PolarsResult<Vec<Node>> {
let alp = lp_arena.take(input);
let down_schema = alp.schema(lp_arena);

let (acc_projections, local_projections, names) =
split_acc_projections(acc_projections, &down_schema, expr_arena);
split_acc_projections(acc_projections, &down_schema, expr_arena, expands_schema);

let lp = self.push_down(
alp,
Expand Down Expand Up @@ -687,6 +693,7 @@ impl ProjectionPushDown {
projections_seen,
lp_arena,
expr_arena,
false,
)?;

let mut new_schema = lp_arena
Expand Down Expand Up @@ -720,6 +727,9 @@ impl ProjectionPushDown {
function: function.clone(),
};
if function.allow_projection_pd() && !acc_projections.is_empty() {
let original_acc_projection_len = acc_projections.len();

// add columns needed for the function.
for name in function.additional_projection_pd_columns() {
let node = expr_arena.add(AExpr::Column(name.clone()));
add_expr_to_accumulated(
Expand All @@ -729,20 +739,21 @@ impl ProjectionPushDown {
expr_arena,
)
}
let expands_schema = matches!(function, FunctionNode::Unnest { .. });

let acc_projection_len = acc_projections.len();
let local_projections = self.pushdown_and_assign_check_schema(
input,
acc_projections,
projections_seen,
lp_arena,
expr_arena,
expands_schema,
)?;
if local_projections.is_empty() {
Ok(lp)
} else {
// if we would project, we would remove pushed down predicates
if local_projections.len() < acc_projection_len {
if local_projections.len() < original_acc_projection_len {
Ok(ALogicalPlanBuilder::from_lp(lp, expr_arena, lp_arena)
.with_columns(local_projections)
.build())
Expand Down
5 changes: 2 additions & 3 deletions polars/polars-lazy/polars-plan/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,8 @@ pub(crate) fn check_input_node(
input_schema: &Schema,
expr_arena: &Arena<AExpr>,
) -> bool {
aexpr_to_leaf_names(node, expr_arena)
.iter()
.all(|name| input_schema.index_of(name).is_some())
aexpr_to_leaf_names_iter(node, expr_arena)
.all(|name| input_schema.index_of(name.as_ref()).is_some())
}

pub(crate) fn aexprs_to_schema(
Expand Down
19 changes: 19 additions & 0 deletions polars/tests/it/lazy/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,22 @@ fn test_projection_5086() -> PolarsResult<()> {

Ok(())
}

#[test]
fn test_unnest_pushdown() -> PolarsResult<()> {
let df = df![
"collection" => Series::full_null("", 1, &DataType::Int32),
"users" => Series::full_null("", 1, &DataType::List(Box::new(DataType::Struct(vec![Field::new("email", DataType::Utf8)])))),
]?;

let out = df
.lazy()
.explode(["users"])
.unnest(["users"])
.select([col("email")])
.collect()?;

assert_eq!(out.get_column_names(), &["email"]);

Ok(())
}

0 comments on commit 2feb833

Please sign in to comment.