Skip to content

Commit

Permalink
parallel dispatch in lazy scanner
Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 22, 2021
1 parent af67445 commit 83c7d80
Show file tree
Hide file tree
Showing 14 changed files with 91 additions and 105 deletions.
9 changes: 7 additions & 2 deletions polars/polars-lazy/src/frame.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,13 @@ impl LazyFrame {

/// Create a LazyFrame directly from a parquet scan.
#[cfg(feature = "parquet")]
pub fn scan_parquet(path: String, n_rows: Option<usize>, cache: bool) -> Result<Self> {
let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, n_rows, cache)?
pub fn scan_parquet(
path: String,
n_rows: Option<usize>,
cache: bool,
parallel: bool,
) -> Result<Self> {
let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(path, n_rows, cache, parallel)?
.build()
.into();
lf.opt_state.agg_scan_projection = true;
Expand Down
14 changes: 5 additions & 9 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
#[cfg(feature = "ipc")]
use crate::logical_plan::IpcOptions;
#[cfg(feature = "parquet")]
use crate::logical_plan::ParquetOptions;
use crate::logical_plan::{det_melt_schema, Context, CsvParserOptions};
use crate::prelude::*;
use crate::utils::{aexprs_to_schema, PushNode};
Expand Down Expand Up @@ -58,11 +60,9 @@ pub enum ALogicalPlan {
schema: SchemaRef,
// schema of the projected file
output_schema: Option<SchemaRef>,
with_columns: Option<Vec<String>>,
predicate: Option<Node>,
aggregate: Vec<Node>,
n_rows: Option<usize>,
cache: bool,
options: ParquetOptions,
},
DataFrameScan {
df: Arc<DataFrame>,
Expand Down Expand Up @@ -384,10 +384,8 @@ impl ALogicalPlan {
path,
schema,
output_schema,
with_columns,
predicate,
n_rows,
cache,
options,
..
} => {
let mut new_predicate = None;
Expand All @@ -399,11 +397,9 @@ impl ALogicalPlan {
path: path.clone(),
schema: schema.clone(),
output_schema: output_schema.clone(),
with_columns: with_columns.clone(),
predicate: new_predicate,
aggregate: exprs,
n_rows: *n_rows,
cache: *cache,
options: options.clone(),
}
}
#[cfg(feature = "csv-file")]
Expand Down
16 changes: 4 additions & 12 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,19 @@ pub(crate) fn to_alp(
LogicalPlan::ParquetScan {
path,
schema,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
} => ALogicalPlan::ParquetScan {
path,
schema,
output_schema: None,
with_columns,
predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
aggregate: aggregate
.into_iter()
.map(|expr| to_aexpr(expr, expr_arena))
.collect(),
n_rows,
cache,
options,
},
LogicalPlan::DataFrameScan {
df,
Expand Down Expand Up @@ -704,19 +700,15 @@ pub(crate) fn node_to_lp(
path,
schema,
output_schema: _,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
} => LogicalPlan::ParquetScan {
path,
schema,
with_columns,
predicate: predicate.map(|n| node_to_exp(n, expr_arena)),
aggregate: nodes_to_exprs(&aggregate, expr_arena),
n_rows,
cache,
options,
},
ALogicalPlan::DataFrameScan {
df,
Expand Down
30 changes: 20 additions & 10 deletions polars/polars-lazy/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,14 @@ pub struct CsvParserOptions {
pub(crate) cache: bool,
pub(crate) null_values: Option<NullValues>,
}
#[cfg(feature = "parquet")]
#[derive(Clone, Debug)]
/// Scan-time options for a parquet source, carried on the
/// `ParquetScan` logical-plan node (both `LogicalPlan` and `ALogicalPlan`).
/// Grouping these fields into one struct keeps the scan variants compact
/// and lets optimizer rules clone/forward them as a unit.
pub struct ParquetOptions {
    // Stop reading after this many rows; `None` reads the whole file.
    pub(crate) n_rows: Option<usize>,
    // Column projection pushed down by the optimizer; `None` means all
    // columns (rendered as "*" in the plan's Debug output).
    pub(crate) with_columns: Option<Vec<String>>,
    // Cache the scan result so repeated plan executions reuse it.
    pub(crate) cache: bool,
    // Read/decode the file in parallel (the "parallel dispatch" this
    // commit threads through from `LazyFrame::scan_parquet`).
    pub(crate) parallel: bool,
}

// https://stackoverflow.com/questions/1031076/what-are-projection-and-selection
#[derive(Clone)]
Expand Down Expand Up @@ -191,11 +199,9 @@ pub enum LogicalPlan {
ParquetScan {
path: PathBuf,
schema: SchemaRef,
with_columns: Option<Vec<String>>,
predicate: Option<Expr>,
aggregate: Vec<Expr>,
n_rows: Option<usize>,
cache: bool,
options: ParquetOptions,
},
#[cfg(feature = "ipc")]
#[cfg_attr(docsrs, doc(cfg(feature = "ipc")))]
Expand Down Expand Up @@ -320,13 +326,13 @@ impl fmt::Debug for LogicalPlan {
ParquetScan {
path,
schema,
with_columns,
predicate,
options,
..
} => {
let total_columns = schema.fields().len();
let mut n_columns = "*".to_string();
if let Some(columns) = with_columns {
if let Some(columns) = &options.with_columns {
n_columns = format!("{}", columns.len());
}
write!(
Expand Down Expand Up @@ -681,13 +687,13 @@ impl LogicalPlan {
ParquetScan {
path,
schema,
with_columns,
predicate,
options,
..
} => {
let total_columns = schema.fields().len();
let mut n_columns = "*".to_string();
if let Some(columns) = with_columns {
if let Some(columns) = &options.with_columns {
n_columns = format!("{}", columns.len());
}

Expand Down Expand Up @@ -823,6 +829,7 @@ impl LogicalPlanBuilder {
path: P,
n_rows: Option<usize>,
cache: bool,
parallel: bool,
) -> Result<Self> {
use polars_io::SerReader as _;

Expand All @@ -833,11 +840,14 @@ impl LogicalPlanBuilder {
Ok(LogicalPlan::ParquetScan {
path,
schema,
n_rows,
with_columns: None,
predicate: None,
aggregate: vec![],
cache,
options: ParquetOptions {
n_rows,
with_columns: None,
cache,
parallel,
},
}
.into())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,11 +152,9 @@ impl OptimizationRule for AggregatePushdown {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
} => match self.accumulated_projections.is_empty() {
true => {
lp_arena.replace(
Expand All @@ -165,11 +163,9 @@ impl OptimizationRule for AggregatePushdown {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
},
);
None
Expand All @@ -180,11 +176,9 @@ impl OptimizationRule for AggregatePushdown {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
})
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@ pub(crate) fn agg_projection(
process_with_columns(path, &options.with_columns, columns);
}
#[cfg(feature = "parquet")]
ParquetScan {
path, with_columns, ..
} => {
process_with_columns(path, with_columns, columns);
ParquetScan { path, options, .. } => {
process_with_columns(path, &options.with_columns, columns);
}
#[cfg(feature = "ipc")]
IpcScan { path, options, .. } => {
Expand Down Expand Up @@ -106,12 +104,12 @@ impl OptimizationRule for AggScanProjection {
mut options,
} = lp
{
let new_with_columns = self
let with_columns = self
.columns
.get(&path)
.map(|agg| agg.iter().cloned().collect());
// prevent infinite loop
if options.with_columns == new_with_columns {
if options.with_columns == with_columns {
let lp = ALogicalPlan::IpcScan {
path,
schema,
Expand All @@ -124,17 +122,16 @@ impl OptimizationRule for AggScanProjection {
return None;
}

let with_columns = std::mem::take(&mut options.with_columns);
options.with_columns = new_with_columns;
options.with_columns = with_columns;
let lp = ALogicalPlan::IpcScan {
path: path.clone(),
schema,
output_schema,
predicate,
aggregate,
options,
options: options.clone(),
};
Some(self.finish_rewrite(lp, expr_arena, lp_arena, &path, with_columns))
Some(self.finish_rewrite(lp, expr_arena, lp_arena, &path, options.with_columns))
} else {
unreachable!()
}
Expand All @@ -148,40 +145,35 @@ impl OptimizationRule for AggScanProjection {
output_schema,
predicate,
aggregate,
with_columns,
n_rows,
cache,
mut options,
} = lp
{
let new_with_columns = self
let mut with_columns = self
.columns
.get(&path)
.map(|agg| agg.iter().cloned().collect());
// prevent infinite loop
if with_columns == new_with_columns {
if options.with_columns == with_columns {
let lp = ALogicalPlan::ParquetScan {
path,
schema,
output_schema,
predicate,
aggregate,
with_columns,
n_rows,
cache,
options,
};
lp_arena.replace(node, lp);
return None;
}
std::mem::swap(&mut options.with_columns, &mut with_columns);

let lp = ALogicalPlan::ParquetScan {
path: path.clone(),
schema,
output_schema,
with_columns: new_with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
};
Some(self.finish_rewrite(lp, expr_arena, lp_arena, &path, with_columns))
} else {
Expand All @@ -200,11 +192,11 @@ impl OptimizationRule for AggScanProjection {
aggregate,
} = lp
{
let new_with_columns = self
let with_columns = self
.columns
.get(&path)
.map(|agg| agg.iter().cloned().collect());
if options.with_columns == new_with_columns {
if options.with_columns == with_columns {
let lp = ALogicalPlan::CsvScan {
path,
schema,
Expand All @@ -216,7 +208,7 @@ impl OptimizationRule for AggScanProjection {
lp_arena.replace(node, lp);
return None;
}
options.with_columns = new_with_columns;
options.with_columns = with_columns;
let lp = ALogicalPlan::CsvScan {
path: path.clone(),
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,23 +204,19 @@ impl PredicatePushDown {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
} => {
let predicate = predicate_at_scan(acc_predicates, predicate, expr_arena);

let lp = ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
n_rows,
cache,
options,
};
Ok(lp)
}
Expand Down

0 comments on commit 83c7d80

Please sign in to comment.