Skip to content

Commit

Permalink
[Lazy] Pass execution state as args
Browse files (browse the repository at this point in the history)
  • Loading branch information
ritchie46 committed Apr 24, 2021
1 parent 9473558 commit 54019ff
Show file tree
Hide file tree
Showing 4 changed files with 179 additions and 103 deletions.
38 changes: 20 additions & 18 deletions polars/polars-lazy/src/physical_plan/executors.rs
Original file line number | Diff line number | Diff line change
Expand Up @@ -271,7 +271,7 @@ impl FilterExec {
impl Executor for FilterExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;
let s = self.predicate.evaluate(&df)?;
let s = self.predicate.evaluate(&df, state)?;
let mask = s.bool().expect("filter predicate wasn't of type boolean");
let df = df.filter(mask)?;
if std::env::var(POLARS_VERBOSE).is_ok() {
Expand Down Expand Up @@ -302,17 +302,17 @@ impl DataFrameExec {
}

impl Executor for DataFrameExec {
fn execute(&mut self, _: &ExecutionState) -> Result<DataFrame> {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = mem::take(&mut self.df);
let mut df = Arc::try_unwrap(df).unwrap_or_else(|df| (*df).clone());

// projection should be before selection as those are free
if let Some(projection) = &self.projection {
df = evaluate_physical_expressions(&df, projection)?;
df = evaluate_physical_expressions(&df, projection, state)?;
}

if let Some(selection) = &self.selection {
let s = selection.evaluate(&df)?;
let s = selection.evaluate(&df, state)?;
let mask = s.bool().map_err(|_| {
PolarsError::Other("filter predicate was not of type boolean".into())
})?;
Expand Down Expand Up @@ -353,11 +353,12 @@ impl StandardExec {
pub(crate) fn evaluate_physical_expressions(
df: &DataFrame,
exprs: &[Arc<dyn PhysicalExpr>],
state: &ExecutionState,
) -> Result<DataFrame> {
let height = df.height();
let mut selected_columns = exprs
.par_iter()
.map(|expr| expr.evaluate(df))
.map(|expr| expr.evaluate(df, state))
.collect::<Result<Vec<Series>>>()?;

// If all series are the same length it is ok. If not we can broadcast Series of length one.
Expand All @@ -384,7 +385,7 @@ impl Executor for StandardExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let df = self.input.execute(state)?;

let df = evaluate_physical_expressions(&df, &self.expr);
let df = evaluate_physical_expressions(&df, &self.expr, state);
if std::env::var(POLARS_VERBOSE).is_ok() {
println!("operation {} on dataframe finished", self.operation);
}
Expand Down Expand Up @@ -462,6 +463,7 @@ fn groupby_helper(
keys: Vec<Series>,
aggs: &[Arc<dyn PhysicalExpr>],
apply: Option<&Arc<dyn DataFrameUdf>>,
state: &ExecutionState,
) -> Result<DataFrame> {
let gb = df.groupby_with_series(keys, true)?;
if let Some(f) = apply {
Expand All @@ -477,7 +479,7 @@ fn groupby_helper(
.par_iter()
.map(|expr| {
let agg_expr = expr.as_agg_expr()?;
let opt_agg = agg_expr.aggregate(&df, groups)?;
let opt_agg = agg_expr.aggregate(&df, groups, state)?;
if let Some(agg) = &opt_agg {
if agg.len() != groups.len() {
panic!(
Expand All @@ -504,9 +506,9 @@ impl Executor for GroupByExec {
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&df))
.map(|e| e.evaluate(&df, state))
.collect::<Result<_>>()?;
groupby_helper(df, keys, &self.aggs, self.apply.as_ref())
groupby_helper(df, keys, &self.aggs, self.apply.as_ref(), state)
}
}

Expand Down Expand Up @@ -545,7 +547,7 @@ impl Executor for PartitionGroupByExec {
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&original_df))
.map(|e| e.evaluate(&original_df, state))
.collect::<Result<Vec<_>>>()?;

debug_assert_eq!(keys.len(), 1);
Expand All @@ -557,7 +559,7 @@ impl Executor for PartitionGroupByExec {
let frac = cat_map.len() as f32 / ca.len() as f32;
// TODO! proper benchmark which boundary should be chosen.
if frac > 0.3 {
return groupby_helper(original_df, keys, &self.phys_aggs, None);
return groupby_helper(original_df, keys, &self.phys_aggs, None, state);
}
}
let mut expr_arena = Arena::with_capacity(64);
Expand Down Expand Up @@ -597,7 +599,7 @@ impl Executor for PartitionGroupByExec {
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&df))
.map(|e| e.evaluate(&df, state))
.collect::<Result<Vec<_>>>()?;
let phys_aggs = &self.phys_aggs;
let gb = df.groupby_with_series(keys, false)?;
Expand All @@ -608,7 +610,7 @@ impl Executor for PartitionGroupByExec {
.iter()
.map(|expr| {
let agg_expr = expr.as_agg_expr()?;
let opt_agg = agg_expr.evaluate_partitioned(&df, groups)?;
let opt_agg = agg_expr.evaluate_partitioned(&df, groups, state)?;
if let Some(agg) = &opt_agg {
if agg[0].len() != groups.len() {
panic!(
Expand Down Expand Up @@ -637,7 +639,7 @@ impl Executor for PartitionGroupByExec {
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&df))
.map(|e| e.evaluate(&df, state))
.collect::<Result<Vec<_>>>()?;

// do the same on the outer results
Expand All @@ -652,7 +654,7 @@ impl Executor for PartitionGroupByExec {
let agg_expr = expr.as_agg_expr().unwrap();
// If None the column doesn't exist anymore.
// For instance when summing a string this column will not be in the aggregation result
let opt_agg = agg_expr.evaluate_partitioned_final(&df, groups).ok();
let opt_agg = agg_expr.evaluate_partitioned_final(&df, groups, state).ok();
opt_agg.map(|opt_s| {
opt_s.map(|mut s| {
s.rename(name);
Expand Down Expand Up @@ -728,13 +730,13 @@ impl Executor for JoinExec {
let left_names = self
.left_on
.iter()
.map(|e| e.evaluate(&df_left).map(|s| s.name().to_string()))
.map(|e| e.evaluate(&df_left, state).map(|s| s.name().to_string()))
.collect::<Result<Vec<_>>>()?;

let right_names = self
.right_on
.iter()
.map(|e| e.evaluate(&df_right).map(|s| s.name().to_string()))
.map(|e| e.evaluate(&df_right, state).map(|s| s.name().to_string()))
.collect::<Result<Vec<_>>>()?;

let df = df_left.join(&df_right, &left_names, &right_names, self.how);
Expand All @@ -761,7 +763,7 @@ impl Executor for StackExec {
let height = df.height();

let res: Result<_> = self.expr.iter().try_for_each(|expr| {
let s = expr.evaluate(&df).map(|series| {
let s = expr.evaluate(&df, state).map(|series| {
// literal series. Should be whole column size
if series.len() == 1 && height > 1 {
series.expand_at_index(0, height)
Expand Down

0 comments on commit 54019ff

Please sign in to comment.