Skip to content

Commit

Permalink
IpcWriter with compression (#1769)
Browse files Browse the repository at this point in the history
  • Loading branch information
illumination-k committed Nov 14, 2021
1 parent 95f9408 commit 80beb1c
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 14 deletions.
49 changes: 47 additions & 2 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,21 +216,40 @@ where
/// ```
pub struct IpcWriter<W> {
    // Underlying output destination (any `Write` sink).
    writer: W,
    // Optional Arrow IPC compression codec; `None` writes uncompressed buffers.
    compression: Option<write::Compression>,
}

pub use write::Compression as IpcCompression;

impl<W> IpcWriter<W>
where
    W: Write + Seek,
{
    /// Select the IPC compression codec applied when writing.
    ///
    /// `None` (the default) disables compression.
    pub fn with_compression(self, compression: Option<write::Compression>) -> Self {
        Self {
            compression,
            ..self
        }
    }
}

impl<W> SerWriter<W> for IpcWriter<W>
where
W: Write,
{
fn new(writer: W) -> Self {
IpcWriter { writer }
IpcWriter {
writer,
compression: None,
}
}

fn finish(mut self, df: &DataFrame) -> Result<()> {
let mut ipc_writer = write::FileWriter::try_new(
&mut self.writer,
&df.schema().to_arrow(),
WriteOptions { compression: None },
WriteOptions {
compression: self.compression,
},
)?;

let iter = df.iter_record_batches();
Expand All @@ -246,6 +265,7 @@ where
#[cfg(test)]
mod test {
use crate::prelude::*;
use arrow::io::ipc::write;
use polars_core::df;
use polars_core::prelude::*;
use std::io::Cursor;
Expand Down Expand Up @@ -298,4 +318,29 @@ mod test {
assert_eq!(df_read.shape(), (3, 2));
df_read.frame_equal(&expected);
}

#[test]
fn test_write_with_compression() {
let df = create_df();

let compressions = vec![
None,
Some(write::Compression::LZ4),
Some(write::Compression::ZSTD),
];

for compression in compressions.into_iter() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
IpcWriter::new(&mut buf)
.with_compression(compression)
.finish(&df)
.expect("ipc writer");
buf.set_position(0);

let df_read = IpcReader::new(buf)
.finish()
.expect(&format!("IPC reader: {:?}", compression));
assert!(df.frame_equal(&df_read));
}
}
}
2 changes: 1 addition & 1 deletion polars/polars-io/src/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ pub struct ParquetWriter<W> {
compression: write::Compression,
}

pub use write::Compression;
pub use write::Compression as ParquetCompression;

impl<W> ParquetWriter<W>
where
Expand Down
11 changes: 9 additions & 2 deletions py-polars/polars/eager/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -739,19 +739,26 @@ def to_csv(
self._df.to_csv(file, has_headers, ord(sep))
return None

def to_ipc(self, file: Union[BinaryIO, str, Path]) -> None:
def to_ipc(
self, file: Union[BinaryIO, str, Path], compression: str = "uncompressed"
) -> None:
"""
Write to Arrow IPC binary stream, or a feather file.
Parameters
----------
file
File path to which the file should be written.
compression
Compression method. Choose one of:
- "uncompressed"
- "lz4"
- "zstd"
"""
if isinstance(file, Path):
file = str(file)

self._df.to_ipc(file)
self._df.to_ipc(file, compression)

def to_dicts(self) -> tp.List[Dict[str, Any]]:
pydf = self._df
Expand Down
26 changes: 17 additions & 9 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,17 @@ impl PyDataFrame {
}

#[cfg(feature = "ipc")]
pub fn to_ipc(&self, py_f: PyObject) -> PyResult<()> {
    pub fn to_ipc(&self, py_f: PyObject, compression: &str) -> PyResult<()> {
        // Wrap the Python file-like object in a Rust `Write` handle.
        let mut buf = get_file_like(py_f, true)?;
        // Map the Python-facing string to the Arrow IPC codec; anything other
        // than the three supported values is reported as a Python error
        // instead of panicking.
        let compression = match compression {
            "uncompressed" => None,
            "lz4" => Some(IpcCompression::LZ4),
            "zstd" => Some(IpcCompression::ZSTD),
            s => return Err(PyPolarsEr::Other(format!("compression {} not supported", s)).into()),
        };

        IpcWriter::new(&mut buf)
            .with_compression(compression)
            .finish(&self.df)
            .map_err(PyPolarsEr::from)?;
        Ok(())
Expand Down Expand Up @@ -332,14 +340,14 @@ impl PyDataFrame {
let buf = get_file_like(py_f, true)?;

let compression = match compression {
"uncompressed" => Compression::Uncompressed,
"snappy" => Compression::Snappy,
"gzip" => Compression::Gzip,
"lzo" => Compression::Lzo,
"brotli" => Compression::Brotli,
"lz4" => Compression::Lz4,
"zstd" => Compression::Zstd,
s => panic!("compression {} not supported", s),
"uncompressed" => ParquetCompression::Uncompressed,
"snappy" => ParquetCompression::Snappy,
"gzip" => ParquetCompression::Gzip,
"lzo" => ParquetCompression::Lzo,
"brotli" => ParquetCompression::Brotli,
"lz4" => ParquetCompression::Lz4,
"zstd" => ParquetCompression::Zstd,
s => return Err(PyPolarsEr::Other(format!("compression {} not supported", s)).into()),
};

ParquetWriter::new(buf)
Expand Down
13 changes: 13 additions & 0 deletions py-polars/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ def test_select_columns_and_projection_from_buffer():
assert df_2.frame_equal(expected)


def test_compressed_to_ipc():
    df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})

    # Round-trip the frame through an in-memory IPC buffer once per codec.
    for compression in ("uncompressed", "lz4", "zstd"):
        f = io.BytesIO()
        df.to_ipc(f, compression)
        f.seek(0)

        df_read = pl.read_ipc(f, use_pyarrow=False)
        assert df_read.frame_equal(df)


def test_read_web_file():
url = "https://raw.githubusercontent.com/pola-rs/polars/master/examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv"
df = pl.read_csv(url)
Expand Down

0 comments on commit 80beb1c

Please sign in to comment.