Skip to content

Commit

Permalink
restrict parallel branches in lazy Union (#3628)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 8, 2022
1 parent 7ef502b commit f114082
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 9 deletions.
29 changes: 20 additions & 9 deletions polars/polars-lazy/src/physical_plan/executors/union.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ pub(crate) struct UnionExec {

impl Executor for UnionExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
let inputs = std::mem::take(&mut self.inputs);
let mut inputs = std::mem::take(&mut self.inputs);

let dfs = if self.options.slice && self.options.slice_offset >= 0 {
if self.options.slice && self.options.slice_offset >= 0 {
let mut offset = self.options.slice_offset as usize;
let mut len = self.options.slice_len as usize;
let dfs = inputs
Expand Down Expand Up @@ -43,16 +43,27 @@ impl Executor for UnionExec {
})
.collect::<Result<Vec<_>>>()?;

dfs.into_iter().flatten().collect()
concat_df(dfs.iter().flatten())
} else {
POOL.install(|| {
// we don't use par_iter directly because the LP may also start threads for every LP (for instance scan_csv)
// this might then lead to a rayon SO. So we take a multitude of the threads to keep work stealing
// within bounds
let out = POOL.install(|| {
inputs
.into_par_iter()
.map(|mut input| input.execute(state))
.chunks_mut(POOL.current_num_threads() * 3)
.map(|chunk| {
chunk
.into_par_iter()
.map(|input| {
let mut input = std::mem::take(input);
input.execute(state)
})
.collect::<Result<Vec<_>>>()
})
.collect::<Result<Vec<_>>>()
})?
};
});

concat_df(&dfs)
concat_df(out?.iter().flat_map(|dfs| dfs.iter()))
}
}
}
13 changes: 13 additions & 0 deletions polars/polars-lazy/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,16 @@ pub trait PhysicalPlanner {
pub trait Executor: Send + Sync {
fn execute(&mut self, cache: &ExecutionState) -> Result<DataFrame>;
}

pub struct Dummy {}
impl Executor for Dummy {
fn execute(&mut self, _cache: &ExecutionState) -> Result<DataFrame> {
panic!("should not get here");
}
}

impl Default for Box<dyn Executor> {
fn default() -> Self {
Box::new(Dummy {})
}
}

0 comments on commit f114082

Please sign in to comment.