Skip to content

Commit

Permalink
small lazy refactors
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 18, 2021
1 parent a76f0be commit 9e04ae4
Show file tree
Hide file tree
Showing 7 changed files with 136 additions and 173 deletions.
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ pub struct OptState {
pub predicate_pushdown: bool,
pub type_coercion: bool,
pub simplify_expr: bool,
/// Make sure that all needed columns are scannedn
pub agg_scan_projection: bool,
pub aggregate_pushdown: bool,
pub global_string_cache: bool,
Expand All @@ -188,6 +189,7 @@ impl Default for OptState {
predicate_pushdown: true,
type_coercion: true,
simplify_expr: true,
// will be toggled by a scan operation such as csv scan or parquet scan
agg_scan_projection: false,
aggregate_pushdown: false,
global_string_cache: true,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl<'a> IntoIterator for &'a Expr {
}

impl AExpr {
/// Push nodes to a pre-allocated stack
/// Push nodes at this level to a pre-allocated stack
pub(crate) fn nodes<'a>(&'a self, container: &mut Vec<Node>) {
let mut push = |e: &'a Node| container.push(*e);
use AExpr::*;
Expand Down
26 changes: 13 additions & 13 deletions polars/polars-lazy/src/logical_plan/optimizer/aggregate_pushdown.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,20 @@ use crate::utils::{aexpr_to_root_nodes, has_aexpr};
use polars_core::prelude::*;

pub(crate) struct AggregatePushdown {
state: Vec<Node>,
accumulated_projections: Vec<Node>,
processed_state: bool,
}

impl AggregatePushdown {
pub(crate) fn new() -> Self {
AggregatePushdown {
state: vec![],
accumulated_projections: vec![],
processed_state: false,
}
}
fn drain_nodes(&mut self) -> impl Iterator<Item = Node> {
fn process_nodes(&mut self) -> Vec<Node> {
self.processed_state = true;
let state = std::mem::take(&mut self.state);
state.into_iter()
std::mem::take(&mut self.accumulated_projections)
}

fn pushdown_projection(
Expand All @@ -41,7 +40,7 @@ impl AggregatePushdown {
})
{
// add to state
self.state.extend_from_slice(&expr);
self.accumulated_projections.extend_from_slice(&expr);
// swap projection with the input node
let lp = lp_arena.take(input);
Some(lp)
Expand Down Expand Up @@ -84,16 +83,17 @@ impl OptimizationRule for AggregatePushdown {
} => self.pushdown_projection(node, expr, input, schema, lp_arena, expr_arena),
// todo! hstack should pushown not dependent columns
Join { .. } | Aggregate { .. } | HStack { .. } | DataFrameScan { .. } => {
if self.state.is_empty() {
if self.accumulated_projections.is_empty() {
lp_arena.replace(node, lp);
None
} else {
// we cannot pass a join or GroupBy so we do the projection here
let new_node = lp_arena.add(lp.clone());
let input_schema = lp_arena.get(new_node).schema(lp_arena);

let nodes: Vec<_> = self.drain_nodes().collect();
let fields = nodes
let nodes: Vec<_> = self.process_nodes();
let fields = self
.accumulated_projections
.iter()
.map(|n| {
expr_arena
Expand Down Expand Up @@ -122,7 +122,7 @@ impl OptimizationRule for AggregatePushdown {
predicate,
aggregate,
cache,
} => match self.state.is_empty() {
} => match self.accumulated_projections.is_empty() {
true => {
lp_arena.replace(
node,
Expand All @@ -143,7 +143,7 @@ impl OptimizationRule for AggregatePushdown {
None
}
false => {
let aggregate: Vec<_> = self.drain_nodes().collect();
let aggregate: Vec<_> = self.process_nodes();
Some(ALogicalPlan::CsvScan {
path,
schema,
Expand All @@ -168,7 +168,7 @@ impl OptimizationRule for AggregatePushdown {
aggregate,
stop_after_n_rows,
cache,
} => match self.state.is_empty() {
} => match self.accumulated_projections.is_empty() {
true => {
lp_arena.replace(
node,
Expand All @@ -185,7 +185,7 @@ impl OptimizationRule for AggregatePushdown {
None
}
false => {
let aggregate = self.drain_nodes().collect();
let aggregate = self.process_nodes();
Some(ALogicalPlan::ParquetScan {
path,
schema,
Expand Down
70 changes: 42 additions & 28 deletions polars/polars-lazy/src/logical_plan/optimizer/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ impl ALogicalPlan {
}
}

/// Get expressions in this node.
pub fn get_exprs(&self) -> Vec<Node> {
/// Push exprs in this node to an existing container.
pub fn collect_exprs(&self, container: &mut Vec<Node>) {
use ALogicalPlan::*;
match self {
Melt { .. }
Expand All @@ -195,62 +195,66 @@ impl ALogicalPlan {
| Explode { .. }
| Cache { .. }
| Distinct { .. }
| Udf { .. } => vec![],
Selection { predicate, .. } => vec![*predicate],
Projection { expr, .. } => expr.clone(),
LocalProjection { expr, .. } => expr.clone(),
| Udf { .. } => {}
Selection { predicate, .. } => container.push(*predicate),
Projection { expr, .. } => container.extend_from_slice(expr),
LocalProjection { expr, .. } => container.extend_from_slice(expr),
Aggregate { keys, aggs, .. } => {
keys.iter().copied().chain(aggs.iter().copied()).collect()
let iter = keys.iter().copied().chain(aggs.iter().copied());
container.extend(iter)
}
Join {
left_on, right_on, ..
} => left_on
.iter()
.copied()
.chain(right_on.iter().copied())
.collect(),
HStack { exprs, .. } => exprs.clone(),
} => {
let iter = left_on.iter().copied().chain(right_on.iter().copied());
container.extend(iter)
}
HStack { exprs, .. } => container.extend_from_slice(exprs),
#[cfg(feature = "parquet")]
ParquetScan {
predicate,
aggregate,
..
} => {
let mut exprs = aggregate.clone();
container.extend_from_slice(aggregate);
if let Some(node) = predicate {
exprs.push(*node)
container.push(*node)
}
exprs
}
CsvScan {
predicate,
aggregate,
..
} => {
let mut exprs = aggregate.clone();
container.extend_from_slice(aggregate);
if let Some(node) = predicate {
exprs.push(*node)
container.push(*node)
}
exprs
}
DataFrameScan {
projection,
selection,
..
} => {
let mut exprs = vec![];
if let Some(expr) = projection {
exprs.extend_from_slice(expr)
container.extend_from_slice(expr)
}
if let Some(expr) = selection {
exprs.push(*expr)
container.push(*expr)
}
exprs
}
}
}

pub fn get_inputs(&self) -> Vec<Node> {
/// Get expressions in this node.
pub fn get_exprs(&self) -> Vec<Node> {
let mut exprs = Vec::new();
self.collect_exprs(&mut exprs);
exprs
}

/// Push inputs in this node to an existing container.
pub fn collect_inputs(&self, container: &mut Vec<Node>) {
use ALogicalPlan::*;
let input = match self {
Melt { input, .. } => *input,
Expand All @@ -266,14 +270,24 @@ impl ALogicalPlan {
input_left,
input_right,
..
} => return vec![*input_left, *input_right],
} => {
container.push(*input_left);
container.push(*input_right);
return;
}
HStack { input, .. } => *input,
Distinct { input, .. } => *input,
Udf { input, .. } => *input,
#[cfg(feature = "parquet")]
ParquetScan { .. } => return vec![],
CsvScan { .. } | DataFrameScan { .. } => return vec![],
ParquetScan { .. } => return,
CsvScan { .. } | DataFrameScan { .. } => return,
};
vec![input]
container.push(input)
}

pub fn get_inputs(&self) -> Vec<Node> {
let mut inputs = Vec::new();
self.collect_inputs(&mut inputs);
inputs
}
}
84 changes: 7 additions & 77 deletions polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,11 @@ impl StackOptimizer {
) -> Node {
let mut changed = true;

let mut plans = Vec::with_capacity(64);
let mut plans = Vec::with_capacity(32);

// nodes of expressions and lp node from which the expressions are a member of
let mut exprs = Vec::with_capacity(64);
let mut exprs = Vec::with_capacity(32);

// used to get the nodes of an expression
let mut inter_mediate_stack = Vec::with_capacity(8);
// run loop until reaching fixed point
while changed {
// recurse into sub plans and expressions and apply rules
Expand All @@ -69,82 +67,18 @@ impl StackOptimizer {
let plan = lp_arena.get(current_node);

// traverse subplans and expressions and add to the stack
match plan {
ALogicalPlan::Slice { input, .. } => {
plans.push(*input);
}
ALogicalPlan::Selection { input, predicate } => {
plans.push(*input);
exprs.push((*predicate, *input));
}
ALogicalPlan::Projection { expr, input, .. } => {
plans.push(*input);
exprs.extend(expr.iter().map(|e| (*e, *input)));
}
ALogicalPlan::LocalProjection { expr, input, .. } => {
plans.push(*input);
exprs.extend(expr.iter().map(|e| (*e, *input)));
}
ALogicalPlan::Sort { input, .. } => {
plans.push(*input);
}
ALogicalPlan::Explode { input, .. } => {
plans.push(*input);
}
ALogicalPlan::Cache { input } => {
plans.push(*input);
}
ALogicalPlan::Aggregate {
input, aggs, keys, ..
} => {
plans.push(*input);
exprs.extend(aggs.iter().map(|e| (*e, *input)));
exprs.extend(keys.iter().map(|e| (*e, *input)));
}
ALogicalPlan::Join {
input_left,
input_right,
..
} => {
plans.push(*input_left);
plans.push(*input_right);
}
ALogicalPlan::HStack {
input, exprs: e2, ..
} => {
plans.push(*input);
exprs.extend(e2.iter().map(|e| (*e, *input)));
}
ALogicalPlan::Distinct { input, .. } => plans.push(*input),
ALogicalPlan::DataFrameScan { selection, .. } => {
if let Some(selection) = *selection {
exprs.push((selection, current_node))
}
}
ALogicalPlan::CsvScan { predicate, .. } => {
if let Some(predicate) = *predicate {
exprs.push((predicate, current_node))
}
}
#[cfg(feature = "parquet")]
ALogicalPlan::ParquetScan { predicate, .. } => {
if let Some(predicate) = *predicate {
exprs.push((predicate, current_node))
}
}
ALogicalPlan::Melt { input, .. } => plans.push(*input),
ALogicalPlan::Udf { input, .. } => plans.push(*input),
}
plan.collect_exprs(&mut exprs);
plan.collect_inputs(&mut plans);

// process the expressions on the stack and apply optimizations.
while let Some((current_expr_node, current_lp_node)) = exprs.pop() {
while let Some(current_expr_node) = exprs.pop() {
for rule in rules.iter() {
// keep iterating over same rule
while let Some(x) = rule.optimize_expr(
expr_arena,
current_expr_node,
&lp_arena,
current_lp_node,
current_node,
) {
expr_arena.replace(current_expr_node, x);
changed = true;
Expand All @@ -153,11 +87,7 @@ impl StackOptimizer {

// traverse subexpressions and add to the stack
let expr = expr_arena.get(current_expr_node);

expr.nodes(&mut inter_mediate_stack);
for node in inter_mediate_stack.drain(..) {
exprs.push((node, current_lp_node))
}
expr.nodes(&mut exprs);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl ProjectionPushDown {
subset,
} => {
// make sure that the set of unique columns is projected
(&*subset).as_ref().map(|subset| {
if let Some(subset) = (&*subset).as_ref() {
subset.iter().for_each(|name| {
add_str_to_accumulated(
name,
Expand All @@ -412,7 +412,8 @@ impl ProjectionPushDown {
expr_arena,
)
})
});
}

self.pushdown_and_assign(
input,
acc_projections,
Expand Down

0 comments on commit 9e04ae4

Please sign in to comment.