Skip to content

Commit

Permalink
fix scan projection pushdown of same file (#2600)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 10, 2022
1 parent 9d721da commit aa75584
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,73 @@ use crate::logical_plan::optimizer::stack_opt::OptimizationRule;
use crate::logical_plan::ALogicalPlanBuilder;
use crate::prelude::*;
use polars_core::datatypes::{PlHashMap, PlHashSet};
use polars_core::prelude::Schema;
use std::path::{Path, PathBuf};
use std::sync::Arc;

fn process_with_columns(
path: &Path,
with_columns: &Option<Vec<String>>,
columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
schema: &Schema,
) {
if let Some(with_columns) = &with_columns {
let cols = columns
.entry(path.to_owned())
.or_insert_with(PlHashSet::new);
cols.extend(with_columns.iter().enumerate().map(|t| (t.0, t.1.clone())));
let cols = columns
.entry(path.to_owned())
.or_insert_with(PlHashSet::new);

match with_columns {
// add only the projected columns
Some(with_columns) => {
cols.extend(with_columns.iter().enumerate().map(|t| (t.0, t.1.clone())));
}
// no projection, so we must take all columns
None => {
cols.extend(
schema
.fields()
.iter()
.enumerate()
.map(|t| (t.0, t.1.name().to_string())),
);
}
}
}

/// Aggregate all the projections in an LP
pub(crate) fn agg_projection(
root: Node,
// Maps each file path to the set of `(index, column name)` pairs read from that file. The `usize` index is kept so the columns can be sorted later.
columns: &mut PlHashMap<PathBuf, PlHashSet<(usize, String)>>,
lp_arena: &Arena<ALogicalPlan>,
) {
use ALogicalPlan::*;
match lp_arena.get(root) {
#[cfg(feature = "csv-file")]
CsvScan { path, options, .. } => {
process_with_columns(path, &options.with_columns, columns);
CsvScan {
path,
options,
schema,
..
} => {
process_with_columns(path, &options.with_columns, columns, schema);
}
#[cfg(feature = "parquet")]
ParquetScan { path, options, .. } => {
process_with_columns(path, &options.with_columns, columns);
ParquetScan {
path,
options,
schema,
..
} => {
process_with_columns(path, &options.with_columns, columns, schema);
}
#[cfg(feature = "ipc")]
IpcScan { path, options, .. } => {
process_with_columns(path, &options.with_columns, columns);
IpcScan {
path,
options,
schema,
..
} => {
process_with_columns(path, &options.with_columns, columns, schema);
}
DataFrameScan { .. } => (),
lp => {
Expand Down
26 changes: 25 additions & 1 deletion polars/polars-lazy/src/tests/projection_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,31 @@ fn test_row_count_pd() -> Result<()> {
.select([col("row_count"), col("x") * lit(3i32)])
.collect()?;

dbg!(df);
let expected = df![
"row_count" => [0u32, 1, 2],
"x" => [3i32, 6, 9]
]?;

assert!(df.frame_equal(&expected));

Ok(())
}

/// Joins a lazy CSV scan with a projected, row-limited scan of the same file
/// and checks that the result still exposes every column of the full scan.
#[test]
fn scan_join_same_file() -> Result<()> {
    let full = LazyCsvReader::new(FOODS_CSV.to_string()).finish()?;

    // Right-hand side: same file, projected down to the join key, first 5 rows only.
    let categories_only = full.clone().select([col("category")]).limit(5);

    let joined = full
        .join(
            categories_only,
            [col("category")],
            [col("category")],
            JoinType::Inner,
        )
        .collect()?;

    assert_eq!(
        joined.get_column_names(),
        &["category", "calories", "fats_g", "sugars_g"]
    );
    Ok(())
}

0 comments on commit aa75584

Please sign in to comment.