Skip to content

Commit

Permalink
Added with_projection and with_columns for AvroReader (#2825)
Browse files Browse the repository at this point in the history
  • Loading branch information
illumination-k committed Mar 4, 2022
1 parent e1609c8 commit 5a5991d
Show file tree
Hide file tree
Showing 4 changed files with 133 additions and 56 deletions.
106 changes: 88 additions & 18 deletions polars/polars-io/src/avro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ pub struct AvroReader<R> {
reader: R,
rechunk: bool,
n_rows: Option<usize>,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
}

impl<R: Read + Seek> AvroReader<R> {
Expand All @@ -48,6 +50,19 @@ impl<R: Read + Seek> AvroReader<R> {
self.n_rows = num_rows;
self
}

/// Set the reader's column projection: zero-based indices of the columns
/// to read. For example, `vec![0, 4]` selects the 1st and 5th columns.
/// Pass `None` to read every column.
pub fn with_projection(mut self, projection: Option<Vec<usize>>) -> Self {
    self.projection = projection;
    self
}

/// Select columns to read by name. Overrides any index-based projection
/// when both are set (the names are resolved to indices on `finish`).
pub fn with_columns(mut self, columns: Option<Vec<String>>) -> Self {
    self.columns = columns;
    self
}
}

impl<R> ArrowReader for read::Reader<R>
Expand All @@ -68,6 +83,8 @@ where
reader,
rechunk: true,
n_rows: None,
columns: None,
projection: None,
}
}

Expand All @@ -80,17 +97,32 @@ where
let rechunk = self.rechunk;
let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut self.reader)?;

if let Some(columns) = self.columns {
self.projection = Some(columns_to_projection(columns, &schema)?);
}

let prj = if let Some(projection) = self.projection {
let mut prj = vec![false; avro_schema.len()];
for index in projection {
prj[index] = true;
}

Some(prj)
} else {
None
};

let avro_reader = read::Reader::new(
read::Decompressor::new(
read::BlockStreamIterator::new(&mut self.reader, file_marker),
codec,
),
avro_schema,
schema.clone().fields,
None,
prj,
);

finish_reader(avro_reader, rechunk, None, None, None, &schema, None)
finish_reader(avro_reader, rechunk, self.n_rows, None, None, &schema, None)
}
}

Expand Down Expand Up @@ -176,22 +208,6 @@ mod test {
use polars_core::df;
use polars_core::prelude::*;
use std::io::Cursor;
#[test]
fn write_and_read_avro() -> Result<()> {
    // Round-trip: write a small frame to an in-memory Avro buffer,
    // read it back, and check the frames are identical.
    let mut original = df!(
        "i64" => &[1, 2],
        "f64" => &[0.1, 0.2],
        "utf8" => &["a", "b"]
    )?;

    let mut buffer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    AvroWriter::new(&mut buffer).finish(&mut original)?;

    // Rewind so the reader starts from the beginning of the buffer.
    buffer.set_position(0);
    let roundtripped = AvroReader::new(buffer).finish()?;

    assert!(original.frame_equal(&roundtripped));
    Ok(())
}

#[test]
fn test_write_and_read_with_compression() -> Result<()> {
Expand Down Expand Up @@ -221,4 +237,58 @@ mod test {

Ok(())
}

#[test]
fn test_with_projection() -> Result<()> {
    // Write three columns, then read back only the first two via an
    // index-based projection.
    let mut input = df!(
        "i64" => &[1, 2],
        "f64" => &[0.1, 0.2],
        "utf8" => &["a", "b"]
    )?;
    let expected = df!(
        "i64" => &[1, 2],
        "f64" => &[0.1, 0.2]
    )?;

    let mut buffer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    AvroWriter::new(&mut buffer).finish(&mut input)?;
    buffer.set_position(0);

    let projected = AvroReader::new(buffer)
        .with_projection(Some(vec![0, 1]))
        .finish()?;
    assert!(expected.frame_equal(&projected));

    Ok(())
}

#[test]
fn test_with_columns() -> Result<()> {
    // Write three columns, then read back a subset selected by name.
    let mut input = df!(
        "i64" => &[1, 2],
        "f64" => &[0.1, 0.2],
        "utf8" => &["a", "b"]
    )?;
    let expected = df!(
        "i64" => &[1, 2],
        "f64" => &[0.1, 0.2]
    )?;

    let mut buffer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    AvroWriter::new(&mut buffer).finish(&mut input)?;
    buffer.set_position(0);

    let selected = AvroReader::new(buffer)
        .with_columns(Some(vec!["i64".to_string(), "f64".to_string()]))
        .finish()?;
    assert!(expected.frame_equal(&selected));

    Ok(())
}
}
32 changes: 2 additions & 30 deletions polars/polars-io/src/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
use super::{finish_reader, ArrowReader, ArrowResult};
use crate::predicates::PhysicalIoExpr;
use crate::prelude::*;
use ahash::AHashMap;
use arrow::io::ipc::write::WriteOptions;
use arrow::io::ipc::{read, write};
use polars_core::prelude::*;
Expand Down Expand Up @@ -177,35 +176,8 @@ where
let metadata = read::read_file_metadata(&mut self.reader)?;
let schema = &metadata.schema;

let err = |column: &str| {
let valid_fields: Vec<String> = schema.fields.iter().map(|f| f.name.clone()).collect();
PolarsError::NotFound(format!(
"Unable to get field named \"{}\". Valid fields: {:?}",
column, valid_fields
))
};

if let Some(cols) = self.columns {
let mut prj = Vec::with_capacity(cols.len());
if cols.len() > 100 {
let mut column_names = AHashMap::with_capacity(schema.fields.len());
schema.fields.iter().enumerate().for_each(|(i, c)| {
column_names.insert(c.name.as_str(), i);
});

for column in cols.iter() {
if let Some(i) = column_names.get(column.as_str()) {
prj.push(*i);
} else {
return Err(err(column));
}
}
} else {
for column in cols.iter() {
let i = schema.try_index_of(column)?;
prj.push(i);
}
}
if let Some(columns) = self.columns {
let mut prj = columns_to_projection(columns, schema)?;

// Ipc reader panics if the projection is not in increasing order, so sorting is the safer way.
prj.sort_unstable();
Expand Down
8 changes: 1 addition & 7 deletions polars/polars-io/src/parquet/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
let schema = read::schema::infer_schema(&metadata)?;

if let Some(cols) = self.columns {
let mut prj = Vec::with_capacity(cols.len());
for col in cols.iter() {
let i = schema.try_index_of(col)?;
prj.push(i);
}

self.projection = Some(prj);
self.projection = Some(columns_to_projection(cols, &schema)?);
}

read_parquet(
Expand Down
43 changes: 42 additions & 1 deletion polars/polars-io/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
#[cfg(any(feature = "ipc", feature = "parquet"))]
#[cfg(any(feature = "ipc", feature = "parquet", feature = "avro"))]
use crate::ArrowSchema;
use dirs::home_dir;
use polars_core::frame::DataFrame;
#[cfg(any(feature = "ipc", feature = "avro", feature = "parquet"))]
use polars_core::prelude::*;
use std::path::{Path, PathBuf};

// used by python polars
Expand All @@ -26,6 +28,45 @@ pub(crate) fn apply_projection(schema: &ArrowSchema, projection: &[usize]) -> Ar
ArrowSchema::from(fields)
}

#[cfg(any(feature = "ipc", feature = "avro", feature = "parquet"))]
/// Resolve a list of column names into their positional indices in `schema`.
///
/// Returns `PolarsError::NotFound` (listing the valid field names) if any
/// requested column is absent. Above 100 columns a name -> index map is
/// built once so each lookup is O(1); below that, linear scans via
/// `try_index_of` are cheaper than constructing the map.
pub(crate) fn columns_to_projection(
    columns: Vec<String>,
    schema: &ArrowSchema,
) -> Result<Vec<usize>> {
    use ahash::AHashMap;

    let not_found = |column: &str| {
        let valid_fields: Vec<String> = schema.fields.iter().map(|f| f.name.clone()).collect();
        PolarsError::NotFound(format!(
            "Unable to get field named \"{}\". Valid fields: {:?}",
            column, valid_fields
        ))
    };

    if columns.len() > 100 {
        // Many columns: hash the schema's field names once, then resolve
        // each request with a constant-time lookup.
        let index_by_name: AHashMap<&str, usize> = schema
            .fields
            .iter()
            .enumerate()
            .map(|(i, field)| (field.name.as_str(), i))
            .collect();

        columns
            .iter()
            .map(|column| {
                index_by_name
                    .get(column.as_str())
                    .copied()
                    .ok_or_else(|| not_found(column))
            })
            .collect()
    } else {
        // Few columns: a linear scan per name is fast enough.
        columns
            .iter()
            .map(|column| schema.try_index_of(column))
            .collect()
    }
}

/// Because of threading every row starts from `0` or from `offset`.
/// We must correct that so that they are monotonically increasing.
pub(crate) fn update_row_counts(dfs: &mut [(DataFrame, u32)]) {
Expand Down

0 comments on commit 5a5991d

Please sign in to comment.