Skip to content

Commit

Permalink
[LAZY] slice pushdown
Browse files Browse the repository at this point in the history
Make slice pushdown optimization also work for simple projectons
like 'col("foo"), 'col("bar").alias("spam").
These projections will not block the pusdhown.
  • Loading branch information
ritchie46 committed Jan 12, 2022
1 parent 1995dbc commit eaedab6
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 64 deletions.
160 changes: 96 additions & 64 deletions polars/polars-lazy/src/logical_plan/optimizer/slice_pushdown.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::prelude::*;
use crate::utils::aexpr_is_simple_projection;
use polars_core::prelude::*;

pub(crate) struct SlicePushDown {}
Expand All @@ -10,7 +11,80 @@ struct State {
}

impl SlicePushDown {
fn push_down(
// slice will be done at this node if we found any
// we also stop optimization
fn no_pushdown_finish_opt(
&self,
lp: ALogicalPlan,
state: Option<State>,
lp_arena: &mut Arena<ALogicalPlan>,
) -> Result<ALogicalPlan> {
match state {
Some(state) => {
let input = lp_arena.add(lp);

let lp = ALogicalPlan::Slice {
input,
offset: state.offset,
len: state.len,
};
Ok(lp)
}
None => Ok(lp),
}
}

/// slice will be done at this node, but we continue optimization
fn no_pushdown_restart_opt(
&self,
lp: ALogicalPlan,
state: Option<State>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

let new_inputs = inputs
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
// No state, so we do not push down the slice here.
let state = None;
let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;
let lp = lp.from_exprs_and_input(exprs, new_inputs);

self.no_pushdown_finish_opt(lp, state, lp_arena)
}

/// slice will be pushed down.
fn pushdown_and_continue(
&self,
lp: ALogicalPlan,
state: Option<State>,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

let new_inputs = inputs
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
let alp = self.pushdown(alp, state, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;
Ok(lp.from_exprs_and_input(exprs, new_inputs))
}

fn pushdown(
&self,
lp: ALogicalPlan,
state: Option<State>,
Expand Down Expand Up @@ -110,7 +184,7 @@ impl SlicePushDown {
offset,
len
});
let lp = self.push_down(alp, state, lp_arena, expr_arena)?;
let lp = self.pushdown(alp, state, lp_arena, expr_arena)?;
let input = lp_arena.add(lp);
Ok(Slice {
input,
Expand All @@ -128,13 +202,16 @@ impl SlicePushDown {
offset,
len
});
self.push_down(alp, state, lp_arena, expr_arena)
self.pushdown(alp, state, lp_arena, expr_arena)
}

// [Do not pushdown] boundary
// here we do not pushdown.
// we reset the state and then start the optimization again
m @ (Selection { .. }, _)
// let's be conservative. projections may do aggregations and a pushed down slice
// will lead to incorrect aggregations
| m @ (LocalProjection {..},_)
// other blocking nodes
| m @ (Join { .. }, _)
| m @ (Aggregate {..}, _)
| m @ (DataFrameScan {..}, _)
Expand All @@ -143,44 +220,10 @@ impl SlicePushDown {
| m @ (Cache {..}, _)
| m @ (Distinct {..}, _)
| m @ (Udf {predicate_pd: false, ..}, _)
// let's be conservative. projections may do aggregations and a pushed down slice
// will lead to incorrect aggregations
| m @ (Projection {..}, _)
| m @ (LocalProjection {..},_)
| m @ (HStack {..},_)
=> {
let (lp, state) = m;
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

let new_inputs = inputs
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
// No state, so we do not push down the slice here.
let state = None;
let alp = self.push_down(alp, state, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)

})
.collect::<Result<Vec<_>>>()?;
let lp = lp.from_exprs_and_input(exprs, new_inputs);

if let Some(state) = state {
let input = lp_arena.add(lp);

let lp = ALogicalPlan::Slice {
input,
offset: state.offset,
len: state.len,
};
Ok(lp)

} else {
Ok(lp)
}

self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
}
// [Pushdown]
// these nodes will be pushed down.
Expand All @@ -189,33 +232,22 @@ impl SlicePushDown {

=> {
let (lp, state) = m;
let inputs = lp.get_inputs();
let exprs = lp.get_exprs();

let new_inputs = inputs
.iter()
.map(|&node| {
let alp = lp_arena.take(node);
let alp = self.push_down(alp, state, lp_arena, expr_arena)?;
lp_arena.replace(node, alp);
Ok(node)
})
.collect::<Result<Vec<_>>>()?;
Ok(lp.from_exprs_and_input(exprs, new_inputs))
self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
}
(catch_all, state) => {
match state {
Some(state) => {
let input = lp_arena.add(catch_all);
Ok(Slice {
input,
offset: state.offset,
len: state.len
})
}
None => Ok(catch_all)
m @ (Projection {..}, _) => {
let (lp, state) = m;
// The slice operation may only pass on simple projections. col("foo").alias("bar")
if lp.get_exprs().iter().all(|root| {
aexpr_is_simple_projection(*root, expr_arena)
}) {
self.pushdown_and_continue(lp, state, lp_arena, expr_arena)
} else {
self.no_pushdown_restart_opt(lp, state, lp_arena, expr_arena)
}
}
(catch_all, state) => {
self.no_pushdown_finish_opt(catch_all, state, lp_arena)
}

}
}
Expand All @@ -226,6 +258,6 @@ impl SlicePushDown {
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> Result<ALogicalPlan> {
self.push_down(logical_plan, None, lp_arena, expr_arena)
self.pushdown(logical_plan, None, lp_arena, expr_arena)
}
}
7 changes: 7 additions & 0 deletions polars/polars-lazy/src/tests/optimization_checks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,5 +92,12 @@ pub fn test_simple_slice() -> Result<()> {
let out = q.collect()?;
assert_eq!(out.height(), 3);

let q = scan_foods_parquet(false)
.select([col("category"), col("calories").alias("bar")])
.limit(3);
assert!(slice_at_scan(&lp_arena, root));
let out = q.collect()?;
assert_eq!(out.height(), 3);

Ok(())
}
7 changes: 7 additions & 0 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,13 @@ impl PushNode for &mut [Option<Node>] {
}
}

/// A projection that only takes a column or a column + alias.
pub(crate) fn aexpr_is_simple_projection(current_node: Node, arena: &Arena<AExpr>) -> bool {
arena
.iter(current_node)
.all(|(_node, e)| matches!(e, AExpr::Column(_) | AExpr::Alias(_, _)))
}

pub(crate) fn has_aexpr<F>(current_node: Node, arena: &Arena<AExpr>, matches: F) -> bool
where
F: Fn(&AExpr) -> bool,
Expand Down

0 comments on commit eaedab6

Please sign in to comment.