Skip to content

Commit

Permalink
rechunk on default sort and groupby (#3354)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 10, 2022
1 parent 575bc4c commit bfbb7a7
Show file tree
Hide file tree
Showing 5 changed files with 14 additions and 7 deletions.
8 changes: 5 additions & 3 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,9 @@ impl DataFrame {
/// Aggregate all the chunks in the DataFrame to a single chunk in parallel.
/// This may lead to more peak memory consumption.
pub fn as_single_chunk_par(&mut self) -> &mut Self {
- self.columns = self.apply_columns_par(&|s| s.rechunk());
+ if self.columns.iter().any(|s| s.n_chunks() > 1) {
+ self.columns = self.apply_columns_par(&|s| s.rechunk());
+ }
self
}

Expand Down Expand Up @@ -1677,7 +1679,7 @@ impl DataFrame {
reverse: impl IntoVec<bool>,
) -> Result<&mut Self> {
// a lot of indirection in both sorting and take
- self.rechunk();
+ self.as_single_chunk_par();
let by_column = self.select_series(by_column)?;
let reverse = reverse.into_vec();
self.columns = self.sort_impl(by_column, reverse, false, None)?.columns;
Expand Down Expand Up @@ -1772,7 +1774,7 @@ impl DataFrame {
pub fn sort_with_options(&self, by_column: &str, options: SortOptions) -> Result<Self> {
let mut df = self.clone();
// a lot of indirection in both sorting and take
- df.rechunk();
+ df.as_single_chunk_par();
let by_column = vec![df.column(by_column)?.clone()];
let reverse = vec![options.descending];
df.columns = df
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,15 @@ impl GroupByExec {
}

pub(super) fn groupby_helper(
- df: DataFrame,
+ mut df: DataFrame,
keys: Vec<Series>,
aggs: &[Arc<dyn PhysicalExpr>],
apply: Option<&Arc<dyn DataFrameUdf>>,
state: &ExecutionState,
maintain_order: bool,
slice: Option<(i64, usize)>,
) -> Result<DataFrame> {
+ df.as_single_chunk_par();
let gb = df.groupby_with_series(keys, true, maintain_order)?;

if let Some(f) = apply {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ impl Executor for GroupByDynamicExec {

#[cfg(feature = "dynamic_groupby")]
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
- let df = self.input.execute(state)?;
+ let mut df = self.input.execute(state)?;
+ df.as_single_chunk_par();
state.set_schema(self.input_schema.clone());
let keys = self
.keys
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ impl Executor for GroupByRollingExec {

#[cfg(feature = "dynamic_groupby")]
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
- let df = self.input.execute(state)?;
+ let mut df = self.input.execute(state)?;
+ df.as_single_chunk_par();
state.set_schema(self.input_schema.clone());

let keys = self
Expand Down
4 changes: 3 additions & 1 deletion polars/polars-lazy/src/physical_plan/executors/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ pub(crate) struct SortExec {

impl Executor for SortExec {
fn execute(&mut self, state: &ExecutionState) -> Result<DataFrame> {
- let df = self.input.execute(state)?;
+ let mut df = self.input.execute(state)?;
+ df.as_single_chunk_par();

let by_columns = self
.by_column
Expand All @@ -28,6 +29,7 @@ impl Executor for SortExec {
Ok(s)
})
.collect::<Result<Vec<_>>>()?;

df.sort_impl(
by_columns,
std::mem::take(&mut self.args.reverse),
Expand Down

0 comments on commit bfbb7a7

Please sign in to comment.