Added Support AvroWriter (#2609)
illumination-k committed Feb 11, 2022
1 parent 1e8480f commit 445880f
Showing 4 changed files with 113 additions and 52 deletions.
2 changes: 1 addition & 1 deletion polars/polars-arrow/Cargo.toml
@@ -9,7 +9,7 @@ description = "Arrow interfaces for Polars DataFrame library"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "2c4dbb21daa4f258fd707a3f3c2cbe5abb374f6b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "48761acf30052eb2e3b2bc1688fadd95cb8edab5", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "cherry_pick", default-features = false }
# arrow = { package = "arrow2", version = "0.9", default-features = false, features = ["compute_concatenate"] }
hashbrown = "0.12"
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -162,7 +162,7 @@ unsafe_unwrap = "^0.1.0"
package = "arrow2"
git = "https://github.com/jorgecarleitao/arrow2"
# git = "https://github.com/ritchie46/arrow2"
rev = "2c4dbb21daa4f258fd707a3f3c2cbe5abb374f6b"
rev = "48761acf30052eb2e3b2bc1688fadd95cb8edab5"
# branch = "cherry_pick"
# version = "0.9"
default-features = false
2 changes: 1 addition & 1 deletion polars/polars-io/Cargo.toml
@@ -33,7 +33,7 @@ private = []
[dependencies]
ahash = "0.7"
anyhow = "1.0"
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "2c4dbb21daa4f258fd707a3f3c2cbe5abb374f6b", default-features = false }
arrow = { package = "arrow2", git = "https://github.com/jorgecarleitao/arrow2", rev = "48761acf30052eb2e3b2bc1688fadd95cb8edab5", default-features = false }
# arrow = { package = "arrow2", git = "https://github.com/ritchie46/arrow2", branch = "cherry_pick", default-features = false }
# arrow = { package = "arrow2", version = "0.9", default-features = false }
csv-core = { version = "0.1.10", optional = true }
159 changes: 110 additions & 49 deletions polars/polars-io/src/avro.rs
@@ -1,10 +1,11 @@
use std::io::{Read, Seek};
use std::io::{Read, Seek, Write};

use super::{finish_reader, ArrowChunk, ArrowReader, ArrowResult};
use crate::prelude::*;
use lazy_static::__Deref;
use polars_core::prelude::*;

use arrow::io::avro::read;
use arrow::io::avro::{read, write};

/// Read Apache Avro format into a DataFrame
///
@@ -93,69 +94,129 @@ where
}
}

#[cfg(test)]
mod test {
use crate::avro::AvroReader;
use crate::SerReader;
use arrow::array::Array;
use polars_core::df;
use polars_core::prelude::*;
use std::io::Cursor;
/// Write a DataFrame to Apache Avro format
///
/// # Example
///
/// ```
/// use polars_core::prelude::*;
/// use polars_io::avro::AvroWriter;
/// use std::fs::File;
/// use polars_io::SerWriter;
///
/// fn example(df: &mut DataFrame) -> Result<()> {
/// let mut file = File::create("file.avro").expect("could not create file");
///
/// AvroWriter::new(&mut file)
/// .finish(df)
/// }
///
/// ```
#[must_use]
pub struct AvroWriter<W> {
writer: W,
compression: Option<write::Compression>,
}

fn write_avro(buf: &mut Cursor<Vec<u8>>) {
use arrow::array::{Float64Array, Int64Array, Utf8Array};
use arrow::datatypes::{Field, Schema};
use arrow::io::avro::write;

let i64_array = Int64Array::from(&[Some(1), Some(2)]);
let f64_array = Float64Array::from(&[Some(0.1), Some(0.2)]);
let utf8_array = Utf8Array::<i32>::from(&[Some("a"), Some("b")]);
let i64_field = Field::new("i64", i64_array.data_type().clone(), true);
let f64_field = Field::new("f64", f64_array.data_type().clone(), true);
let utf8_field = Field::new("utf8", utf8_array.data_type().clone(), true);
let schema = Schema::from(vec![i64_field, f64_field, utf8_field]);
let arrays = vec![
&i64_array as &dyn Array,
&f64_array as &dyn Array,
&utf8_array as &dyn Array,
];
let avro_fields = write::to_avro_schema(&schema).unwrap();
impl<W> AvroWriter<W>
where
W: Write,
{
/// Set the compression used. Defaults to None.
pub fn with_compression(mut self, compression: Option<write::Compression>) -> Self {
self.compression = compression;
self
}
}

let mut serializers = arrays
.iter()
.zip(avro_fields.iter())
.map(|(array, field)| write::new_serializer(*array, &field.schema))
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].len(), vec![]);
impl<W> SerWriter<W> for AvroWriter<W>
where
W: Write,
{
fn new(writer: W) -> Self {
Self {
writer,
compression: None,
}
}

write::serialize(&mut serializers, &mut block);
fn finish(mut self, df: &mut DataFrame) -> Result<()> {
let schema = df.schema().to_arrow();
let avro_fields = write::to_avro_schema(&schema)?;

let mut compressed_block = write::CompressedBlock::default();
for chunk in df.iter_chunks() {
let mut serializers = chunk
.iter()
.zip(avro_fields.iter())
.map(|(array, field)| write::new_serializer(array.deref(), &field.schema))
.collect::<Vec<_>>();

let _was_compressed = write::compress(&mut block, &mut compressed_block, None).unwrap();
let mut block = write::Block::new(chunk.len(), vec![]);
let mut compressed_block = write::CompressedBlock::default();

write::write_metadata(buf, avro_fields.clone(), None).unwrap();
write::serialize(&mut serializers, &mut block);
let _was_compressed =
write::compress(&mut block, &mut compressed_block, self.compression)?;

write::write_block(buf, &compressed_block).unwrap();
write::write_metadata(&mut self.writer, avro_fields.clone(), self.compression)?;

write::write_block(&mut self.writer, &compressed_block)?;
}

Ok(())
}
}

#[cfg(test)]
mod test {
use super::{write, AvroReader, AvroWriter};
use crate::prelude::*;
use polars_core::df;
use polars_core::prelude::*;
use std::io::Cursor;
#[test]
fn write_and_read_avro_naive() {
fn write_and_read_avro() -> Result<()> {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
write_avro(&mut buf);
let mut write_df = df!(
"i64" => &[1, 2],
"f64" => &[0.1, 0.2],
"utf8" => &["a", "b"]
)?;

AvroWriter::new(&mut buf).finish(&mut write_df)?;
buf.set_position(0);

let df = AvroReader::new(buf).finish();
assert!(df.is_ok());
let df = df.unwrap();
let read_df = AvroReader::new(buf).finish()?;
assert_eq!(write_df, read_df);
Ok(())
}

let expected = df!(
#[test]
fn test_write_and_read_with_compression() -> Result<()> {
let mut write_df = df!(
"i64" => &[1, 2],
"f64" => &[0.1, 0.2],
"utf8" => &["a", "b"]
)
.unwrap();
assert_eq!(df.shape(), expected.shape());
assert!(df.frame_equal(&expected));
)?;

let compressions = vec![
None,
Some(write::Compression::Deflate),
Some(write::Compression::Snappy),
];

for compression in compressions.into_iter() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());

AvroWriter::new(&mut buf)
.with_compression(compression)
.finish(&mut write_df)?;
buf.set_position(0);

let read_df = AvroReader::new(buf).finish()?;
assert_eq!(write_df, read_df);
}

Ok(())
}
}
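
For reference, a minimal round-trip sketch of the API added in this commit, modeled on the new tests above: it writes a DataFrame to an in-memory buffer with optional Snappy compression, then reads it back with AvroReader. The import paths are assumptions (inside this repo arrow2 is renamed to `arrow`; downstream code may need `arrow2::io::avro::write` instead), and using the Snappy variant assumes the corresponding arrow2 compression feature is enabled.

use polars_core::df;
use polars_core::prelude::*;
use polars_io::avro::{AvroReader, AvroWriter};
use polars_io::{SerReader, SerWriter};
// Path assumption: arrow2 is renamed to `arrow` inside this repo;
// downstream users may need `arrow2::io::avro::write`.
use arrow::io::avro::write;
use std::io::Cursor;

fn avro_round_trip() -> Result<()> {
    let mut df = df!(
        "i64" => &[1, 2],
        "f64" => &[0.1, 0.2],
        "utf8" => &["a", "b"]
    )?;

    // Serialize the DataFrame to Avro in memory, compressing blocks with Snappy.
    let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    AvroWriter::new(&mut buf)
        .with_compression(Some(write::Compression::Snappy))
        .finish(&mut df)?;

    // Rewind the buffer and read the data back.
    buf.set_position(0);
    let read_df = AvroReader::new(buf).finish()?;
    assert_eq!(df, read_df);

    Ok(())
}

Because `finish` iterates the DataFrame's chunks internally, the same pattern applies when writing to a `File` instead of a `Cursor`, as in the AvroWriter doc example above.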
