Skip to content

Commit

Permalink
fix row count file projection pushdown (#3635)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 9, 2022
1 parent 9adb25f commit 82d6134
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 42 deletions.
52 changes: 32 additions & 20 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1126,6 +1126,7 @@ impl LazyFrame {
/// This can have a negative effect on query performance.
/// This may for instance block predicate pushdown optimization.
pub fn with_row_count(mut self, name: &str, offset: Option<IdxSize>) -> LazyFrame {
let mut add_row_count_in_map = false;
match &mut self.logical_plan {
// Do the row count at scan
#[cfg(feature = "csv-file")]
Expand All @@ -1134,44 +1135,55 @@ impl LazyFrame {
name: name.to_string(),
offset: offset.unwrap_or(0),
});
self
}
#[cfg(feature = "ipc")]
LogicalPlan::IpcScan { options, .. } => {
options.row_count = Some(RowCount {
name: name.to_string(),
offset: offset.unwrap_or(0),
});
self
}
#[cfg(feature = "parquet")]
LogicalPlan::ParquetScan { options, .. } => {
options.row_count = Some(RowCount {
name: name.to_string(),
offset: offset.unwrap_or(0),
});
self
}
_ => {
let new_schema = self
.schema()
.insert_index(0, name.to_string(), IDX_DTYPE)
.unwrap();
let name = name.to_owned();

let opt = AllowedOptimizations {
slice_pushdown: false,
predicate_pushdown: false,
..Default::default()
};
self.map(
move |df: DataFrame| df.with_row_count(&name, offset),
Some(opt),
Some(new_schema),
Some("WITH ROW COUNT"),
)
add_row_count_in_map = true;
}
}

let new_schema = self
.schema()
.insert_index(0, name.to_string(), IDX_DTYPE)
.unwrap();
let name = name.to_owned();

// if we do the row count at scan we add a dummy map, to update the schema
let opt = if add_row_count_in_map {
AllowedOptimizations {
slice_pushdown: false,
predicate_pushdown: false,
..Default::default()
}
} else {
AllowedOptimizations::default()
};

self.map(
move |df: DataFrame| {
if add_row_count_in_map {
df.with_row_count(&name, offset)
} else {
Ok(df)
}
},
Some(opt),
Some(new_schema),
Some("WITH ROW COUNT"),
)
}

/// Unnest the given `Struct` columns. This means that the fields of the `Struct` type will be
Expand Down
35 changes: 13 additions & 22 deletions polars/polars-lazy/src/tests/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ fn skip_rows_and_slice() -> Result<()> {
}

#[test]
fn test_row_count() -> Result<()> {
fn test_row_count_on_files() -> Result<()> {
let _guard = SINGLE_LOCK.lock().unwrap();
for offset in [0 as IdxSize, 10] {
let lf = LazyCsvReader::new(FOODS_CSV.to_string())
Expand All @@ -351,16 +351,8 @@ fn test_row_count() -> Result<()> {
(offset..27 + offset).collect::<Vec<_>>()
);

let lf = LazyFrame::scan_parquet(
FOODS_PARQUET.to_string(),
ScanArgsParquet {
row_count: Some(RowCount {
name: "rc".into(),
offset,
}),
..Default::default()
},
)?;
let lf = LazyFrame::scan_parquet(FOODS_PARQUET.to_string(), Default::default())?
.with_row_count("rc", Some(offset));
assert!(row_count_at_scan(lf.clone()));
let df = lf.collect()?;
let rc = df.column("rc")?;
Expand All @@ -369,24 +361,23 @@ fn test_row_count() -> Result<()> {
(offset..27 + offset).collect::<Vec<_>>()
);

let lf = LazyFrame::scan_ipc(
FOODS_IPC.to_string(),
ScanArgsIpc {
row_count: Some(RowCount {
name: "rc".into(),
offset,
}),
..Default::default()
},
)?;
let lf = LazyFrame::scan_ipc(FOODS_IPC.to_string(), Default::default())?
.with_row_count("rc", Some(offset));

assert!(row_count_at_scan(lf.clone()));
let df = lf.collect()?;
let df = lf.clone().collect()?;
let rc = df.column("rc")?;
assert_eq!(
rc.idx()?.into_no_null_iter().collect::<Vec<_>>(),
(offset..27 + offset).collect::<Vec<_>>()
);

let out = lf
.filter(col("rc").gt(lit(-1)))
.select([col("calories")])
.collect()?;
assert!(out.column("calories").is_ok());
assert_eq!(out.shape(), (27, 1));
}

Ok(())
Expand Down

0 comments on commit 82d6134

Please sign in to comment.