Skip to content

Commit

Permalink
slice pushdown optimization
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 5, 2022
1 parent 3b3004f commit 73e5a33
Show file tree
Hide file tree
Showing 20 changed files with 401 additions and 42 deletions.
11 changes: 10 additions & 1 deletion polars/polars-lazy/src/frame/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -203,20 +203,29 @@ impl<'a> LazyCsvReader<'a> {
if self.path.contains('*') {
let paths = glob::glob(&self.path)
.map_err(|_| PolarsError::ValueError("invalid glob pattern given".into()))?;

let lfs = paths
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?;
let path_string = path.to_string_lossy().into_owned();
let mut builder = self.clone();
builder.path = path_string;
builder.skip_rows = 0;
builder.n_rows = None;
// do not rechunk yet.
builder.rechunk = false;
builder.finish_impl()
})
.collect::<Result<Vec<_>>>()?;

concat(&lfs, self.rechunk)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|lf| {
if self.skip_rows != 0 || self.n_rows.is_some() {
lf.slice(self.skip_rows as i64, self.n_rows.unwrap() as u32)
} else {
lf
}
})
} else {
self.finish_impl()
}
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-lazy/src/frame/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,21 @@ impl LazyFrame {
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?;
let path_string = path.to_string_lossy().into_owned();
let mut args = args;
args.n_rows = None;
Self::scan_ipc_impl(path_string, args)
})
.collect::<Result<Vec<_>>>()?;

concat(&lfs, args.rechunk)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|lf| {
if let Some(n_rows) = args.n_rows {
lf.slice(0, n_rows as u32)
} else {
lf
}
})
} else {
Self::scan_ipc_impl(path, args)
}
Expand Down
33 changes: 22 additions & 11 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ use crate::prelude::aggregate_scan_projections::agg_projection;
use crate::prelude::drop_nulls::ReplaceDropNulls;
use crate::prelude::fast_projection::FastProjection;
use crate::prelude::simplify_expr::SimplifyBooleanRule;
use crate::prelude::slice_pushdown::SlicePushDown;
use crate::utils::{combine_predicates_expr, expr_to_root_column_names};
use crate::{logical_plan::FETCH_ROWS, prelude::*};
use polars_arrow::prelude::QuantileInterpolOptions;
Expand Down Expand Up @@ -104,7 +105,7 @@ pub struct OptState {
pub agg_scan_projection: bool,
pub aggregate_pushdown: bool,
pub global_string_cache: bool,
pub join_pruning: bool,
pub slice_pushdown: bool,
}

impl Default for OptState {
Expand All @@ -115,7 +116,7 @@ impl Default for OptState {
type_coercion: true,
simplify_expr: true,
global_string_cache: true,
join_pruning: true,
slice_pushdown: true,
// will be toggled by a scan operation such as csv scan or parquet scan
agg_scan_projection: false,
aggregate_pushdown: false,
Expand Down Expand Up @@ -192,9 +193,9 @@ impl LazyFrame {
self
}

/// Toggle join pruning optimization
pub fn with_join_pruning(mut self, toggle: bool) -> Self {
self.opt_state.join_pruning = toggle;
/// Toggle slice pushdown optimization
pub fn with_slice_pushdown(mut self, toggle: bool) -> Self {
self.opt_state.slice_pushdown = toggle;
self
}

Expand Down Expand Up @@ -399,6 +400,7 @@ impl LazyFrame {
let projection_pushdown = self.opt_state.projection_pushdown;
let type_coercion = self.opt_state.type_coercion;
let simplify_expr = self.opt_state.simplify_expr;
let slice_pushdown = self.opt_state.slice_pushdown;

#[cfg(any(feature = "parquet", feature = "csv-file"))]
let agg_scan_projection = self.opt_state.agg_scan_projection;
Expand All @@ -409,16 +411,14 @@ impl LazyFrame {
// gradually fill the rules passed to the optimizer
let mut rules: Vec<Box<dyn OptimizationRule>> = Vec::with_capacity(8);

let predicate_pushdown_opt = PredicatePushDown::default();
let projection_pushdown_opt = ProjectionPushDown {};

// during debug we check if the optimizations have not modified the final schema
#[cfg(debug_assertions)]
let prev_schema = logical_plan.schema().clone();

let mut lp_top = to_alp(logical_plan, expr_arena, lp_arena);

if projection_pushdown {
let projection_pushdown_opt = ProjectionPushDown {};
let alp = lp_arena.take(lp_top);
let alp = projection_pushdown_opt
.optimize(alp, lp_arena, expr_arena)
Expand All @@ -427,13 +427,24 @@ impl LazyFrame {
}

if predicate_pushdown {
let predicate_pushdown_opt = PredicatePushDown::default();
let alp = lp_arena.take(lp_top);
let alp = predicate_pushdown_opt
.optimize(alp, lp_arena, expr_arena)
.expect("predicate pushdown failed");
lp_arena.replace(lp_top, alp);
}

if slice_pushdown {
let slice_pushdown_opt = SlicePushDown {};
let alp = lp_arena.take(lp_top);
let alp = slice_pushdown_opt
.optimize(alp, lp_arena, expr_arena)
.expect("slice pushdown failed");

lp_arena.replace(lp_top, alp);
}

if type_coercion {
rules.push(Box::new(TypeCoercionRule {}))
}
Expand Down Expand Up @@ -852,7 +863,7 @@ impl LazyFrame {
}

/// Slice the DataFrame.
pub fn slice(self, offset: i64, len: usize) -> LazyFrame {
pub fn slice(self, offset: i64, len: u32) -> LazyFrame {
let opt_state = self.get_opt_state();
let lp = self.get_plan_builder().slice(offset, len).build();
Self::from_logical_plan(lp, opt_state)
Expand All @@ -869,7 +880,7 @@ impl LazyFrame {
}

/// Get the n last rows
pub fn tail(self, n: usize) -> LazyFrame {
pub fn tail(self, n: u32) -> LazyFrame {
let neg_tail = -(n as i64);
self.slice(neg_tail, n)
}
Expand All @@ -886,7 +897,7 @@ impl LazyFrame {

/// Limit the DataFrame to the first `n` rows. Note if you don't want the rows to be scanned,
/// use [fetch](LazyFrame::fetch).
pub fn limit(self, n: usize) -> LazyFrame {
pub fn limit(self, n: u32) -> LazyFrame {
self.slice(0, n)
}

Expand Down
9 changes: 8 additions & 1 deletion polars/polars-lazy/src/frame/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,19 @@ impl LazyFrame {
.map(|r| {
let path = r.map_err(|e| PolarsError::ComputeError(format!("{e}").into()))?;
let path_string = path.to_string_lossy().into_owned();
Self::scan_parquet_impl(path_string, args.n_rows, args.cache, false)
Self::scan_parquet_impl(path_string, None, args.cache, false)
})
.collect::<Result<Vec<_>>>()?;

concat(&lfs, args.rechunk)
.map_err(|_| PolarsError::ComputeError("no matching files found".into()))
.map(|lf| {
if let Some(n_rows) = args.n_rows {
lf.slice(0, n_rows as u32)
} else {
lf
}
})
} else {
Self::scan_parquet_impl(path, args.n_rows, args.cache, args.parallel)
}
Expand Down
5 changes: 4 additions & 1 deletion polars/polars-lazy/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,10 @@ pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, rechunk: bool) -> Result<LazyFra
lps.push(lp)
}

let lp = LogicalPlan::Union { inputs: lps };
let lp = LogicalPlan::Union {
inputs: lps,
options: Default::default(),
};
let mut lf = LazyFrame::from(lp);
lf.opt_state = opt_state;

Expand Down
8 changes: 6 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ pub enum ALogicalPlan {
Slice {
input: Node,
offset: i64,
len: usize,
len: u32,
},
Selection {
input: Node,
Expand Down Expand Up @@ -130,6 +130,7 @@ pub enum ALogicalPlan {
},
Union {
inputs: Vec<Node>,
options: UnionOptions,
},
}

Expand Down Expand Up @@ -194,7 +195,10 @@ impl ALogicalPlan {
use ALogicalPlan::*;

match self {
Union { .. } => Union { inputs },
Union { options, .. } => Union {
inputs,
options: *options,
},
Melt {
id_vars,
value_vars,
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,7 @@ impl LogicalPlanBuilder {
.into()
}

pub fn slice(self, offset: i64, len: usize) -> Self {
pub fn slice(self, offset: i64, len: u32) -> Self {
LogicalPlan::Slice {
input: Box::new(self.0),
offset,
Expand Down
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,12 @@ pub(crate) fn to_alp(
lp_arena: &mut Arena<ALogicalPlan>,
) -> Node {
let v = match lp {
LogicalPlan::Union { inputs } => {
LogicalPlan::Union { inputs, options } => {
let inputs = inputs
.into_iter()
.map(|lp| to_alp(lp, expr_arena, lp_arena))
.collect();
ALogicalPlan::Union { inputs }
ALogicalPlan::Union { inputs, options }
}
LogicalPlan::Selection { input, predicate } => {
let i = to_alp(*input, expr_arena, lp_arena);
Expand Down Expand Up @@ -642,12 +642,12 @@ pub(crate) fn node_to_lp(
let lp = std::mem::take(lp);

match lp {
ALogicalPlan::Union { inputs } => {
ALogicalPlan::Union { inputs, options } => {
let inputs = inputs
.into_iter()
.map(|node| node_to_lp(node, expr_arena, lp_arena))
.collect();
LogicalPlan::Union { inputs }
LogicalPlan::Union { inputs, options }
}
ALogicalPlan::Slice { input, offset, len } => {
let lp = node_to_lp(input, expr_arena, lp_arena);
Expand Down
11 changes: 5 additions & 6 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ mod format;
pub(crate) mod iterator;
mod lit;
pub(crate) mod optimizer;
mod options;
mod projection;
mod scans;

use polars_core::frame::groupby::DynamicGroupOptions;

pub(crate) use apply::*;
pub(crate) use builder::*;
pub use lit::*;
pub(crate) use scans::*;
pub(crate) use options::*;

thread_local! {
    /// Thread-local override for the number of rows to scan.
    ///
    /// Set and unset by the fetch operation to communicate that the number of
    /// rows to scan is being overwritten. Initialized to `None`.
    pub(crate) static FETCH_ROWS: Cell<Option<usize>> = Cell::new(None);
}
Expand All @@ -47,9 +47,7 @@ pub enum LogicalPlan {
predicate: Expr,
},
/// Cache the input at this point in the LP
Cache {
input: Box<LogicalPlan>,
},
Cache { input: Box<LogicalPlan> },
/// Scan a CSV file
#[cfg(feature = "csv-file")]
CsvScan {
Expand Down Expand Up @@ -147,7 +145,7 @@ pub enum LogicalPlan {
Slice {
input: Box<LogicalPlan>,
offset: i64,
len: usize,
len: u32,
},
/// A Melt operation
Melt {
Expand All @@ -168,6 +166,7 @@ pub enum LogicalPlan {
},
Union {
inputs: Vec<LogicalPlan>,
options: UnionOptions,
},
}

Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) mod fast_projection;
pub(crate) mod predicate_pushdown;
pub(crate) mod projection_pushdown;
pub(crate) mod simplify_expr;
pub(crate) mod slice_pushdown;
pub(crate) mod stack_opt;
pub(crate) mod type_coercion;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ impl PredicatePushDown {
maintain_order,
dynamic_options,
} => {
// start with a new empty predicate aggregator
self.pushdown_and_assign(input, optimizer::init_hashmap(), lp_arena, expr_arena)?;

// don't push down predicates. An aggregation needs all rows
Expand Down

0 comments on commit 73e5a33

Please sign in to comment.