Skip to content

Commit

Permalink
delay rechunk optimization (#3381)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed May 12, 2022
1 parent 2d83a82 commit 2045e1b
Show file tree
Hide file tree
Showing 9 changed files with 122 additions and 22 deletions.
48 changes: 29 additions & 19 deletions polars/polars-core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,11 +136,37 @@ pub fn split_series(s: &Series, n: usize) -> Result<Vec<Series>> {
split_array!(s, n, i64)
}

fn flatten_df(df: &DataFrame) -> impl Iterator<Item = DataFrame> + '_ {
df.iter_chunks().map(|chunk| {
DataFrame::new_no_checks(
df.iter()
.zip(chunk.into_arrays())
.map(|(s, arr)| {
// Safety:
// datatypes are correct
unsafe {
Series::from_chunks_and_dtype_unchecked(s.name(), vec![arr], s.dtype())
}
})
.collect(),
)
})
}

#[cfg(feature = "private")]
#[doc(hidden)]
pub fn split_df(df: &DataFrame, n: usize) -> Result<Vec<DataFrame>> {
let total_len = df.height();
let chunk_size = total_len / n;

if df.n_chunks()? == n
&& df.get_columns()[0]
.chunk_lengths()
.all(|len| len.abs_diff(chunk_size) < 100)
{
return Ok(flatten_df(df).collect());
}

let mut out = Vec::with_capacity(n);

for i in 0..n {
Expand All @@ -152,25 +178,9 @@ pub fn split_df(df: &DataFrame, n: usize) -> Result<Vec<DataFrame>> {
};
let df = df.slice((i * chunk_size) as i64, len);
if df.n_chunks()? > 1 {
let iter = df.iter_chunks().map(|chunk| {
DataFrame::new_no_checks(
df.iter()
.zip(chunk.into_arrays())
.map(|(s, arr)| {
// Safety:
// datatypes are correct
unsafe {
Series::from_chunks_and_dtype_unchecked(
s.name(),
vec![arr],
s.dtype(),
)
}
})
.collect(),
)
});
out.extend(iter)
// we add every chunk as a separate DataFrame. This makes sure that every
// partition gets its own chunk to deal with.
out.extend(flatten_df(&df))
} else {
out.push(df)
}
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ impl LazyFrame {
cache: args.cache,
with_columns: None,
row_count: args.row_count,
rechunk: args.rechunk,
};
let mut lf: LazyFrame = LogicalPlanBuilder::scan_ipc(path, options)?.build().into();
lf.opt_state.agg_scan_projection = true;
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use crate::prelude::{
};

use crate::logical_plan::FETCH_ROWS;
use crate::prelude::delay_rechunk::DelayRechunk;
use crate::utils::{combine_predicates_expr, expr_to_root_column_names};
use polars_arrow::prelude::QuantileInterpolOptions;
use polars_core::frame::explode::MeltArgs;
Expand Down Expand Up @@ -567,6 +568,7 @@ impl LazyFrame {
}
// make sure its before slice pushdown.
rules.push(Box::new(FastProjection {}));
rules.push(Box::new(DelayRechunk {}));

if slice_pushdown {
let slice_pushdown_opt = SlicePushDown {};
Expand Down
21 changes: 18 additions & 3 deletions polars/polars-lazy/src/frame/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ impl LazyFrame {
cache: bool,
parallel: bool,
row_count: Option<RowCount>,
rechunk: bool,
) -> Result<Self> {
let mut lf: LazyFrame =
LogicalPlanBuilder::scan_parquet(path, n_rows, cache, parallel, row_count)?
LogicalPlanBuilder::scan_parquet(path, n_rows, cache, parallel, row_count, rechunk)?
.build()
.into();
lf.opt_state.agg_scan_projection = true;
Expand All @@ -49,7 +50,14 @@ impl LazyFrame {
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{}", e).into()))?;
let path_string = path.to_string_lossy().into_owned();
Self::scan_parquet_impl(path_string, args.n_rows, args.cache, false, None)
Self::scan_parquet_impl(
path_string,
args.n_rows,
args.cache,
false,
None,
args.rechunk,
)
})
.collect::<Result<Vec<_>>>()?;

Expand All @@ -66,7 +74,14 @@ impl LazyFrame {
lf
})
} else {
Self::scan_parquet_impl(path, args.n_rows, args.cache, args.parallel, args.row_count)
Self::scan_parquet_impl(
path,
args.n_rows,
args.cache,
args.parallel,
args.row_count,
args.rechunk,
)
}
}
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ impl LogicalPlanBuilder {
cache: bool,
parallel: bool,
row_count: Option<RowCount>,
rechunk: bool,
) -> Result<Self> {
use polars_io::SerReader as _;

Expand All @@ -78,6 +79,7 @@ impl LogicalPlanBuilder {
cache,
parallel,
row_count,
rechunk,
},
}
.into())
Expand Down
65 changes: 65 additions & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/delay_rechunk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
use crate::prelude::stack_opt::OptimizationRule;
use crate::prelude::*;
use polars_utils::arena::{Arena, Node};

pub(crate) struct DelayRechunk {}

impl OptimizationRule for DelayRechunk {
    /// If `node` is an aggregation whose input chain reaches a file scan
    /// (parquet/csv/ipc) without passing through a join, turn off the scan's
    /// `rechunk` option so the chunks are kept separate for partitioned
    /// execution. Mutates the plan in the arena in place and always returns
    /// `None` (the node itself is never replaced).
    fn optimize_plan(
        &mut self,
        lp_arena: &mut Arena<ALogicalPlan>,
        _expr_arena: &mut Arena<AExpr>,
        node: Node,
    ) -> Option<ALogicalPlan> {
        match lp_arena.get(node) {
            // An aggregation can be partitioned; it's wasteful to rechunk before that partition.
            ALogicalPlan::Aggregate { input, .. } => {
                use ALogicalPlan::*;
                let mut input_node = None;
                // Walk the sub-plan below the aggregation looking for a scan
                // source. NOTE(review): the loop variable `node` shadows the
                // method parameter; only the scan's node id is kept.
                for (node, lp) in (&*lp_arena).iter(*input) {
                    match lp {
                        // we get the input node
                        #[cfg(feature = "parquet")]
                        ParquetScan { .. } => {
                            input_node = Some(node);
                            break;
                        }
                        #[cfg(feature = "csv-file")]
                        CsvScan { .. } => {
                            input_node = Some(node);
                            break;
                        }
                        #[cfg(feature = "ipc")]
                        IpcScan { .. } => {
                            input_node = Some(node);
                            break;
                        }

                        // don't delay rechunk if there is a join first
                        Join { .. } => break,
                        _ => {}
                    }
                }

                if let Some(node) = input_node {
                    match lp_arena.get_mut(node) {
                        #[cfg(feature = "csv-file")]
                        CsvScan { options, .. } => {
                            options.rechunk = false;
                        }
                        #[cfg(feature = "parquet")]
                        ParquetScan { options, .. } => options.rechunk = false,
                        #[cfg(feature = "ipc")]
                        IpcScan { options, .. } => {
                            options.rechunk = false;
                        }
                        // `input_node` is only ever set by the scan arms above,
                        // so no other variant can appear here.
                        _ => unreachable!(),
                    }
                };

                None
            }
            _ => None,
        }
    }
}
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use polars_core::{datatypes::PlHashMap, prelude::*};
pub(crate) mod aggregate_pushdown;
#[cfg(any(feature = "parquet", feature = "csv-file"))]
pub(crate) mod aggregate_scan_projections;
pub(crate) mod delay_rechunk;
pub(crate) mod drop_nulls;
pub(crate) mod fast_projection;
pub(crate) mod predicate_pushdown;
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ pub struct ParquetOptions {
pub(crate) with_columns: Option<Arc<Vec<String>>>,
pub(crate) cache: bool,
pub(crate) parallel: bool,
pub(crate) rechunk: bool,
pub(crate) row_count: Option<RowCount>,
}

Expand All @@ -42,6 +43,7 @@ pub struct IpcScanOptions {
pub with_columns: Option<Arc<Vec<String>>>,
pub cache: bool,
pub row_count: Option<RowCount>,
pub rechunk: bool,
}

#[derive(Clone, Debug, Copy, Default)]
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/executors/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ impl Executor for IpcExec {
let df = IpcReader::new(file)
.with_n_rows(n_rows)
.with_row_count(std::mem::take(&mut self.options.row_count))
.set_rechunk(self.options.rechunk)
.finish_with_scan_ops(
predicate,
aggregate,
Expand Down Expand Up @@ -158,6 +159,7 @@ impl Executor for ParquetExec {
.with_n_rows(n_rows)
.read_parallel(self.options.parallel)
.with_row_count(std::mem::take(&mut self.options.row_count))
.set_rechunk(self.options.rechunk)
.finish_with_scan_ops(
predicate,
aggregate,
Expand Down

0 comments on commit 2045e1b

Please sign in to comment.