don't let slice pushdown happen on predicate at scan

ritchie46 committed Jan 19, 2022
1 parent a309e4b commit 24bdcbc

Showing 4 changed files with 66 additions and 9 deletions.
3 changes: 1 addition & 2 deletions polars/polars-core/src/frame/groupby/mod.rs
@@ -1273,7 +1273,6 @@ mod test {
use crate::prelude::*;
use crate::utils::split_ca;
use num::traits::FloatConst;
-use polars_arrow::prelude::QuantileInterpolOptions;

#[test]
#[cfg(feature = "dtype-date")]
@@ -1553,7 +1552,7 @@ mod test {

df.try_apply("g", |s| s.cast(&DataType::Categorical))?;

-let out = df.groupby(["g"])?.sum()?;
+let _ = df.groupby(["g"])?.sum()?;
Ok(())
}
}
@@ -105,7 +105,10 @@ impl SlicePushDown {
aggregate,
options,

-}, Some(state)) if state.offset == 0 => {
+},
+// TODO! we currently skip slice pushdown if there is a predicate.
+// we can modify the readers to only limit after predicates have been applied
+Some(state)) if state.offset == 0 && predicate.is_none() => {
let mut options = options;
options.n_rows = Some(state.len as usize);
let lp = ParquetScan {
@@ -126,7 +129,7 @@
predicate,
aggregate,
options
-}, Some(state)) if state.offset == 0 => {
+}, Some(state)) if state.offset == 0 && predicate.is_none() => {
let mut options = options;
options.n_rows = Some(state.len as usize);
let lp = IpcScan {
@@ -150,7 +153,7 @@
options,
predicate,
aggregate,
-}, Some(state)) if state.offset >= 0 => {
+}, Some(state)) if state.offset >= 0 && predicate.is_none() => {
let mut options = options;
options.skip_rows = state.offset as usize;
options.n_rows = Some(state.len as usize);
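For context, the pattern this change adds to the optimizer: a pending slice is only folded into the scan's n_rows when the scan carries no predicate, because the readers apply the row limit before any filter runs. Below is a minimal sketch of that guard, using hypothetical simplified types rather than Polars' actual logical-plan nodes.

// Hypothetical, simplified model of the rule above; the real SlicePushDown
// operates on Polars' logical plan, not these toy types.
struct ScanOptions {
    n_rows: Option<usize>,
}

struct SliceState {
    offset: i64,
    len: u32,
}

enum Plan {
    Scan {
        predicate: Option<String>, // stand-in for a physical predicate expression
        options: ScanOptions,
    },
    Slice {
        input: Box<Plan>,
        offset: i64,
        len: u32,
    },
}

fn push_down(plan: Plan, state: Option<SliceState>) -> Plan {
    match (plan, state) {
        // Fold the slice into the scan only when there is no predicate:
        // otherwise the reader would stop after `len` rows *before* the
        // filter is applied, which changes the result.
        (Plan::Scan { predicate, mut options }, Some(state))
            if state.offset == 0 && predicate.is_none() =>
        {
            options.n_rows = Some(state.len as usize);
            Plan::Scan { predicate, options }
        }
        // Otherwise keep the Slice node above the plan so the filter
        // runs first and the slice is applied afterwards.
        (plan, Some(state)) => Plan::Slice {
            input: Box::new(plan),
            offset: state.offset,
            len: state.len,
        },
        (plan, None) => plan,
    }
}

Calling push_down on a scan that does carry a predicate leaves the slice above it, which is the behavior the new test in io.rs below verifies.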
49 changes: 49 additions & 0 deletions polars/polars-lazy/src/tests/io.rs
@@ -273,3 +273,52 @@ fn test_union_and_agg_projections() -> Result<()> {

Ok(())
}

#[test]
#[cfg(all(feature = "ipc", feature = "csv-file"))]
fn test_slice_filter() -> Result<()> {
init_files();
let _guard = PARQUET_IO_LOCK.lock().unwrap();

// make sure that the slices are not applied before the predicates.
let len = 5;
let offset = 3;

let df1 = scan_foods_csv()
.filter(col("category").eq(lit("fruit")))
.slice(offset, len)
.collect()?;
let df2 = scan_foods_parquet(false)
.filter(col("category").eq(lit("fruit")))
.slice(offset, len)
.collect()?;
let df3 = scan_foods_ipc()
.filter(col("category").eq(lit("fruit")))
.slice(offset, len)
.collect()?;

let df1_ = scan_foods_csv()
.collect()?
.lazy()
.filter(col("category").eq(lit("fruit")))
.slice(offset, len)
.collect()?;
let df2_ = scan_foods_parquet(false)
.collect()?
.lazy()
.filter(col("category").eq(lit("fruit")))
.slice(offset, len)
.collect()?;
let df3_ = scan_foods_ipc()
.collect()?
.lazy()
.filter(col("category").eq(lit("fruit")))
.slice(offset, len)
.collect()?;

assert_eq!(df1.shape(), df1_.shape());
assert_eq!(df2.shape(), df2_.shape());
assert_eq!(df3.shape(), df3_.shape());

Ok(())
}
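Why the shapes can disagree when the slice is pushed past the filter is visible in plain iterator code, independent of any reader. A toy illustration (not Polars code; the column values are made up):

fn main() {
    // Toy stand-in for the "category" column of the foods dataset.
    let category = ["fruit", "meat", "fruit", "fruit", "meat", "fruit", "fruit"];
    let (offset, len) = (1, 2);

    // filter -> slice: what `.filter(...).slice(offset, len)` asks for.
    let filter_then_slice: Vec<&str> = category
        .iter()
        .copied()
        .filter(|c| *c == "fruit")
        .skip(offset)
        .take(len)
        .collect();

    // slice -> filter: what an over-eager pushdown would do if the reader
    // applied the row limit before the predicate.
    let slice_then_filter: Vec<&str> = category
        .iter()
        .copied()
        .skip(offset)
        .take(len)
        .filter(|c| *c == "fruit")
        .collect();

    assert_eq!(filter_then_slice.len(), 2);
    assert_eq!(slice_then_filter.len(), 1); // rows are lost when the limit runs first
}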
14 changes: 10 additions & 4 deletions polars/polars-lazy/src/tests/mod.rs
@@ -25,10 +25,17 @@ use std::iter::FromIterator;
static GLOB_PARQUET: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.parquet";
static GLOB_CSV: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.csv";
static GLOB_IPC: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/*.ipc";
static FOODS_CSV: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv";
static FOODS_IPC: &str = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.ipc";
static FOODS_PARQUET: &str =
"../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.parquet";

fn scan_foods_csv() -> LazyFrame {
-let path = "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv";
-LazyCsvReader::new(path.to_string()).finish().unwrap()
+LazyCsvReader::new(FOODS_CSV.to_string()).finish().unwrap()
}

fn scan_foods_ipc() -> LazyFrame {
LazyFrame::scan_ipc(FOODS_IPC.to_string(), Default::default()).unwrap()
}

fn init_files() {
@@ -61,8 +68,7 @@ fn init_files() {
#[cfg(feature = "parquet")]
fn scan_foods_parquet(parallel: bool) -> LazyFrame {
init_files();
-let out_path =
-    "../../examples/aggregate_multiple_files_in_chunks/datasets/foods1.parquet".into();
+let out_path = FOODS_PARQUET.to_string();

let args = ScanArgsParquet {
n_rows: None,
