Skip to content

Commit

Permalink
shallow implementation of join prune
Browse files Browse the repository at this point in the history
When a join is done on tables from the same src
the join can be dropped in specific cases.
This commit drops the join if the previous node
in the logical plan is an aggregation or a projection.
(on certain conditions)
A later commit will inspect, and combine lower nodes as
well. #449
  • Loading branch information
ritchie46 committed Apr 20, 2021
1 parent 106bf58 commit 0dd22ed
Show file tree
Hide file tree
Showing 10 changed files with 266 additions and 112 deletions.
9 changes: 9 additions & 0 deletions polars/polars-core/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ impl DataFrame {
}
true
}

/// Cheap identity check: returns `true` only when both frames have the same number of
/// columns and every pair of columns at the same position shares the same `Arc`
/// allocation, the same name and the same length.
///
/// No values are compared, so two frames with equal data stored in different allocations
/// are reported as *not* equal.
// The lint fires when pointers to trait objects are compared; it is silenced for the
// `Arc::ptr_eq` call below.
#[allow(clippy::vtable_address_comparisons)]
pub fn fast_equal(&self, other: &DataFrame) -> bool {
    // `zip` stops at the shorter side, so without this check a frame whose columns are a
    // prefix of the other frame's columns would compare as equal.
    self.columns.len() == other.columns.len()
        && self
            .columns
            .iter()
            .zip(other.columns.iter())
            .all(|(a, b)| Arc::ptr_eq(&a.0, &b.0) && a.name() == b.name() && a.len() == b.len())
}
}

#[cfg(test)]
Expand Down
12 changes: 12 additions & 0 deletions polars/polars-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ impl<T> Arena<T> {
Node(idx)
}

pub fn pop(&mut self) -> Option<T> {
self.items.pop()
}

pub fn len(&self) -> usize {
self.items.len()
}

pub fn is_empty(&self) -> bool {
self.items.is_empty()
}

pub fn new() -> Self {
Arena { items: vec![] }
}
Expand Down
26 changes: 23 additions & 3 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use crate::logical_plan::optimizer::{
predicate_pushdown::PredicatePushDown, projection_pushdown::ProjectionPushDown,
};
use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::join_pruning::JoinPrune;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::utils::combine_predicates_expr;
use crate::{logical_plan::FETCH_ROWS, prelude::*};
Expand Down Expand Up @@ -183,6 +184,7 @@ pub struct OptState {
pub agg_scan_projection: bool,
pub aggregate_pushdown: bool,
pub global_string_cache: bool,
pub join_pruning: bool,
}

impl Default for OptState {
Expand All @@ -196,6 +198,7 @@ impl Default for OptState {
agg_scan_projection: false,
aggregate_pushdown: false,
global_string_cache: true,
join_pruning: true,
}
}
}
Expand All @@ -221,8 +224,8 @@ impl LazyFrame {
let mut logical_plan = self.clone().get_plan_builder().build();
if optimized {
// initialize arenas
let mut expr_arena = Arena::with_capacity(512);
let mut lp_arena = Arena::with_capacity(512);
let mut expr_arena = Arena::with_capacity(64);
let mut lp_arena = Arena::with_capacity(32);

let lp_top = self.clone().optimize(&mut lp_arena, &mut expr_arena)?;
logical_plan = node_to_lp(lp_top, &mut expr_arena, &mut lp_arena);
Expand All @@ -248,6 +251,14 @@ impl LazyFrame {
}
}

/// Test helper: converts this frame's logical plan into its arena representation.
///
/// Returns the root node together with the expression arena and the logical-plan arena
/// that were filled during the conversion.
#[cfg(test)]
pub(crate) fn into_alp(self) -> (Node, Arena<AExpr>, Arena<ALogicalPlan>) {
    let (mut exprs, mut plans) = (Arena::with_capacity(64), Arena::with_capacity(32));
    let top = to_alp(self.logical_plan, &mut exprs, &mut plans);
    (top, exprs, plans)
}

/// Toggle projection pushdown optimization.
pub fn with_projection_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.projection_pushdown = toggle;
Expand Down Expand Up @@ -284,6 +295,12 @@ impl LazyFrame {
self
}

/// Enable (`true`) or disable (`false`) the join pruning optimization for this query.
pub fn with_join_pruning(self, toggle: bool) -> Self {
    let mut frame = self;
    frame.opt_state.join_pruning = toggle;
    frame
}

/// Describe the logical plan.
pub fn describe_plan(&self) -> String {
self.logical_plan.describe()
Expand Down Expand Up @@ -413,6 +430,7 @@ impl LazyFrame {
let simplify_expr = self.opt_state.simplify_expr;
let agg_scan_projection = self.opt_state.agg_scan_projection;
let aggregate_pushdown = self.opt_state.aggregate_pushdown;
let join_pruning = self.opt_state.join_pruning;

let logical_plan = self.get_plan_builder().build();

Expand Down Expand Up @@ -452,10 +470,12 @@ impl LazyFrame {
rules.push(Box::new(SimplifyExprRule {}));
rules.push(Box::new(SimplifyBooleanRule {}));
}

if aggregate_pushdown {
rules.push(Box::new(AggregatePushdown::new()))
}
if join_pruning {
rules.push(Box::new(JoinPrune {}))
}

if agg_scan_projection {
// scan the LP to aggregate all the column used in scans
Expand Down
20 changes: 19 additions & 1 deletion polars/polars-lazy/src/logical_plan/aexpr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,34 @@ impl AExpr {
}
}

/// Check AExpr equality. The nodes may differ.
///
/// For instance: there can be two columns "foo" in the memory arena. These are equal,
/// but would have different node values.
pub(crate) fn eq(node_left: Node, node_right: Node, expr_arena: &Arena<AExpr>) -> bool {
let cmp = |(node_left, node_right)| {
use AExpr::*;
// TODO! more variants
match (expr_arena.get(node_left), expr_arena.get(node_right)) {
(Alias(_, name_l), Alias(_, name_r)) => name_l == name_r,
(Column(name_l), Column(name_r)) => name_l == name_r,
(Literal(left), Literal(right)) => left == right,
(BinaryExpr { op: l, .. }, BinaryExpr { op: r, .. }) => l == r,
(Cast { data_type: l, .. }, Cast { data_type: r, .. }) => l == r,
(Sort { reverse: l, .. }, Sort { reverse: r, .. }) => l == r,
(SortBy { reverse: l, .. }, SortBy { reverse: r, .. }) => l == r,
(Shift { periods: l, .. }, Shift { periods: r, .. }) => l == r,
(
Slice {
offset: offset_l,
length: length_l,
..
},
Slice {
offset: offset_r,
length: length_r,
..
},
) => offset_l == offset_r && length_l == length_r,
(a, b) => std::mem::discriminant(a) == std::mem::discriminant(b),
}
};
Expand Down
42 changes: 40 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::logical_plan::iterator::ArenaLpIter;
use crate::logical_plan::{det_melt_schema, Context};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
Expand All @@ -6,6 +7,7 @@ use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
use polars_core::utils::{Arena, Node};
use std::collections::HashSet;
use std::fs::canonicalize;
use std::sync::Arc;

// ALogicalPlan is a representation of LogicalPlan with Nodes which are allocated in an Arena
Expand Down Expand Up @@ -56,8 +58,8 @@ pub enum ALogicalPlan {
selection: Option<Node>,
},
Projection {
expr: Vec<Node>,
input: Node,
expr: Vec<Node>,
schema: SchemaRef,
},
LocalProjection {
Expand Down Expand Up @@ -152,6 +154,38 @@ impl ALogicalPlan {
},
}
}

/// Check ALogicalPlan equality. The nodes may differ.
///
/// For instance: there can be two columns "foo" in the memory arena. These are equal,
/// but would have different node values.
///
/// Both plans are walked in lockstep, starting at `node_left` and `node_right`, and every
/// pair of visited nodes must compare equal:
/// * file scans are equal when both paths canonicalize to the same location,
/// * in-memory scans are equal when `DataFrame::fast_equal` says so,
/// * every other pair is equal when the variants and the schemas match.
///
/// NOTE(review): the fallback arm does not look at expressions stored in the nodes, so two
/// nodes of the same variant with the same schema compare equal even if their expressions
/// differ — confirm callers can tolerate this.
pub(crate) fn eq(node_left: Node, node_right: Node, lp_arena: &Arena<ALogicalPlan>) -> bool {
    let cmp = |(node_left, node_right)| {
        use ALogicalPlan::*;
        match (lp_arena.get(node_left), lp_arena.get(node_right)) {
            (CsvScan { path: path_a, .. }, CsvScan { path: path_b, .. }) => {
                // Canonicalization touches the file system and fails for missing or
                // unreadable paths. Treat that as "not equal" instead of panicking.
                matches!(
                    (canonicalize(path_a), canonicalize(path_b)),
                    (Ok(a), Ok(b)) if a == b
                )
            }
            #[cfg(feature = "parquet")]
            (ParquetScan { path: path_a, .. }, ParquetScan { path: path_b, .. }) => {
                // Same reasoning as for CSV scans: an I/O error means "not provably equal".
                matches!(
                    (canonicalize(path_a), canonicalize(path_b)),
                    (Ok(a), Ok(b)) if a == b
                )
            }
            (DataFrameScan { df: df_a, .. }, DataFrameScan { df: df_b, .. }) => {
                df_a.fast_equal(df_b)
            }
            (a, b) => {
                std::mem::discriminant(a) == std::mem::discriminant(b)
                    && a.schema(lp_arena) == b.schema(lp_arena)
            }
        }
    };

    lp_arena
        .iter(node_left)
        .zip(lp_arena.iter(node_right))
        .map(|(tpll, tplr)| (tpll.0, tplr.0))
        .all(cmp)
}
}

impl ALogicalPlan {
Expand Down Expand Up @@ -521,7 +555,11 @@ impl<'a> ALogicalPlanBuilder<'a> {
}

/// Consumes the builder and returns the plan stored at the builder's root node.
///
/// The plan is popped off the end of the arena when the root index equals the arena
/// length; otherwise it is taken out of the arena at the root's position.
///
/// NOTE(review): if node indices are zero-based positions in the arena (TODO confirm), the
/// root of a stored plan is always smaller than `len()`, so the `pop` path would never
/// run. The condition may have been meant as `len() - 1` — verify before changing it.
pub fn build(self) -> ALogicalPlan {
    let root_equals_len = self.root.0 == self.lp_arena.len();
    if !root_equals_len {
        return self.lp_arena.take(self.root);
    }
    self.lp_arena.pop().unwrap()
}

pub(crate) fn schema(&self) -> &Schema {
Expand Down
125 changes: 125 additions & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/join_pruning.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;
use crate::utils::equal_aexprs;

/// Optimization rule that prunes a join when both of its inputs can be merged into a single
/// node and the rest of the logical plan below those inputs is equal.
///
/// The rule currently only looks at the nodes directly below the join (aggregations and
/// projections).
// See: https://github.com/ritchie46/polars/issues/449
pub struct JoinPrune {}

impl OptimizationRule for JoinPrune {
/// Tries to replace the join stored at `node` by a single merged node.
///
/// Returns `Some(plan)` with the replacement when `node` is a `Join` whose `left_on` and
/// `right_on` expressions are equal and whose two inputs are either
/// * two `Aggregate` nodes without a custom `apply`, with equal keys and equal inputs, or
/// * two `Projection` nodes with equal inputs.
///
/// Returns `None` in every other case, leaving the plan untouched.
fn optimize_plan(
&mut self,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
node: Node,
) -> Option<ALogicalPlan> {
let lp = lp_arena.get(node);

// We check if:
// 1: join the same tables,
// 2: join on the same columns
// 3: inputs of joins can be combined
// * AGGREGATION if keys are equal
// * PROJECTION can always be combined.
// 4: the nodes in the LP before (3) are equal.
//
// NOTE(review): only the inputs and the join columns are bound below; the join type is
// not inspected.
use ALogicalPlan::*;
match lp {
Join {
input_left,
input_right,
left_on,
right_on,
..
} if equal_aexprs(left_on, right_on, expr_arena) => {
match (lp_arena.get(*input_left), lp_arena.get(*input_right)) {
(
Aggregate {
input: input_l,
keys: keys_l,
aggs: aggs_l,
apply: apply_l,
..
},
Aggregate {
input: input_r,
keys: keys_r,
aggs: aggs_r,
apply: apply_r,
..
},
// skip if we have custom functions
) if apply_l.is_none()
&& apply_r.is_none()
// check if aggregation keys can be combined.
// NOTE(review): the join columns are compared with each other, but not with the
// aggregation keys — confirm that joining on a non-key column cannot reach this arm.
&& equal_aexprs(keys_l, keys_r, expr_arena)
// check if all previous nodes/ transformations are equal
&& ALogicalPlan::eq(*input_l, *input_r, lp_arena)
=>
{
// One aggregation over the shared input: the left keys, then the left aggregations
// followed by the right aggregations.
let keys = keys_l.clone();
let aggs = aggs_l
.iter()
.copied()
.chain(aggs_r.iter().copied())
.collect();
Some(
ALogicalPlanBuilder::new(*input_l, expr_arena, lp_arena)
.groupby(keys, aggs, None)
.build(),
)
}
(Projection {input: input_l, expr: expr_l, ..},
Projection {input: input_r, expr: expr_r, ..})
// check if all previous nodes/ transformations are equal
if ALogicalPlan::eq(*input_l, *input_r, lp_arena)
=> {
// One projection over the shared input: the left expressions followed by the right
// expressions.
// NOTE(review): presumably this assumes the merged projection yields the same rows as
// the join would — verify for join columns with duplicate values.
let exprs = expr_l.iter().copied().chain(expr_r.iter().copied()).collect();
Some(ALogicalPlanBuilder::new(*input_l, expr_arena, lp_arena)
.project(exprs)
.build())
}
// The join inputs are not a pair this rule knows how to merge.
_ => None,
}
}
// Not a join, or the join columns differ between the two sides.
_ => None,
}
}
}

#[cfg(test)]
mod test {
    use super::*;
    use polars_core::df;
    use polars_core::prelude::*;

    /// Two aggregations over the same in-memory frame, grouped by the same key and joined
    /// on that key, must be rewritten into a single aggregation node.
    #[test]
    fn test_join_prune() -> Result<()> {
        let df = df!(
            "a" => [1, 2, 3, 4, 5],
            "b" => [1, 1, 2, 2, 2]
        )?;

        let q1 = df
            .clone()
            .lazy()
            .groupby(vec![col("b")])
            .agg(vec![col("a").first()]);

        // `df` is not used after this query, so it can be moved instead of cloned.
        let q2 = df
            .lazy()
            .groupby(vec![col("b")])
            .agg(vec![col("a").last()]);

        let (root, mut expr_arena, mut alp_arena) =
            q1.left_join(q2, col("b"), col("b"), None).into_alp();
        let mut opt = JoinPrune {};
        let out = opt
            .optimize_plan(&mut alp_arena, &mut expr_arena, root)
            .unwrap();
        assert!(matches!(out, ALogicalPlan::Aggregate { .. }));
        Ok(())
    }
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use std::collections::HashMap;

pub(crate) mod aggregate_pushdown;
pub(crate) mod aggregate_scan_projections;
pub(crate) mod join_pruning;
pub(crate) mod predicate_pushdown;
pub(crate) mod projection_pushdown;
pub(crate) mod prune_join;
pub(crate) mod simplify_expr;
pub(crate) mod stack_opt;
pub(crate) mod type_coercion;
Expand Down

0 comments on commit 0dd22ed

Please sign in to comment.