Skip to content

Commit

Permalink
do not write empty chunk to parquet (#3259)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 29, 2022
1 parent 4cdbc37 commit e093f90
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 26 deletions.
61 changes: 35 additions & 26 deletions polars/polars-io/src/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,33 +104,42 @@ where
})
.collect::<Vec<_>>();

let row_group_iter = rb_iter.map(|batch| {
let columns = batch
.columns()
.par_iter()
.zip(parquet_schema.columns().par_iter())
.zip(encodings.par_iter())
.map(|((array, descriptor), encoding)| {
let encoded_pages = array_to_pages(
array.as_ref(),
descriptor.descriptor.clone(),
options,
*encoding,
)?;
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
})
.collect::<ArrowResult<VecDeque<_>>>()
let row_group_iter = rb_iter.filter_map(|batch| match batch.len() {
0 => None,
_ => {
let columns = batch
.columns()
.par_iter()
.zip(parquet_schema.columns().par_iter())
.zip(encodings.par_iter())
.map(|((array, descriptor), encoding)| {
let encoded_pages = array_to_pages(
array.as_ref(),
descriptor.descriptor.clone(),
options,
*encoding,
)?;
encoded_pages
.map(|page| {
compress(page?, vec![], options.compression).map_err(|x| x.into())
})
.collect::<ArrowResult<VecDeque<_>>>()
})
.collect::<ArrowResult<Vec<VecDeque<CompressedPage>>>>()
.map(Some)
.transpose();

columns.map(|columns| {
columns.map(|columns| {
let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
(row_group, batch.columns()[0].len())
})
})
.collect::<ArrowResult<Vec<VecDeque<CompressedPage>>>>()?;

let row_group = DynIter::new(
columns
.into_iter()
.map(|column| Ok(DynStreamingIterator::new(Bla::new(column)))),
);
ArrowResult::Ok((row_group, batch.columns()[0].len()))
}
});

let mut writer = FileWriter::try_new(&mut self.writer, schema, options)?;
Expand Down
3 changes: 3 additions & 0 deletions polars/tests/it/io/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
mod csv;
#[cfg(feature = "parquet")]
mod parquet;

use polars::prelude::*;

pub(crate) fn create_df() -> DataFrame {
Expand Down
19 changes: 19 additions & 0 deletions polars/tests/it/io/parquet.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use polars::prelude::*;
use std::io::Cursor;

#[test]
fn test_vstack_empty_3220() -> Result<()> {
let df1 = df! {
"a" => ["1", "2"],
"b" => [1, 2]
}?;
let empty_df = df1.head(Some(0));
let mut stacked = df1.clone();
stacked.vstack_mut(&empty_df)?;
stacked.vstack_mut(&df1)?;
let mut buf = Cursor::new(Vec::new());
ParquetWriter::new(&mut buf).finish(&mut stacked)?;
let read_df = ParquetReader::new(buf).finish()?;
assert!(stacked.frame_equal(&read_df));
Ok(())
}

0 comments on commit e093f90

Please sign in to comment.