Improve lazy state struct (#4008)
ritchie46 committed Jul 14, 2022
1 parent 05aab4d commit 394c738
Showing 30 changed files with 152 additions and 112 deletions.
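
In short: the ad-hoc booleans on `ExecutionState` (`verbose`, `cache_window`) are folded into a single `bitflags`-backed `StateFlags` field, `Executor::execute` now receives `&mut ExecutionState`, and parallel branches obtain their state through `split()` instead of `clone()` plus manual bookkeeping. The sketch below shows what the flags side could look like; it is a minimal stand-in inferred from the call sites in this diff (`state.verbose()`, `state.flags.insert/remove(StateFlags::CACHE_WINDOW_EXPR)`), not the actual polars-lazy source, and `from_env` in particular is an assumption.

```rust
use bitflags::bitflags;

bitflags! {
    pub struct StateFlags: u8 {
        /// Replaces the old `verbose` bool and the per-call POLARS_VERBOSE env-var checks.
        const VERBOSE = 0x01;
        /// Replaces the old `cache_window` bool.
        const CACHE_WINDOW_EXPR = 0x02;
    }
}

impl StateFlags {
    /// Assumed helper: read POLARS_VERBOSE once when the state is built,
    /// rather than on every executor.
    fn from_env() -> Self {
        let mut flags = StateFlags::empty();
        if std::env::var("POLARS_VERBOSE").is_ok() {
            flags |= StateFlags::VERBOSE;
        }
        flags
    }
}

pub struct ExecutionState {
    pub flags: StateFlags,
    pub branch_idx: usize,
    // caches, schema, file fingerprints, ... elided
}

impl ExecutionState {
    pub fn verbose(&self) -> bool {
        self.flags.contains(StateFlags::VERBOSE)
    }
}
```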
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
@@ -118,6 +118,7 @@ test_all = [

[dependencies]
ahash = "0.7"
bitflags = "1.3"
glob = "0.3"
parking_lot = "0.12"
pyo3 = { version = "0.16", optional = true }
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/frame/mod.rs
@@ -721,8 +721,8 @@ impl LazyFrame {
let mut physical_plan =
planner.create_physical_plan(lp_top, &mut lp_arena, &mut expr_arena)?;

let state = ExecutionState::with_finger_prints(finger_prints);
let out = physical_plan.execute(&state);
let mut state = ExecutionState::with_finger_prints(finger_prints);
let out = physical_plan.execute(&mut state);
#[cfg(feature = "dtype-categorical")]
if use_string_cache {
toggle_string_cache(!use_string_cache);
5 changes: 2 additions & 3 deletions polars/polars-lazy/src/physical_plan/executors/cache.rs
@@ -1,4 +1,3 @@
use crate::physical_plan::executors::POLARS_VERBOSE;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::prelude::*;
@@ -9,15 +8,15 @@ pub struct CacheExec {
}

impl Executor for CacheExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
if let Some(df) = state.cache_hit(&self.key) {
return Ok(df);
}

// cache miss
let df = self.input.execute(state)?;
state.store_cache(std::mem::take(&mut self.key), df.clone());
if std::env::var(POLARS_VERBOSE).is_ok() {
if state.verbose() {
println!("cache set {:?}", self.key);
}
Ok(df)
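
Every executor in the files below picks up the same signature change. As a crate-internal sketch (the real trait definition, including any bounds, is not part of this diff), the `Executor` trait now looks roughly like:

```rust
use crate::physical_plan::state::ExecutionState;
use polars_core::prelude::*;

pub trait Executor {
    // `state` is now taken by mutable reference, matching the
    // `&mut ExecutionState` threaded through every executor in this commit.
    fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame>;
}
```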
@@ -8,7 +8,7 @@ pub(crate) struct DropDuplicatesExec {
}

impl Executor for DropDuplicatesExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
let subset = self.options.subset.as_ref().map(|v| &***v);
let keep = self.options.keep_strategy;
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/explode.rs
@@ -8,7 +8,7 @@ pub(crate) struct ExplodeExec {
}

impl Executor for ExplodeExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
df.explode(&self.columns)
}
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/physical_plan/executors/filter.rs
@@ -14,12 +14,12 @@ impl FilterExec {
}

impl Executor for FilterExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
let s = self.predicate.evaluate(&df, state)?;
let mask = s.bool().expect("filter predicate wasn't of type boolean");
let df = df.filter(mask)?;
if state.verbose {
if state.verbose() {
eprintln!("dataframe filtered");
}
Ok(df)
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/physical_plan/executors/groupby.rs
@@ -40,7 +40,7 @@ pub(super) fn groupby_helper(
keys: Vec<Series>,
aggs: &[Arc<dyn PhysicalExpr>],
apply: Option<&Arc<dyn DataFrameUdf>>,
state: &ExecutionState,
state: &mut ExecutionState,
maintain_order: bool,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
@@ -91,8 +91,8 @@ pub(super) fn groupby_helper(
}

impl Executor for GroupByExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
if state.verbose {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
if state.verbose() {
eprintln!("keys/aggregates are not partitionable: running default HASH AGGREGATION")
}
let df = self.input.execute(state)?;
@@ -14,12 +14,12 @@ pub(crate) struct GroupByDynamicExec {

impl Executor for GroupByDynamicExec {
#[cfg(not(feature = "dynamic_groupby"))]
fn execute(&mut self, _state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, _state: &mut ExecutionState) -> Result<DataFrame> {
panic!("activate feature dynamic_groupby")
}

#[cfg(feature = "dynamic_groupby")]
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;
df.as_single_chunk_par();
state.set_schema(self.input_schema.clone());
@@ -140,17 +140,17 @@ fn estimate_unique_count(keys: &[Series], mut sample_size: usize) -> usize {
// by sampling data.
fn can_run_partitioned(keys: &[Series], original_df: &DataFrame, state: &ExecutionState) -> bool {
if std::env::var("POLARS_NO_PARTITION").is_ok() {
if state.verbose {
if state.verbose() {
eprintln!("POLARS_NO_PARTITION set: running default HASH AGGREGATION")
}
false
} else if std::env::var("POLARS_FORCE_PARTITION").is_ok() {
if state.verbose {
if state.verbose() {
eprintln!("POLARS_FORCE_PARTITION set: running partitioned HASH AGGREGATION")
}
true
} else if original_df.height() < 1000 && !cfg!(test) {
if state.verbose {
if state.verbose() {
eprintln!("DATAFRAME < 1000 rows: running default HASH AGGREGATION")
}
false
@@ -176,12 +176,12 @@ fn can_run_partitioned(keys: &[Series], original_df: &DataFrame, state: &Executi
(estimate_unique_count(keys, sample_size), "estimated")
}
};
if state.verbose {
if state.verbose() {
eprintln!("{} unique values: {}", sampled_method, unique_estimate);
}

if unique_estimate > unique_count_boundary {
if state.verbose {
if state.verbose() {
eprintln!("estimated unique count: {} exceeded the boundary: {}, running default HASH AGGREGATION",unique_estimate, unique_count_boundary)
}
false
@@ -192,7 +192,7 @@ fn can_run_partitioned(keys: &[Series], original_df: &DataFrame, state: &Executi
}

impl Executor for PartitionGroupByExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let dfs = {
let original_df = self.input.execute(state)?;

@@ -213,7 +213,7 @@ impl Executor for PartitionGroupByExec {
self.slice,
);
}
if state.verbose {
if state.verbose() {
eprintln!("run PARTITIONED HASH AGGREGATION")
}

@@ -12,12 +12,12 @@ pub(crate) struct GroupByRollingExec {

impl Executor for GroupByRollingExec {
#[cfg(not(feature = "dynamic_groupby"))]
fn execute(&mut self, _state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, _state: &mut ExecutionState) -> Result<DataFrame> {
panic!("activate feature dynamic_groupby")
}

#[cfg(feature = "dynamic_groupby")]
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;
df.as_single_chunk_par();
state.set_schema(self.input_schema.clone());
18 changes: 9 additions & 9 deletions polars/polars-lazy/src/physical_plan/executors/join.rs
@@ -42,28 +42,28 @@ impl JoinExec {
}

impl Executor for JoinExec {
fn execute<'a>(&'a mut self, state: &'a ExecutionState) -> Result<DataFrame> {
if state.verbose {
fn execute<'a>(&'a mut self, state: &'a mut ExecutionState) -> Result<DataFrame> {
if state.verbose() {
eprintln!("join parallel: {}", self.parallel);
};
let mut input_left = self.input_left.take().unwrap();
let mut input_right = self.input_right.take().unwrap();

let (df_left, df_right) = if self.parallel {
let state_left = state;
let mut state_right = state.clone();
state_right.join_branch += 1;
let mut state_right = state.split();
let mut state_left = state.split();
state_right.branch_idx += 1;
// propagate the fetch_rows static value to the spawning threads.
let fetch_rows = FETCH_ROWS.with(|fetch_rows| fetch_rows.get());

POOL.join(
move || {
FETCH_ROWS.with(|fr| fr.set(fetch_rows));
input_left.execute(state_left)
input_left.execute(&mut state_left)
},
move || {
FETCH_ROWS.with(|fr| fr.set(fetch_rows));
input_right.execute(&state_right)
input_right.execute(&mut state_right)
},
)
} else {
@@ -131,10 +131,10 @@ impl Executor for JoinExec {
Some(self.suffix.clone().into_owned()),
self.slice,
true,
state.verbose,
state.verbose(),
);

if state.verbose {
if state.verbose() {
eprintln!("{:?} join dataframes finished", self.how);
};
df
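
For the parallel branch above, each side now gets its own state from `split()` and the branch counter is renamed from `join_branch` to `branch_idx`. Below is a rough, self-contained sketch of the shape of that API; what `split()` actually copies or resets is not visible in this diff, so the body here is an assumption.

```rust
#[derive(Clone, Default)]
pub struct ExecutionState {
    pub branch_idx: usize,
    // flags, caches, schema, ... elided in this sketch
}

impl ExecutionState {
    /// Assumed shape: produce a state for a new execution branch. What exactly is
    /// shared vs. reset is not shown in the diff; a plain clone stands in here.
    pub fn split(&self) -> Self {
        self.clone()
    }
}

// Usage mirroring the join executor: both inputs get an owned, mutable state,
// and the right-hand branch bumps its branch index.
fn split_for_join(parent: &ExecutionState) -> (ExecutionState, ExecutionState) {
    let state_left = parent.split();
    let mut state_right = parent.split();
    state_right.branch_idx += 1;
    (state_left, state_right)
}
```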
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/melt.rs
@@ -9,7 +9,7 @@ pub struct MeltExec {
}

impl Executor for MeltExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
let args = std::mem::take(Arc::make_mut(&mut self.args));
df.melt2(args)
26 changes: 15 additions & 11 deletions polars/polars-lazy/src/physical_plan/executors/mod.rs
@@ -29,12 +29,11 @@ pub(super) use self::{

use super::*;
use crate::logical_plan::FETCH_ROWS;
use crate::physical_plan::state::StateFlags;
use polars_core::POOL;
use rayon::prelude::*;
use std::path::PathBuf;

const POLARS_VERBOSE: &str = "POLARS_VERBOSE";

fn set_n_rows(n_rows: Option<usize>) -> Option<usize> {
let fetch_rows = FETCH_ROWS.with(|fetch_rows| fetch_rows.get());
match fetch_rows {
@@ -102,14 +101,14 @@ fn execute_projection_cached_window_fns(

for mut partition in windows {
// clear the cache for every partitioned group
let mut state = state.clone();
let mut state = state.split();
state.clear_expr_cache();

// don't bother caching if we only have a single window function in this partition
if partition.1.len() == 1 {
state.cache_window = false;
state.flags.remove(StateFlags::CACHE_WINDOW_EXPR)
} else {
state.cache_window = true;
state.flags.insert(StateFlags::CACHE_WINDOW_EXPR);
}

partition.1.sort_unstable_by_key(|(_idx, explode, _)| {
Expand All @@ -119,14 +118,19 @@ fn execute_projection_cached_window_fns(
});

for (index, _, e) in partition.1 {
// caching more than one window expression is a complicated topic for another day
// see issue #2523
state.cache_window = e
.as_expression()
if e.as_expression()
.into_iter()
.filter(|e| matches!(e, Expr::Window { .. }))
.count()
== 1;
== 1
{
state.flags.insert(StateFlags::CACHE_WINDOW_EXPR)
}
// caching more than one window expression is a complicated topic for another day
// see issue #2523
else {
state.flags.remove(StateFlags::CACHE_WINDOW_EXPR)
}

let s = e.evaluate(df, &state)?;
selected_columns.push((index, s));
@@ -141,7 +145,7 @@
pub(crate) fn evaluate_physical_expressions(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
state: &mut ExecutionState,
has_windows: bool,
) -> Result<DataFrame> {
let zero_length = df.height() == 0;
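
The window-caching toggle above now inserts or removes `CACHE_WINDOW_EXPR` on the flag set rather than assigning a bool. Since `bitflags` also provides `set`, the two-armed branch can be expressed as a single call; a small sketch, with a pared-down `StateFlags` standing in for the real one:

```rust
use bitflags::bitflags;

bitflags! {
    pub struct StateFlags: u8 {
        const CACHE_WINDOW_EXPR = 0x02;
    }
}

/// Equivalent to the old `state.cache_window = cache;` boolean assignment:
/// `set` inserts the flag when `cache` is true and removes it otherwise.
fn set_window_cache(flags: &mut StateFlags, cache: bool) {
    flags.set(StateFlags::CACHE_WINDOW_EXPR, cache);
}
```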
@@ -15,7 +15,7 @@ pub struct ProjectionExec {
}

impl Executor for ProjectionExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
state.set_schema(self.input_schema.clone());

@@ -8,7 +8,7 @@ pub(crate) struct PythonScanExec {
}

impl Executor for PythonScanExec {
fn execute(&mut self, _cache: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, _state: &mut ExecutionState) -> Result<DataFrame> {
let with_columns = self.options.with_columns.take();
Python::with_gil(|py| {
let pl = PyModule::import(py, "polars").unwrap();
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/csv.rs
@@ -58,7 +58,7 @@ impl CsvExec {
}

impl Executor for CsvExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
predicate: self.predicate.as_ref().map(|ae| ae.as_expression().clone()),
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/scan/ipc.rs
@@ -29,7 +29,7 @@ impl IpcExec {
}

impl Executor for IpcExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
predicate: self.predicate.as_ref().map(|ae| ae.as_expression().clone()),
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/physical_plan/executors/scan/mod.rs
@@ -73,7 +73,7 @@ pub struct DataFrameExec {
}

impl Executor for DataFrameExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = mem::take(&mut self.df);
let mut df = Arc::try_unwrap(df).unwrap_or_else(|df| (*df).clone());

@@ -108,7 +108,7 @@ pub(crate) struct AnonymousScanExec {
}

impl Executor for AnonymousScanExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let mut df = self.function.scan(self.options.clone())?;
if let Some(predicate) = &self.predicate {
let s = predicate.evaluate(&df, state)?;
@@ -50,7 +50,7 @@ impl ParquetExec {
}

impl Executor for ParquetExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let finger_print = FileFingerPrint {
path: self.path.clone(),
predicate: self.predicate.as_ref().map(|ae| ae.as_expression().clone()),
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/slice.rs
@@ -9,7 +9,7 @@ pub struct SliceExec {
}

impl Executor for SliceExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
Ok(df.slice(self.offset, self.len as usize))
}
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/sort.rs
@@ -9,7 +9,7 @@ pub(crate) struct SortExec {
}

impl Executor for SortExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;
df.as_single_chunk_par();

2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/stack.rs
@@ -12,7 +12,7 @@ pub struct StackExec {
}

impl Executor for StackExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &mut ExecutionState) -> Result<DataFrame> {
let mut df = self.input.execute(state)?;

state.set_schema(self.input_schema.clone());