Skip to content

Commit

Permalink
Exported the select columns feature of AvroReader to py-polars (#2865)
Browse files Browse the repository at this point in the history
  • Loading branch information
illumination-k committed Mar 10, 2022
1 parent 5ed1c2d commit 36f2c62
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 14 deletions.
25 changes: 16 additions & 9 deletions polars/polars-io/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,15 +101,15 @@ where
self.projection = Some(columns_to_projection(columns, &schema)?);
}

let prj = if let Some(projection) = self.projection {
let (prj, arrow_schema) = if let Some(projection) = self.projection {
let mut prj = vec![false; avro_schema.len()];
for index in projection {
for &index in projection.iter() {
prj[index] = true;
}

Some(prj)
(Some(prj), apply_projection(&schema, &projection))
} else {
None
(None, schema.clone())
};

let avro_reader = read::Reader::new(
Expand All @@ -118,11 +118,19 @@ where
codec,
),
avro_schema,
schema.clone().fields,
schema.fields,
prj,
);

finish_reader(avro_reader, rechunk, self.n_rows, None, None, &schema, None)
finish_reader(
avro_reader,
rechunk,
self.n_rows,
None,
None,
&arrow_schema,
None,
)
}
}

Expand All @@ -144,7 +152,6 @@ pub use write::Compression as AvroCompression;
/// AvroWriter::new(&mut file)
/// .finish(df)
/// }
///
/// ```
#[must_use]
pub struct AvroWriter<W> {
Expand Down Expand Up @@ -275,7 +282,7 @@ mod test {

let expected_df = df!(
"i64" => &[1, 2],
"f64" => &[0.1, 0.2]
"utf8" => &["a", "b"]
)?;

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
Expand All @@ -284,7 +291,7 @@ mod test {
buf.set_position(0);

let read_df = AvroReader::new(buf)
.with_columns(Some(vec!["i64".to_string(), "f64".to_string()]))
.with_columns(Some(vec!["i64".to_string(), "utf8".to_string()]))
.finish()?;

assert!(expected_df.frame_equal(&read_df));
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ pub fn resolve_homedir(path: &Path) -> PathBuf {
path.into()
}

#[cfg(any(feature = "ipc", feature = "parquet"))]
#[cfg(any(feature = "ipc", feature = "parquet", feature = "avro"))]
pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> ArrowSchema {
let fields = &schema.fields;
let fields = projection
Expand Down
4 changes: 3 additions & 1 deletion py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,7 @@ def _read_parquet(
def _read_avro(
cls: Type[DF],
file: Union[str, BinaryIO],
columns: Optional[Union[List[int], List[str]]] = None,
n_rows: Optional[int] = None,
) -> DF:
"""
Expand All @@ -584,8 +585,9 @@ def _read_avro(
-------
DataFrame
"""
projection, columns = handle_projection_columns(columns)
self = cls.__new__(cls)
self._df = PyDataFrame.read_avro(file, n_rows)
self._df = PyDataFrame.read_avro(file, columns, projection, n_rows)
return self

@classmethod
Expand Down
11 changes: 9 additions & 2 deletions py-polars/polars/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,10 @@ def read_ipc_schema(


def read_avro(
file: Union[str, Path, BytesIO, BinaryIO], n_rows: Optional[int] = None
file: Union[str, Path, BytesIO, BinaryIO],
columns: Optional[Union[List[int], List[str]]] = None,
n_rows: Optional[int] = None,
**kwargs: Any,
) -> DataFrame:
"""
Read into a DataFrame from Apache Avro format.
Expand All @@ -706,6 +709,8 @@ def read_avro(
----------
file
Path to a file or a file-like object.
columns
Columns to select. Accepts a list of column indices (starting at zero) or a list of column names.
n_rows
Stop reading from Apache Avro file after reading ``n_rows``.
Expand All @@ -715,8 +720,10 @@ def read_avro(
"""
if isinstance(file, Path):
file = str(file)
if columns is None:
columns = kwargs.pop("projection", None)

return DataFrame._read_avro(file, n_rows=n_rows)
return DataFrame._read_avro(file, n_rows=n_rows, columns=columns)


def read_ipc(
Expand Down
9 changes: 8 additions & 1 deletion py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,18 @@ impl PyDataFrame {

#[staticmethod]
#[cfg(feature = "avro")]
pub fn read_avro(py_f: PyObject, n_rows: Option<usize>) -> PyResult<Self> {
pub fn read_avro(
py_f: PyObject,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
n_rows: Option<usize>,
) -> PyResult<Self> {
use polars::io::avro::AvroReader;

let file = get_file_like(py_f, false)?;
let df = AvroReader::new(file)
.with_projection(projection)
.with_columns(columns)
.with_n_rows(n_rows)
.finish()
.map_err(PyPolarsErr::from)?;
Expand Down
24 changes: 24 additions & 0 deletions py-polars/tests/io/test_avro.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,27 @@ def test_from_to_file(
example_df.to_avro(f, compression=compression) # type: ignore
df_read = pl.read_avro(str(f))
assert example_df.frame_equal(df_read)


def test_select_columns() -> None:
    """Reading Avro with ``columns`` given as names returns only those columns."""
    # Desired outcome: just the "b" and "c" columns of the frame written below.
    wanted = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})

    # Serialize a three-column frame into an in-memory buffer, then rewind it
    # so the reader starts at the first byte.
    buffer = io.BytesIO()
    source = pl.DataFrame(
        {"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]}
    )
    source.to_avro(buffer)
    buffer.seek(0)

    loaded = pl.read_avro(buffer, columns=["b", "c"])
    assert wanted.frame_equal(loaded)


def test_select_projection() -> None:
    """Reading Avro with ``columns`` given as zero-based indices returns those columns."""
    # Indices 1 and 2 of the frame written below are the "b" and "c" columns.
    wanted = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})

    # Round-trip through an in-memory Avro buffer; rewind before reading.
    buffer = io.BytesIO()
    source = pl.DataFrame(
        {"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]}
    )
    source.to_avro(buffer)
    buffer.seek(0)

    loaded = pl.read_avro(buffer, columns=[1, 2])
    assert wanted.frame_equal(loaded)

0 comments on commit 36f2c62

Please sign in to comment.