Skip to content

Commit

Permalink
Exported AvroWriter to py-polars (#2628)
Browse files Browse the repository at this point in the history
  • Loading branch information
illumination-k committed Feb 13, 2022
1 parent 1bae0a8 commit ff68696
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 4 deletions.
8 changes: 5 additions & 3 deletions polars/polars-io/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ use std::io::{Read, Seek, Write};

use super::{finish_reader, ArrowChunk, ArrowReader, ArrowResult};
use crate::prelude::*;
use lazy_static::__Deref;
use polars_core::prelude::*;
use std::ops::Deref;

use arrow::io::avro::{read, write};

Expand Down Expand Up @@ -94,6 +94,8 @@ where
}
}

pub use write::Compression as AvroCompression;

/// Write a DataFrame to Apache Avro format
///
/// # Example
Expand Down Expand Up @@ -187,7 +189,7 @@ mod test {
buf.set_position(0);

let read_df = AvroReader::new(buf).finish()?;
assert_eq!(write_df, read_df);
assert!(write_df.frame_equal(&read_df));
Ok(())
}

Expand All @@ -214,7 +216,7 @@ mod test {
buf.set_position(0);

let read_df = AvroReader::new(buf).finish()?;
assert_eq!(write_df, read_df);
assert!(write_df.frame_equal(&read_df));
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/dataframe.rst
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ Conversion
:toctree: api/

DataFrame.to_arrow
DataFrame.to_avro
DataFrame.to_json
DataFrame.to_pandas
DataFrame.to_csv
Expand Down
23 changes: 23 additions & 0 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -910,6 +910,29 @@ def to_csv(
self._df.to_csv(file, has_header, ord(sep))
return None

def to_avro(
    self,
    file: Union[BinaryIO, BytesIO, str, Path],
    compression: Literal["uncompressed", "snappy", "deflate"] = "uncompressed",
) -> None:
    """
    Write to Apache Avro file.

    Parameters
    ----------
    file
        File path or writable file-like object to which the data should be
        written.
    compression
        Compression method. Choose one of:
            - "uncompressed"
            - "snappy"
            - "deflate"
    """
    # Path objects are converted to plain strings before being handed to the
    # underlying frame; every other target is passed through untouched.
    if isinstance(file, Path):
        file = str(file)

    self._df.to_avro(file, compression)

def to_ipc(
self,
file: Union[BinaryIO, BytesIO, str, Path],
Expand Down
7 changes: 6 additions & 1 deletion py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,7 +696,9 @@ def read_ipc_schema(
return _ipc_schema(file)


def read_avro(file: Union[str, BinaryIO], n_rows: Optional[int] = None) -> DataFrame:
def read_avro(
file: Union[str, Path, BytesIO, BinaryIO], n_rows: Optional[int] = None
) -> DataFrame:
"""
Read into a DataFrame from Apache Avro format.
Expand All @@ -711,6 +713,9 @@ def read_avro(file: Union[str, BinaryIO], n_rows: Optional[int] = None) -> DataF
-------
DataFrame
"""
if isinstance(file, Path):
file = str(file)

return DataFrame._read_avro(file, n_rows=n_rows)


Expand Down
19 changes: 19 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,25 @@ impl PyDataFrame {
Ok(PyDataFrame::new(df))
}

/// Write this DataFrame to `py_f` in Apache Avro format.
///
/// `compression` must be one of `"uncompressed"`, `"snappy"` or `"deflate"`.
/// Any other value returns an error before the destination is opened.
#[cfg(feature = "avro")]
pub fn to_avro(&mut self, py_f: PyObject, compression: &str) -> PyResult<()> {
    use polars::io::avro::{AvroCompression, AvroWriter};
    // "uncompressed" maps to `None`, i.e. no compression codec is configured.
    let compression = match compression {
        "uncompressed" => None,
        "snappy" => Some(AvroCompression::Snappy),
        "deflate" => Some(AvroCompression::Deflate),
        s => return Err(PyPolarsEr::Other(format!("compression {} not supported", s)).into()),
    };

    // Open the destination for writing.
    // NOTE(review): the second argument is presumed to be `get_file_like`'s
    // truncate/create flag. With `false`, a path target would presumably be
    // opened as an existing read-only file and the write would fail.
    // Confirm against the definition of `get_file_like`.
    let mut buf = get_file_like(py_f, true)?;
    AvroWriter::new(&mut buf)
        .with_compression(compression)
        .finish(&mut self.df)
        .map_err(PyPolarsEr::from)?;

    Ok(())
}

#[staticmethod]
#[cfg(feature = "json")]
pub fn read_json(json: &str) -> PyResult<Self> {
Expand Down
13 changes: 13 additions & 0 deletions py-polars/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,19 @@ def test_read_avro() -> None:
assert df_read.frame_equal(expected)


def test_compressed_to_avro() -> None:
    """Round-trip a small frame through Avro once per listed compression option."""
    df = pl.DataFrame({"i64": [1, 2], "f64": [0.1, 0.2], "utf8": ["a", "b"]})
    compressions = ["uncompressed", "snappy", "deflate"]

    for compression in compressions:
        f = io.BytesIO()
        df.to_avro(f, compression)  # type: ignore
        # Rewind so the reader starts at the beginning of what was just written.
        f.seek(0)

        df_read = pl.read_avro(f)
        # Name the failing option so a broken codec is identifiable from the report.
        assert df_read.frame_equal(
            df
        ), f"avro round trip failed for compression={compression!r}"


def test_read_web_file() -> None:
url = "https://raw.githubusercontent.com/pola-rs/polars/master/examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv"
df = pl.read_csv(url)
Expand Down

0 comments on commit ff68696

Please sign in to comment.