Skip to content

Commit

Permalink
Exporting Avro Reader to py-polars (#2597)
Browse files Browse the repository at this point in the history
  • Loading branch information
illumination-k committed Feb 10, 2022
1 parent 3d43eb7 commit 9d721da
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 2 deletions.
3 changes: 3 additions & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ json = ["polars-io", "polars-io/json"]
# support for arrows ipc file parsing
ipc = ["polars-io", "polars-io/ipc", "polars-lazy/ipc"]

# support for appache avro file parsing
avro = ["polars-io", "polars-io/avro"]

# support for arrows csv file parsing
csv-file = ["polars-io", "polars-io/csv-file", "polars-lazy/csv-file"]

Expand Down
18 changes: 17 additions & 1 deletion polars/polars-io/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,22 @@ use polars_core::prelude::*;

use arrow::io::avro::read;

/// Read Appache Avro format into a DataFrame
///
/// # Example
/// ```
/// use std::fs::File;
/// use polars_core::prelude::*;
/// use polars_io::avro::AvroReader;
/// use polars_io::SerReader;
///
/// fn example() -> Result<DataFrame> {
/// let file = File::open("file.avro").expect("file not found");
///
/// AvroReader::new(file)
/// .finish()
/// }
/// ```
#[must_use]
pub struct AvroReader<R> {
reader: R,
Expand All @@ -20,7 +36,7 @@ impl<R: Read + Seek> AvroReader<R> {
Ok(schema.into())
}

/// Get arrow schema of the Ipc File, this is faster than a polars schema.
/// Get arrow schema of the avro File, this is faster than a polars schema.
pub fn arrow_schema(&mut self) -> Result<ArrowSchema> {
let (_, schema, _, _) = read::read_metadata(&mut self.reader)?;
Ok(schema)
Expand Down
61 changes: 61 additions & 0 deletions py-polars/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,13 @@ pyo3 = { git = "https://github.com/ghuls/pyo3", branch = "polars_pypy_hasattr" }

# features are only there to enable building a slim binary for the benchmark in CI
[features]
avro = ["polars/avro"]
parquet = ["polars/parquet"]
ipc = ["polars/ipc"]
is_in = ["polars/is_in"]
json = ["polars/serde", "serde_json"]

default = ["json", "parquet", "ipc", "is_in", "json", "polars/repeat_by"]
default = ["json", "parquet", "ipc", "avro", "is_in", "json", "polars/repeat_by"]

[dependencies.polars]
path = "../polars"
Expand Down
1 change: 1 addition & 0 deletions py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def version() -> str:
)
from polars.internals.whenthen import when
from polars.io import (
read_avro,
read_csv,
read_ipc,
read_ipc_schema,
Expand Down
22 changes: 22 additions & 0 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,28 @@ def _read_parquet(
)
return self

@staticmethod
def _read_avro(
file: Union[str, BinaryIO], n_rows: Optional[int] = None
) -> "DataFrame":
"""
Read into a DataFrame from Appache Avro format.
Parameters
----------
file
Path to a file or a file like object.
n_rows
Stop reading from Appache Avro file after reading ``n_rows``.
Returns
-------
DataFrame
"""
self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.read_avro(file, n_rows)
return self

@staticmethod
def _read_ipc(
file: Union[str, BinaryIO],
Expand Down
18 changes: 18 additions & 0 deletions py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,24 @@ def read_ipc_schema(
return _ipc_schema(file)


def read_avro(file: Union[str, BinaryIO], n_rows: Optional[int] = None) -> DataFrame:
"""
Read into a DataFrame from Appache Avro format.
Parameters
----------
file
Path to a file or a file like object.
n_rows
Stop reading from Appache Avro file after reading ``n_rows``.
Returns
-------
DataFrame
"""
return DataFrame._read_avro(file, n_rows=n_rows)


def read_ipc(
file: Union[str, BinaryIO, BytesIO, Path, bytes],
columns: Optional[Union[List[int], List[str]]] = None,
Expand Down
13 changes: 13 additions & 0 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,19 @@ impl PyDataFrame {
Ok(PyDataFrame::new(df))
}

#[staticmethod]
#[cfg(feature = "avro")]
pub fn read_avro(py_f: PyObject, n_rows: Option<usize>) -> PyResult<Self> {
use polars::io::avro::AvroReader;

let file = get_file_like(py_f, false)?;
let df = AvroReader::new(file)
.with_n_rows(n_rows)
.finish()
.map_err(PyPolarsEr::from)?;
Ok(PyDataFrame::new(df))
}

#[staticmethod]
#[cfg(feature = "json")]
pub fn read_json(json: &str) -> PyResult<Self> {
Expand Down
Binary file added py-polars/tests/files/small.avro
Binary file not shown.
7 changes: 7 additions & 0 deletions py-polars/tests/test_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,13 @@ def test_compressed_to_ipc() -> None:
assert df_read.frame_equal(df)


def test_read_avro() -> None:
small_avro = Path(__file__).parent / "files" / "small.avro"
expected = pl.DataFrame({"i64": [1, 2], "f64": [0.1, 0.2], "utf8": ["a", "b"]})
df_read = pl.read_avro(str(small_avro))
assert df_read.frame_equal(expected)


def test_read_web_file() -> None:
url = "https://raw.githubusercontent.com/pola-rs/polars/master/examples/aggregate_multiple_files_in_chunks/datasets/foods1.csv"
df = pl.read_csv(url)
Expand Down

0 comments on commit 9d721da

Please sign in to comment.