Skip to content

Commit

Permalink
refactor(rust): relax sync requirement on Executor trait impls (#5142)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Oct 8, 2022
1 parent ba92cdf commit 820452a
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 4 deletions.
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/executors/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::*;
///
/// Executors have other executors as input. By having a tree of executors we can execute the
/// physical plan until the last executor is evaluated.
pub trait Executor: Send + Sync {
pub trait Executor: Send {
/// Runs this executor and returns the resulting `DataFrame`, or an error.
///
/// `cache` is the mutable `ExecutionState` handed to the executor for this run.
fn execute(&mut self, cache: &mut ExecutionState) -> PolarsResult<DataFrame>;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,18 @@ impl PartitionGroupByExec {
}

/// Evaluates each of this executor's key expressions against `df`.
///
/// Returns one `Series` per key expression, or an error if evaluation fails.
fn keys(&self, df: &DataFrame, state: &ExecutionState) -> PolarsResult<Vec<Series>> {
self.keys.iter().map(|s| s.evaluate(df, state)).collect()
compute_keys(&self.keys, df, state)
}
}

/// Evaluates every key expression in `keys` against `df`.
///
/// The resulting series are returned in the same order as `keys`.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by an expression.
fn compute_keys(
    keys: &[Arc<dyn PhysicalExpr>],
    df: &DataFrame,
    state: &ExecutionState,
) -> PolarsResult<Vec<Series>> {
    let mut evaluated = Vec::with_capacity(keys.len());
    for expr in keys {
        evaluated.push(expr.evaluate(df, state)?);
    }
    Ok(evaluated)
}

fn run_partitions(
df: &mut DataFrame,
exec: &PartitionGroupByExec,
Expand All @@ -50,11 +58,12 @@ fn run_partitions(
// split on several threads. Then we apply the same groupby again on the final result.
let dfs = split_df(df, n_threads)?;

let phys_aggs = &exec.phys_aggs;
let keys = &exec.keys;
POOL.install(|| {
dfs.into_par_iter()
.map(|df| {
let keys = exec.keys(&df, state)?;
let phys_aggs = &exec.phys_aggs;
let keys = compute_keys(keys, &df, state)?;
let gb = df.groupby_with_series(keys, false, maintain_order)?;
let groups = gb.get_groups();

Expand Down

0 comments on commit 820452a

Please sign in to comment.