Skip to content

Commit

Permalink
refactor[rust]: keep imports private in polars-lazy::logical_plan::op…
Browse files Browse the repository at this point in the history
…timize (#4989)
  • Loading branch information
ritchie46 committed Sep 26, 2022
1 parent 1a4f0f3 commit 0885f56
Show file tree
Hide file tree
Showing 23 changed files with 265 additions and 277 deletions.
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ polars-arrow = { version = "0.24.2", path = "../polars-arrow", features = ["comp
polars-utils = { version = "0.24.2", path = "../polars-utils" }
rand = { version = "0.8", optional = true, features = ["small_rng", "std"] }
rand_distr = { version = "0.4", optional = true }
rayon.worspace = true
rayon.workspace = true
regex = { version = "1.5", optional = true }
# activate if you want serde support for Series and DataFrames
serde = { version = "1", features = ["derive"], optional = true }
Expand Down
176 changes: 4 additions & 172 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,38 +27,18 @@ pub use ndjson::*;
#[cfg(feature = "parquet")]
pub use parquet::*;
use polars_arrow::prelude::QuantileInterpolOptions;
#[cfg(any(feature = "parquet", feature = "csv-file", feature = "ipc"))]
use polars_core::datatypes::PlHashMap;
use polars_core::frame::explode::MeltArgs;
use polars_core::frame::hash_join::JoinType;
use polars_core::prelude::*;
use polars_io::RowCount;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};

use crate::logical_plan::optimizer::aggregate_pushdown::AggregatePushdown;
#[cfg(any(
feature = "parquet",
feature = "csv-file",
feature = "ipc",
feature = "cse"
))]
use crate::logical_plan::optimizer::file_caching::FileCacher;
use crate::logical_plan::optimizer::predicate_pushdown::PredicatePushDown;
use crate::logical_plan::optimizer::projection_pushdown::ProjectionPushDown;
use crate::logical_plan::optimizer::simplify_expr::SimplifyExprRule;
use crate::logical_plan::optimizer::stack_opt::{OptimizationRule, StackOptimizer};
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
use crate::logical_plan::optimizer::file_caching::collect_fingerprints;
use crate::logical_plan::optimizer::optimize;
use crate::logical_plan::FETCH_ROWS;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::delay_rechunk::DelayRechunk;
use crate::prelude::drop_nulls::ReplaceDropNulls;
use crate::prelude::fast_projection::FastProjectionAndCollapse;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
use crate::prelude::file_caching::collect_fingerprints;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
use crate::prelude::file_caching::find_column_union_and_fingerprints;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::prelude::slice_pushdown_lp::SlicePushDown;
use crate::prelude::*;
use crate::utils::{combine_predicates_expr, expr_to_leaf_column_names};

Expand Down Expand Up @@ -563,155 +543,7 @@ impl LazyFrame {
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
) -> PolarsResult<Node> {
// get toggle values
let predicate_pushdown = self.opt_state.predicate_pushdown;
let projection_pushdown = self.opt_state.projection_pushdown;
let type_coercion = self.opt_state.type_coercion;
let simplify_expr = self.opt_state.simplify_expr;
let slice_pushdown = self.opt_state.slice_pushdown;
#[cfg(feature = "cse")]
let cse = self.opt_state.common_subplan_elimination;

let agg_scan_projection = self.opt_state.file_caching;
let aggregate_pushdown = self.opt_state.aggregate_pushdown;

let logical_plan = self.get_plan_builder().build();
let mut scratch = vec![];

// gradually fill the rules passed to the optimizer
let opt = StackOptimizer {};
let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);

// during debug we check if the optimizations have not modified the final schema
#[cfg(debug_assertions)]
let prev_schema = logical_plan.schema()?.into_owned();

let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena)?;

#[cfg(feature = "cse")]
let cse_changed = if cse {
let (lp, changed) = cse::elim_cmn_subplans(lp_top, lp_arena, expr_arena);
lp_top = lp;
changed
} else {
false
};
#[cfg(not(feature = "cse"))]
let cse_changed = false;

// we do simplification
if simplify_expr {
rules.push(Box::new(SimplifyExprRule {}));
}

// should be run before predicate pushdown
if projection_pushdown {
let projection_pushdown_opt = ProjectionPushDown {};
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
cache_states::set_cache_states(lp_top, lp_arena, expr_arena, &mut scratch, cse_changed);
}

if predicate_pushdown {
let predicate_pushdown_opt = PredicatePushDown::default();
let alp = lp_arena.take(lp_top);
let alp = predicate_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;
lp_arena.replace(lp_top, alp);
}

// make sure it's before slice pushdown.
if projection_pushdown {
rules.push(Box::new(FastProjectionAndCollapse {}));
}
rules.push(Box::new(DelayRechunk {}));

if slice_pushdown {
let slice_pushdown_opt = SlicePushDown {};
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt.optimize(alp, lp_arena, expr_arena)?;

lp_arena.replace(lp_top, alp);

// expressions use the stack optimizer
rules.push(Box::new(slice_pushdown_opt));
}
if type_coercion {
rules.push(Box::new(TypeCoercionRule {}))
}
// this optimization removes branches, so we must do it when type coercion
// is completed
if simplify_expr {
rules.push(Box::new(SimplifyBooleanRule {}));
}

if aggregate_pushdown {
rules.push(Box::new(AggregatePushdown::new()))
}

// make sure that we do that once slice pushdown
// and predicate pushdown are done. At that moment
// the file fingerprints are finished.
#[cfg(any(
feature = "cse",
feature = "parquet",
feature = "ipc",
feature = "csv-file"
))]
if agg_scan_projection || cse_changed {
// we do this so that expressions created by the pushdown optimizations are simplified
// we must clean up the predicates, because the agg_scan_projection
// uses them in the hashtable to determine duplicates.
let simplify_bools =
&mut [Box::new(SimplifyBooleanRule {}) as Box<dyn OptimizationRule>];
lp_top = opt.optimize_loop(simplify_bools, expr_arena, lp_arena, lp_top);

// scan the LP to aggregate all the column used in scans
// these columns will be added to the state of the AggScanProjection rule
let mut file_predicate_to_columns_and_count = PlHashMap::with_capacity(32);
find_column_union_and_fingerprints(
lp_top,
&mut file_predicate_to_columns_and_count,
lp_arena,
expr_arena,
);

let mut file_cacher = FileCacher::new(file_predicate_to_columns_and_count);
file_cacher.assign_unions(lp_top, lp_arena, expr_arena, &mut scratch, false);
scratch.clear();

#[cfg(feature = "cse")]
if cse_changed {
// this must run after cse
cse::decrement_file_counters_by_cache_hits(
lp_top,
lp_arena,
expr_arena,
0,
&mut scratch,
);
}
}

rules.push(Box::new(ReplaceDropNulls {}));

lp_top = opt.optimize_loop(&mut rules, expr_arena, lp_arena, lp_top);

// during debug we check if the optimizations have not modified the final schema
#[cfg(debug_assertions)]
{
// only check by names because we may supercast types.
assert_eq!(
prev_schema.iter_names().collect::<Vec<_>>(),
lp_arena
.get(lp_top)
.schema(lp_arena)
.iter_names()
.collect::<Vec<_>>()
);
};

Ok(lp_top)
optimize(self, lp_arena, expr_arena)
}

fn prepare_collect(self) -> PolarsResult<(ExecutionState, Box<dyn Executor>)> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::logical_plan::Context;
use crate::prelude::*;
use crate::utils::{aexpr_to_leaf_nodes, has_aexpr};

pub(crate) struct AggregatePushdown {
pub(super) struct AggregatePushdown {
accumulated_projections: Vec<Node>,
processed_state: bool,
}
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/logical_plan/optimizer/cache_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::sync::Arc;

use polars_core::prelude::PlIndexSet;

use crate::prelude::*;
use super::*;

fn get_upper_projections(
parent: Node,
Expand All @@ -29,7 +29,7 @@ fn get_upper_projections(

/// This will ensure that all equal caches communicate the amount of columns
/// they need to project.
pub(crate) fn set_cache_states(
pub(super) fn set_cache_states(
root: Node,
lp_arena: &mut Arena<ALogicalPlan>,
expr_arena: &mut Arena<AExpr>,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/optimizer/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type Trail = Vec<Node>;
// we traverse left first, so the `id` remains the same for an all left traversal.
// every right node may increment `id` and because it's shared mutable there will
// be no collisions as the increment is communicated upward with mutation.
pub(crate) fn collect_trails(
pub(super) fn collect_trails(
root: Node,
lp_arena: &Arena<ALogicalPlan>,
// every branch gets its own trail
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use polars_utils::arena::{Arena, Node};

use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;
use super::*;

pub(crate) struct DelayRechunk {}
pub(super) struct DelayRechunk {}

impl OptimizationRule for DelayRechunk {
fn optimize_plan(
Expand Down
8 changes: 3 additions & 5 deletions polars/polars-lazy/src/logical_plan/optimizer/drop_nulls.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
use std::sync::Arc;

use super::*;
use crate::dsl::function_expr::FunctionExpr;
use crate::logical_plan::functions::FunctionNode;
use crate::logical_plan::iterator::*;
use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;
use crate::utils::aexpr_to_leaf_names;

/// If we realize that a predicate drops nulls on a subset
/// we replace it with an explicit df.drop_nulls call, as this
/// has a fast path for the no null case
pub struct ReplaceDropNulls {}
pub(super) struct ReplaceDropNulls {}

impl OptimizationRule for ReplaceDropNulls {
fn optimize_plan(
Expand Down Expand Up @@ -98,9 +97,7 @@ mod test {
use polars_core::prelude::*;

use super::*;
use crate::prelude::stack_opt::OptimizationRule;
use crate::tests::fruits_cars;
use crate::utils::test::optimize_lp;

#[test]
fn test_drop_nulls_optimization() -> PolarsResult<()> {
Expand All @@ -114,6 +111,7 @@ mod test {
None,
] {
let lp = df.clone().lazy().drop_nulls(subset).logical_plan;

let out = optimize_lp(lp, &mut rules);
assert!(matches!(out, LogicalPlan::MapFunction { .. }));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use polars_core::prelude::*;

use super::*;
use crate::logical_plan::alp::ALogicalPlan;
use crate::logical_plan::functions::FunctionNode;
use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;

/// Projection in the physical plan is done by selecting an expression per thread.
/// In case of many projections and columns this can be expensive when the expressions are simple
Expand All @@ -14,7 +13,7 @@ use crate::prelude::*;
/// It is important that this optimization is run after projection pushdown.
///
/// The schema reported after this optimization is also
pub(crate) struct FastProjectionAndCollapse {}
pub(super) struct FastProjectionAndCollapse {}

fn impl_fast_projection(
input: Node,
Expand Down

0 comments on commit 0885f56

Please sign in to comment.