Skip to content

Commit

Permalink
parquet: stop reading when slice is reached (#4046)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 17, 2022
1 parent 604e830 commit 34a2f2d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 3 deletions.
6 changes: 6 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,9 @@ impl DataFrame {
/// ```
#[must_use]
pub fn slice(&self, offset: i64, length: usize) -> Self {
if offset == 0 && length == self.height() {
return self.clone();
}
let col = self
.columns
.iter()
Expand All @@ -2180,6 +2183,9 @@ impl DataFrame {

#[must_use]
pub fn slice_par(&self, offset: i64, length: usize) -> Self {
if offset == 0 && length == self.height() {
return self.clone();
}
let col = POOL.install(|| {
self.columns
.par_iter()
Expand Down
11 changes: 8 additions & 3 deletions polars/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,11 @@ fn rg_to_dfs(
apply_aggregations(&mut df, aggregate)?;

previous_row_count += current_row_count;
dfs.push(df)
dfs.push(df);

if remaining_rows == 0 {
break;
}
}
Ok(dfs)
}
Expand Down Expand Up @@ -178,7 +182,8 @@ fn rg_to_dfs_par(
let dfs = row_groups
.into_par_iter()
.map(|(md, local_limit, row_count_start)| {
if !read_this_row_group(predicate.as_ref(), file_metadata, schema)? {
if local_limit == 0 || !read_this_row_group(predicate.as_ref(), file_metadata, schema)?
{
return Ok(None);
}
// test we don't read the parquet file if this env var is set
Expand Down Expand Up @@ -282,6 +287,6 @@ pub fn read_parquet<R: MmapBytesReader>(
} else {
let mut df = accumulate_dataframes_vertical(dfs.into_iter())?;
apply_aggregations(&mut df, aggregate)?;
Ok(df.slice(0, limit))
Ok(df.slice_par(0, limit))
}
}

0 comments on commit 34a2f2d

Please sign in to comment.