Skip to content

Commit

Permalink
update schema at scan level
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 2, 2021
1 parent 646a47b commit 477bdbd
Show file tree
Hide file tree
Showing 9 changed files with 77 additions and 2 deletions.
22 changes: 20 additions & 2 deletions polars/polars-lazy/src/logical_plan/alp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,21 @@ pub enum ALogicalPlan {
#[cfg(feature = "csv-file")]
CsvScan {
path: PathBuf,
// schema of the complete file
schema: SchemaRef,
// schema of the file after projection pushdown; `None` when no columns are projected, in which case `schema` is used
output_schema: Option<SchemaRef>,
options: CsvParserOptions,
predicate: Option<Node>,
aggregate: Vec<Node>,
},
#[cfg(feature = "parquet")]
ParquetScan {
path: PathBuf,
// schema of the complete file
schema: SchemaRef,
// schema of the file after projection pushdown; `None` when no columns are projected, in which case `schema` is used
output_schema: Option<SchemaRef>,
with_columns: Option<Vec<String>>,
predicate: Option<Node>,
aggregate: Vec<Node>,
Expand Down Expand Up @@ -129,11 +135,19 @@ impl ALogicalPlan {
Sort { input, .. } => arena.get(*input).schema(arena),
Explode { input, .. } => arena.get(*input).schema(arena),
#[cfg(feature = "parquet")]
ParquetScan { schema, .. } => schema,
ParquetScan {
schema,
output_schema,
..
} => output_schema.as_ref().unwrap_or(schema),
DataFrameScan { schema, .. } => schema,
Selection { input, .. } => arena.get(*input).schema(arena),
#[cfg(feature = "csv-file")]
CsvScan { schema, .. } => schema,
CsvScan {
schema,
output_schema,
..
} => output_schema.as_ref().unwrap_or(schema),
Projection { schema, .. } => schema,
LocalProjection { schema, .. } => schema,
Aggregate { schema, .. } => schema,
Expand Down Expand Up @@ -314,6 +328,7 @@ impl ALogicalPlan {
ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
stop_after_n_rows,
Expand All @@ -328,6 +343,7 @@ impl ALogicalPlan {
ParquetScan {
path: path.clone(),
schema: schema.clone(),
output_schema: output_schema.clone(),
with_columns: with_columns.clone(),
predicate: new_predicate,
aggregate: exprs,
Expand All @@ -339,6 +355,7 @@ impl ALogicalPlan {
CsvScan {
path,
schema,
output_schema,
predicate,
options,
..
Expand All @@ -350,6 +367,7 @@ impl ALogicalPlan {
CsvScan {
path: path.clone(),
schema: schema.clone(),
output_schema: output_schema.clone(),
options: options.clone(),
predicate: new_predicate,
aggregate: exprs,
Expand Down
4 changes: 4 additions & 0 deletions polars/polars-lazy/src/logical_plan/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@ pub(crate) fn to_alp(
} => ALogicalPlan::CsvScan {
path,
schema,
output_schema: None,
options,
predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
aggregate: aggregate
Expand All @@ -207,6 +208,7 @@ pub(crate) fn to_alp(
} => ALogicalPlan::ParquetScan {
path,
schema,
output_schema: None,
with_columns,
predicate: predicate.map(|expr| to_aexpr(expr, expr_arena)),
aggregate: aggregate
Expand Down Expand Up @@ -624,6 +626,7 @@ pub(crate) fn node_to_lp(
ALogicalPlan::CsvScan {
path,
schema,
output_schema: _,
options,
predicate,
aggregate,
Expand All @@ -638,6 +641,7 @@ pub(crate) fn node_to_lp(
ALogicalPlan::ParquetScan {
path,
schema,
output_schema: _,
with_columns,
predicate,
aggregate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl OptimizationRule for AggregatePushdown {
CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand All @@ -126,6 +127,7 @@ impl OptimizationRule for AggregatePushdown {
CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand All @@ -138,6 +140,7 @@ impl OptimizationRule for AggregatePushdown {
Some(ALogicalPlan::CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand All @@ -148,6 +151,7 @@ impl OptimizationRule for AggregatePushdown {
ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
Expand All @@ -160,6 +164,7 @@ impl OptimizationRule for AggregatePushdown {
ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
Expand All @@ -174,6 +179,7 @@ impl OptimizationRule for AggregatePushdown {
Some(ALogicalPlan::ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ impl OptimizationRule for AggScanProjection {
if let ALogicalPlan::ParquetScan {
path,
schema,
output_schema,
predicate,
aggregate,
with_columns,
Expand All @@ -112,6 +113,7 @@ impl OptimizationRule for AggScanProjection {
let lp = ALogicalPlan::ParquetScan {
path,
schema,
output_schema,
predicate,
aggregate,
with_columns,
Expand All @@ -125,6 +127,7 @@ impl OptimizationRule for AggScanProjection {
let lp = ALogicalPlan::ParquetScan {
path: path.clone(),
schema,
output_schema,
with_columns: new_with_columns,
predicate,
aggregate,
Expand All @@ -142,6 +145,7 @@ impl OptimizationRule for AggScanProjection {
if let ALogicalPlan::CsvScan {
path,
schema,
output_schema,
mut options,
predicate,
aggregate,
Expand All @@ -155,6 +159,7 @@ impl OptimizationRule for AggScanProjection {
let lp = ALogicalPlan::CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand All @@ -166,6 +171,7 @@ impl OptimizationRule for AggScanProjection {
let lp = ALogicalPlan::CsvScan {
path: path.clone(),
schema,
output_schema,
options: options.clone(),
predicate,
aggregate,
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/join_pruning.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ fn combine_lp_nodes(
#[cfg(feature = "csv-file")]
(CsvScan {path: path_l,
schema,
output_schema,
options: options_l,
predicate,
aggregate,
Expand All @@ -138,6 +139,7 @@ fn combine_lp_nodes(
Some(CsvScan {
path,
schema: schema.clone(),
output_schema: output_schema.clone(),
options: options_l,
predicate: *predicate,
aggregate: aggregate.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ impl PredicatePushDown {
ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
Expand All @@ -191,6 +192,7 @@ impl PredicatePushDown {
let lp = ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
Expand All @@ -203,6 +205,7 @@ impl PredicatePushDown {
CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand All @@ -212,6 +215,7 @@ impl PredicatePushDown {
let lp = CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,20 @@ impl ProjectionPushDown {
..
} => {
let with_columns = get_scan_columns(&mut acc_projections, expr_arena);
let output_schema = if with_columns.is_none() {
None
} else {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&*schema,
)?))
};

let lp = ParquetScan {
path,
schema,
output_schema,
with_columns,
predicate,
aggregate,
Expand All @@ -343,9 +354,20 @@ impl ProjectionPushDown {
} => {
options.with_columns = get_scan_columns(&mut acc_projections, expr_arena);

let output_schema = if options.with_columns.is_none() {
None
} else {
Some(Arc::new(update_scan_schema(
&acc_projections,
expr_arena,
&*schema,
)?))
};

let lp = CsvScan {
path,
schema,
output_schema,
options,
predicate,
aggregate,
Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ impl DefaultPlanner {
CsvScan {
path,
schema,
output_schema: _,
options,
predicate,
aggregate,
Expand All @@ -141,6 +142,7 @@ impl DefaultPlanner {
ParquetScan {
path,
schema,
output_schema: _,
with_columns,
predicate,
aggregate,
Expand Down
11 changes: 11 additions & 0 deletions polars/polars-lazy/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2029,6 +2029,11 @@ fn test_binary_expr() -> Result<()> {
fn test_drop_and_select() -> Result<()> {
let df = fruits_cars();

// We test that the schema is still correct, so that the drop keeps working.
// Typically the projection is pushed down to before the drop, and the drop may then
// wrongly assume that some columns are still there to be projected.

// We test this on both a DataFrame scan and a CSV scan.
let out = df
.lazy()
.drop_columns(["A", "B"])
Expand All @@ -2037,5 +2042,11 @@ fn test_drop_and_select() -> Result<()> {

assert_eq!(out.get_column_names(), &["fruits"]);

let out = scan_foods_csv()
.drop_columns(["calories", "sugar_g"])
.select([col("category")])
.collect()?;

assert_eq!(out.get_column_names(), &["category"]);
Ok(())
}

0 comments on commit 477bdbd

Please sign in to comment.