Skip to content

Commit

Permalink
parquet read: fix remaining_rows counter (#3887)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 4, 2022
1 parent 78c769c commit 665edbb
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 3 deletions.
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ pub struct ParquetReader<R: Read + Seek> {
impl<R: MmapBytesReader> ParquetReader<R> {
#[cfg(feature = "lazy")]
// todo! hoist to lazy crate
pub fn finish_with_scan_ops(
pub fn _finish_with_scan_ops(
mut self,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
Expand Down
3 changes: 2 additions & 1 deletion polars/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,8 @@ pub fn read_parquet<R: MmapBytesReader>(
.collect::<Result<Vec<_>>>()?
};

remaining_rows = file_metadata.row_groups[rg].num_rows() as usize;
remaining_rows =
remaining_rows.saturating_sub(file_metadata.row_groups[rg].num_rows() as usize);

let mut df = DataFrame::new_no_checks(columns);
if let Some(rc) = &row_count {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ impl ParquetExec {
.read_parallel(self.options.parallel)
.with_row_count(std::mem::take(&mut self.options.row_count))
.set_rechunk(self.options.rechunk)
.finish_with_scan_ops(
._finish_with_scan_ops(
predicate,
aggregate,
projection.as_ref().map(|v| v.as_ref()),
Expand Down
22 changes: 22 additions & 0 deletions py-polars/tests/io/test_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,3 +152,25 @@ def test_glob_parquet(io_test_dir: str) -> None:
path = os.path.join(io_test_dir, "small*.parquet")
assert pl.read_parquet(path).shape == (3, 16)
assert pl.scan_parquet(path).collect().shape == (3, 16)


def test_chunked_round_trip() -> None:
    """Write a vstacked DataFrame to parquet in memory and check it reads back equal.

    The frame is assembled from two pieces of different heights (2 and 3 rows),
    each holding an integer column ``a`` and a list column ``l``.
    """
    top = pl.DataFrame({"a": [1] * 2, "l": [[1] for _ in range(2)]})
    bottom = pl.DataFrame({"a": [2] * 3, "l": [[2] for _ in range(3)]})
    stacked = top.vstack(bottom)

    buffer = io.BytesIO()
    stacked.write_parquet(buffer)
    # Rewind so the reader starts from the beginning of the written bytes.
    buffer.seek(0)

    round_tripped = pl.read_parquet(buffer)
    assert round_tripped.frame_equal(stacked)

0 comments on commit 665edbb

Please sign in to comment.