made parquet writer parallel over columns
ritchie46 committed Oct 10, 2021
Commit 0cef635 (parent acd2f8b)
Showing 4 changed files with 86 additions and 27 deletions.
2 changes: 2 additions & 0 deletions polars/polars-arrow/src/trusted_len/mod.rs
@@ -13,6 +13,8 @@ use std::slice::Iter;
/// This is re-defined here and implemented for some iterators until `std::iter::TrustedLen`
/// is stabilized.
/// *Implementation from Jorge Leitao on Arrow2
/// # Safety
/// The length reported by the iterator must be correct.
pub unsafe trait TrustedLen: Iterator {}

unsafe impl<T> TrustedLen for Iter<'_, T> {}
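
For context, a minimal sketch (not part of the commit) of what this safety contract means in practice: implementing the trait is only sound when the iterator's `size_hint` reports its exact length. The `Countdown` iterator below is hypothetical.

struct Countdown(usize);

impl Iterator for Countdown {
    type Item = usize;

    fn next(&mut self) -> Option<usize> {
        if self.0 == 0 {
            None
        } else {
            self.0 -= 1;
            Some(self.0)
        }
    }

    // The exact remaining length is always known, so the reported bounds are trustworthy.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.0, Some(self.0))
    }
}

// Sound only because `size_hint` above is exact; an incorrect length would let
// unsafe consumers that pre-allocate from it read or write out of bounds.
unsafe impl TrustedLen for Countdown {}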
79 changes: 58 additions & 21 deletions polars/polars-io/src/parquet.rs
@@ -18,9 +18,11 @@ use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::prelude::*;
use crate::{PhysicalIoExpr, ScanAggregation};
use arrow::compute::cast;
use arrow::io::parquet::write::Encoding;
use arrow::datatypes::PhysicalType;
use arrow::io::parquet::write::{array_to_pages, DynIter, Encoding};
use arrow::io::parquet::{read, write};
use polars_core::prelude::*;
use rayon::prelude::*;
use std::io::{Read, Seek, Write};
use std::sync::Arc;

@@ -128,6 +130,8 @@ pub struct ParquetWriter<W> {
compression: write::Compression,
}

pub use write::Compression;

impl<W> ParquetWriter<W>
where
W: Write + Seek,
@@ -151,9 +155,24 @@ where

/// Write the given DataFrame to the writer `W`.
pub fn finish(mut self, df: &DataFrame) -> Result<()> {
// temporarily coerce Categorical to Utf8 until categoricals are supported; see https://github.com/jorgecarleitao/parquet2/issues/57

let columns = df
.get_columns()
.iter()
.map(|s| {
if let DataType::Categorical = s.dtype() {
s.cast(&DataType::Utf8).unwrap()
} else {
s.clone()
}
})
.collect();
let df = DataFrame::new_no_checks(columns);

let mut fields = df.schema().to_arrow().fields().clone();

// datetime is not supported by parquet and will be truncated to Date
// date64 is not supported by parquet and will be truncated to date32
// We coerce these to timestamp(ms)
let datetime_columns = df
.get_columns()
@@ -177,7 +196,7 @@ where
.map(|s| s.name().to_string())
.collect::<Vec<_>>();

let iter = df.iter_record_batches().map(|rb| {
let rb_iter = df.iter_record_batches().map(|rb| {
if !datetime_columns.is_empty() {
let mut columns = rb.columns().to_vec();
for i in &datetime_columns {
@@ -203,30 +222,48 @@ };
};
let schema = ArrowSchema::new(fields);
let parquet_schema = write::to_parquet_schema(&schema)?;
let encodings = schema
.fields()
.iter()
.map(|field| match field.data_type().to_physical_type() {
// delta encoding
// Not yet supported by pyarrow
// PhysicalType::LargeUtf8 => Encoding::DeltaLengthByteArray,
// dictionaries are kept dict-encoded
PhysicalType::Dictionary(_) => Encoding::RleDictionary,
// remaining is plain
_ => Encoding::Plain,
})
.collect::<Vec<_>>();

// clone is needed because parquet schema is moved into `write_file`
let parquet_schema_iter = parquet_schema.clone();
let iter = iter.map(|batch| {
let columns = batch.columns().to_vec();
Ok(write::DynIter::new(
columns
.into_iter()
.zip(parquet_schema_iter.columns().to_vec().into_iter())
.map(|(array, type_)| {
// one parquet page per array.
// we could use `array.slice()` to split it based on some number of rows.
Ok(write::DynIter::new(std::iter::once(write::array_to_page(
array.as_ref(),
type_,
options,
Encoding::Plain,
))))
}),
))
let row_group_iter = rb_iter.map(|batch| {
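// Encode the batch's columns to parquet pages in parallel with rayon.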
let columns = batch
.columns()
.par_iter()
.zip(parquet_schema_iter.columns().par_iter())
.zip(encodings.par_iter())
.map(|((array, descriptor), encoding)| {
let array = array.clone();

let pages =
array_to_pages(array, descriptor.clone(), options, *encoding).unwrap();
pages.collect::<Vec<_>>()
})
.collect::<Vec<_>>();

let out = write::DynIter::new(columns.into_iter().map(|column| {
// one parquet page per array.
// we could use `array.slice()` to split it based on some number of rows.
Ok(DynIter::new(column.into_iter()))
}));
ArrowResult::Ok(out)
});

write::write_file(
&mut self.writer,
iter,
row_group_iter,
&schema,
parquet_schema,
options,
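
A minimal usage sketch of the writer API changed in this file (not part of the diff); the import paths and the polars `Result` alias are assumptions based on the hunks above, which re-export `write::Compression` from this module.

use polars_core::prelude::*;
use polars_io::parquet::{Compression, ParquetWriter};

fn write_parquet(df: &DataFrame) -> Result<()> {
    let file = std::fs::File::create("example.parquet").expect("could not create file");
    // Each record batch's columns are encoded to pages in parallel before being written.
    ParquetWriter::new(file)
        .with_compression(Compression::Snappy)
        .finish(df)
}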
13 changes: 10 additions & 3 deletions py-polars/polars/eager/frame.py
@@ -756,7 +756,7 @@ def to_parquet(
self,
file: Union[str, Path],
compression: str = "snappy",
use_pyarrow: bool = _PYARROW_AVAILABLE,
use_pyarrow: bool = False,
**kwargs: Any,
) -> None:
"""
@@ -767,7 +767,14 @@ file
file
File path to which the file should be written.
compression
Compression method (only supported if `use_pyarrow`).
Compression method. Choose one of:
- "uncompressed" (not supported by pyarrow)
- "snappy"
- "gzip"
- "lzo"
- "brotli"
- "lz4"
- "zstd"
use_pyarrow
Use C++ parquet implementation vs rust parquet implementation.
At the moment C++ supports more features.
@@ -804,7 +811,7 @@
table=tbl, where=file, compression=compression, **kwargs
)
else:
self._df.to_parquet(file)
self._df.to_parquet(file, compression)

def to_numpy(self) -> np.ndarray:
"""
19 changes: 16 additions & 3 deletions py-polars/src/dataframe.rs
@@ -297,9 +297,22 @@ impl PyDataFrame {
}

#[cfg(feature = "parquet")]
pub fn to_parquet(&self, path: &str) -> PyResult<()> {
let f = std::fs::File::create(path).expect("to open a new file");
ParquetWriter::new(f)
pub fn to_parquet(&self, py_f: PyObject, compression: &str) -> PyResult<()> {
let buf = get_file_like(py_f, true)?;

let compression = match compression {
"uncompressed" => Compression::Uncompressed,
"snappy" => Compression::Snappy,
"gzip" => Compression::Gzip,
"lzo" => Compression::Lzo,
"brotli" => Compression::Brotli,
"lz4" => Compression::Lz4,
"zstd" => Compression::Zstd,
s => panic!("compression {} not supported", s)
};

ParquetWriter::new(buf)
.with_compression(compression)
.finish(&self.df)
.map_err(PyPolarsEr::from)?;
Ok(())
