Skip to content

Commit

Permalink
fix(rust, python): don't run cse cache_states if no projections found (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 7, 2023
1 parent cf70386 commit 822e8c9
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ pub(super) fn set_cache_states(
// and finally remove that last projection and stitch the subplan
// back to the cache node again
if !cache_schema_and_children.is_empty() {
let pd = projection_pushdown::ProjectionPushDown {};
let mut pd = ProjectionPushDown::new();
for (_cache_id, (children, columns)) in cache_schema_and_children {
if !columns.is_empty() {
let projection = columns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,14 @@ pub fn optimize(

// should be run before predicate pushdown
if projection_pushdown {
let projection_pushdown_opt = ProjectionPushDown {};
let mut projection_pushdown_opt = ProjectionPushDown::new();
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_changed);

if projection_pushdown_opt.changed {
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, scratch, cse_changed);
}
}

if predicate_pushdown {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_generic(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
lp: ALogicalPlan,
acc_projections: Vec<Node>,
projected_names: PlHashSet<Arc<str>>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_groupby(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
keys: Vec<Node>,
aggs: Vec<Node>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_hstack(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
mut exprs: Vec<Node>,
mut acc_projections: Vec<Node>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn add_nodes_to_accumulated_state(

#[allow(clippy::too_many_arguments)]
pub(super) fn process_join(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input_left: Node,
input_right: Node,
left_on: Vec<Node>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_melt(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
args: Arc<MeltArgs>,
schema: SchemaRef,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,18 @@ fn update_scan_schema(
Ok(new_schema)
}

pub struct ProjectionPushDown {}
pub struct ProjectionPushDown {
pub(crate) changed: bool,
}

impl ProjectionPushDown {
pub(super) fn new() -> Self {
Self { changed: false }
}

/// Projection will be done at this node, but we continue optimization
fn no_pushdown_restart_opt(
&self,
&mut self,
lp: ALogicalPlan,
acc_projections: Vec<Node>,
projections_seen: usize,
Expand Down Expand Up @@ -177,7 +183,7 @@ impl ProjectionPushDown {
}

fn finish_node(
&self,
&mut self,
local_projections: Vec<Node>,
builder: ALogicalPlanBuilder,
) -> ALogicalPlan {
Expand All @@ -190,7 +196,7 @@ impl ProjectionPushDown {

#[allow(clippy::too_many_arguments)]
fn join_push_down(
&self,
&mut self,
schema_left: &Schema,
schema_right: &Schema,
proj: Node,
Expand Down Expand Up @@ -224,7 +230,7 @@ impl ProjectionPushDown {

/// This pushes down current node and assigns the result to this node.
fn pushdown_and_assign(
&self,
&mut self,
input: Node,
acc_projections: Vec<Node>,
names: PlHashSet<Arc<str>>,
Expand All @@ -251,7 +257,7 @@ impl ProjectionPushDown {
///
/// The local projections are return and still have to be applied
fn pushdown_and_assign_check_schema(
&self,
&mut self,
input: Node,
acc_projections: Vec<Node>,
projections_seen: usize,
Expand Down Expand Up @@ -290,7 +296,7 @@ impl ProjectionPushDown {
/// * `expr_arena` - The local memory arena for the expressions.
///
fn push_down(
&self,
&mut self,
logical_plan: ALogicalPlan,
mut acc_projections: Vec<Node>,
mut projected_names: PlHashSet<Arc<str>>,
Expand All @@ -301,16 +307,19 @@ impl ProjectionPushDown {
use ALogicalPlan::*;

match logical_plan {
Projection { expr, input, .. } => process_projection(
self,
input,
expr,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
),
Projection { expr, input, .. } => {
self.changed = true;
process_projection(
self,
input,
expr,
acc_projections,
projected_names,
projections_seen,
lp_arena,
expr_arena,
)
}
LocalProjection { expr, input, .. } => {
self.pushdown_and_assign(
input,
Expand Down Expand Up @@ -804,7 +813,7 @@ impl ProjectionPushDown {
}

pub fn optimize(
&self,
&mut self,
logical_plan: ALogicalPlan,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::*;

#[allow(clippy::too_many_arguments)]
pub(super) fn process_projection(
proj_pd: &ProjectionPushDown,
proj_pd: &mut ProjectionPushDown,
input: Node,
exprs: Vec<Node>,
mut acc_projections: Vec<Node>,
Expand Down
27 changes: 27 additions & 0 deletions py-polars/tests/unit/test_cse.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import re
from datetime import date

import polars as pl

Expand Down Expand Up @@ -31,3 +32,29 @@ def test_union_duplicates() -> None:
)
== 10
)


def test_cse_schema_6081() -> None:
df = pl.DataFrame(
data=[
[date(2022, 12, 12), 1, 1],
[date(2022, 12, 12), 1, 2],
[date(2022, 12, 13), 5, 2],
],
columns=["date", "id", "value"],
orient="row",
).lazy()

min_value_by_group = df.groupby(["date", "id"]).agg(
pl.col("value").min().alias("min_value")
)

result = df.join(min_value_by_group, on=["date", "id"], how="left")
assert result.collect(
common_subplan_elimination=True, projection_pushdown=True
).to_dict(False) == {
"date": [date(2022, 12, 12), date(2022, 12, 12), date(2022, 12, 13)],
"id": [1, 1, 5],
"value": [1, 2, 2],
"min_value": [1, 1, 2],
}

0 comments on commit 822e8c9

Please sign in to comment.