Migrated Avro code (#1199)
jorgecarleitao committed Aug 4, 2022
1 parent 35b65c4 commit 497d431
Showing 31 changed files with 192 additions and 982 deletions.
32 changes: 14 additions & 18 deletions Cargo.toml
@@ -21,14 +21,19 @@ num-traits = "0.2"
dyn-clone = "1"
bytemuck = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.6", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }

# We need to Hash values before sending them to an hasher. This
# crate provides HashMap that assumes pre-hashed values.
hash_hasher = "^2.0.3"
# For SIMD utf8 validation
simdutf8 = "0.1.3"
# faster hashing
ahash = { version = "0.7" }

# for timezone support
chrono-tz = { version = "0.6", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }

# for csv io
csv = { version = "^1.1", optional = true }
@@ -65,20 +70,14 @@ base64 = { version = "0.13.0", optional = true }
# to write to parquet as a stream
futures = { version = "0.3", optional = true }

# for faster hashing
ahash = { version = "0.7" }
# to read IPC as a stream
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.14.0", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }
avro-schema = { version = "0.3", optional = true }

# ORC support
orc-format = { version = "0.3.0", optional = true }
@@ -163,14 +162,11 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]

io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "streaming-iterator"]
io_avro_compression = [
"libflate",
"snap",
"crc",
"avro-schema/compression",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
io_avro_async = ["avro-schema/async"]

io_orc = [ "orc-format" ]

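With this change the Avro-specific dependencies (libflate, snap, crc, and the async helpers) are no longer wired up directly; `io_avro_compression` and `io_avro_async` now just forward to avro-schema's own `compression` and `async` features, and `io_avro_async` no longer implies `io_avro`. A minimal downstream manifest sketch (the version requirement is a placeholder, not taken from this commit):

```toml
[dependencies]
# placeholder version; list io_avro alongside io_avro_async, since the latter no longer implies it
arrow2 = { version = "*", features = ["io_avro", "io_avro_compression", "io_avro_async"] }
```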
14 changes: 4 additions & 10 deletions benches/avro_read.rs
@@ -4,6 +4,7 @@ use avro_rs::types::Record;
use criterion::*;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema::read::read_metadata;
use arrow2::io::avro::read;
use avro_rs::*;
use avro_rs::{Codec, Schema as AvroSchema};
@@ -42,17 +43,10 @@ fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
let mut file = Cursor::new(buffer);

let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;
let metadata = read_metadata(&mut file)?;
let schema = read::infer_schema(&metadata.record)?;

let reader = read::Reader::new(
read::Decompressor::new(
read::BlockStreamIterator::new(&mut file, file_marker),
codec,
),
avro_schema,
schema.fields,
None,
);
let reader = read::Reader::new(file, metadata, schema.fields, None);

let mut rows = 0;
for maybe_batch in reader {
10 changes: 6 additions & 4 deletions examples/avro_kafka.rs
@@ -2,6 +2,8 @@ use arrow2::{
datatypes::{DataType, Field},
error::Error,
io::avro,
io::avro::avro_schema::file::Block,
io::avro::avro_schema::schema::{Field as AvroField, Schema},
};

fn read_schema_id<R: std::io::Read>(reader: &mut R) -> Result<u32, Error> {
@@ -16,7 +18,7 @@ fn read_schema_id<R: std::io::Read>(reader: &mut R) -> Result<u32, Error> {
Ok(u32::from_be_bytes(header[1..].try_into().unwrap()))
}

fn read_block<R: std::io::Read>(reader: &mut R, block: &mut avro::Block) -> Result<(), Error> {
fn read_block<R: std::io::Read>(reader: &mut R, block: &mut Block) -> Result<(), Error> {
// (we could lump multiple records together by extending the block instead of clearing)
block.data.clear();
reader.read_to_end(&mut block.data)?;
@@ -34,8 +36,8 @@ fn main() -> Result<(), Error> {

// say that from the registry we concluded that this schema has fields
let avro_fields = vec![
avro_schema::Schema::String(None),
avro_schema::Schema::String(None),
AvroField::new("first_name", Schema::String(None)),
AvroField::new("last_name", Schema::String(None)),
];
// (which we map to arrow fields as)
let fields = vec![
@@ -44,7 +46,7 @@ ];
];

// the below allow us to read it to arrow (a chunk of a single element)
let mut block = avro::Block::default();
let mut block = Block::default();
read_block(&mut stream, &mut block)?;

let chunk = avro::read::deserialize(&block, &fields, &avro_fields, &[true, true])?;
14 changes: 6 additions & 8 deletions examples/avro_read.rs
@@ -2,6 +2,7 @@ use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema;
use arrow2::io::avro::read;

fn main() -> Result<()> {
@@ -12,16 +13,13 @@ fn main() -> Result<()> {

let file = &mut BufReader::new(File::open(path)?);

let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
let metadata = avro_schema::read::read_metadata(file)?;

println!("{:#?}", avro_schema);
let schema = read::infer_schema(&metadata.record)?;

let reader = read::Reader::new(
read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
avro_schema,
schema.fields,
None,
);
println!("{:#?}", metadata);

let reader = read::Reader::new(file, metadata, schema.fields, None);

for maybe_chunk in reader {
let columns = maybe_chunk?;
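Assembled from the new lines above, the synchronous read path after the migration looks roughly like this sketch (the file path is a stand-in, and the comment on the last `Reader::new` argument reflects a reading of the new signature rather than documentation from this commit):

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema;
use arrow2::io::avro::read;

fn main() -> Result<()> {
    // stand-in path; the original example takes it from the command line
    let file = &mut BufReader::new(File::open("sample.avro")?);

    // file-level metadata: record schema, compression codec and sync marker
    let metadata = avro_schema::read::read_metadata(file)?;

    // map the Avro record schema to an Arrow schema
    let schema = read::infer_schema(&metadata.record)?;

    // the Reader now takes the reader and metadata directly;
    // the final None is presumably the optional projection
    let reader = read::Reader::new(file, metadata, schema.fields, None);

    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        assert!(!chunk.is_empty());
    }
    Ok(())
}
```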
27 changes: 18 additions & 9 deletions examples/avro_read_async.rs
@@ -1,11 +1,14 @@
use std::sync::Arc;

use futures::pin_mut;
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::avro::read::{decompress_block, deserialize};
use arrow2::io::avro::read_async::*;
use arrow2::io::avro::avro_schema::file::Block;
use arrow2::io::avro::avro_schema::read_async::{block_stream, decompress_block, read_metadata};
use arrow2::io::avro::read::{deserialize, infer_schema};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
@@ -16,22 +19,28 @@ async fn main() -> Result<()> {

let mut reader = File::open(file_path).await?.compat();

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
let avro_schemas = Box::new(avro_schemas);
let projection = Box::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
let metadata = read_metadata(&mut reader).await?;
let schema = infer_schema(&metadata.record)?;
let metadata = Arc::new(metadata);
let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

let blocks = block_stream(&mut reader, marker).await;
let blocks = block_stream(&mut reader, metadata.marker).await;

pin_mut!(blocks);
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
let metadata = metadata.clone();
let projection = projection.clone();
// the content here is CPU-bounded. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
decompress_block(&mut block, &mut decompressed, metadata.compression)?;
deserialize(
&decompressed,
&schema.fields,
&metadata.record.fields,
&projection,
)
});
let chunk = handle.await.unwrap()?;
assert!(!chunk.is_empty());
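Consolidated, the new async flow reads the file metadata once, shares it through an `Arc`, and moves the CPU-bound decompression and deserialization into `spawn_blocking`; a sketch with a stand-in path:

```rust
use std::sync::Arc;

use futures::{pin_mut, StreamExt};
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema::file::Block;
use arrow2::io::avro::avro_schema::read_async::{block_stream, decompress_block, read_metadata};
use arrow2::io::avro::read::{deserialize, infer_schema};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
    // stand-in path; the original example takes it from the command line
    let mut reader = File::open("sample.avro").await?.compat();

    let metadata = read_metadata(&mut reader).await?;
    let schema = infer_schema(&metadata.record)?;
    let metadata = Arc::new(metadata);
    let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

    let blocks = block_stream(&mut reader, metadata.marker).await;
    pin_mut!(blocks);
    while let Some(mut block) = blocks.next().await.transpose()? {
        let schema = schema.clone();
        let metadata = metadata.clone();
        let projection = projection.clone();
        // decompression and deserialization are CPU-bound: run them off the async executor
        let handle = tokio::task::spawn_blocking(move || {
            let mut decompressed = Block::new(0, vec![]);
            decompress_block(&mut block, &mut decompressed, metadata.compression)?;
            deserialize(
                &decompressed,
                &schema.fields,
                &metadata.record.fields,
                &projection,
            )
        });
        let chunk = handle.await.unwrap()?;
        assert!(!chunk.is_empty());
    }
    Ok(())
}
```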
18 changes: 10 additions & 8 deletions examples/avro_write.rs
@@ -4,33 +4,35 @@ use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
io::avro::avro_schema,
io::avro::write,
};

fn write_avro<W: std::io::Write>(
file: &mut W,
arrays: &[&dyn Array],
schema: &Schema,
compression: Option<write::Compression>,
compression: Option<avro_schema::file::Compression>,
) -> Result<()> {
let avro_fields = write::to_avro_schema(schema)?;
let record = write::to_record(schema)?;

let mut serializers = arrays
.iter()
.zip(avro_fields.iter())
.zip(record.fields.iter())
.map(|(array, field)| write::new_serializer(*array, &field.schema))
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].len(), vec![]);
let mut block = avro_schema::file::Block::new(arrays[0].len(), vec![]);

write::serialize(&mut serializers, &mut block);

let mut compressed_block = write::CompressedBlock::default();
let mut compressed_block = avro_schema::file::CompressedBlock::default();

let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?;
let _was_compressed =
avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;

write::write_metadata(file, avro_fields.clone(), compression)?;
avro_schema::write::write_metadata(file, record, compression)?;

write::write_block(file, &compressed_block)?;
avro_schema::write::write_block(file, &compressed_block)?;

Ok(())
}
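For completeness, a hypothetical call site for the `write_avro` helper above, writing a single Int32 column into an in-memory buffer (the column name and the uncompressed `None` setting are illustrative, not part of this commit):

```rust
use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::{Field, Schema};
use arrow2::error::Result;

fn main() -> Result<()> {
    let array = Int32Array::from_slice(&[1, 2, 3]);
    // a single-field schema matching the array; "c1" is an illustrative name
    let schema = Schema::from(vec![Field::new("c1", array.data_type().clone(), false)]);

    let arrays: Vec<&dyn Array> = vec![&array];
    let mut buffer: Vec<u8> = vec![];
    write_avro(&mut buffer, &arrays, &schema, None)?;
    println!("wrote {} bytes of Avro", buffer.len());
    Ok(())
}
```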
107 changes: 9 additions & 98 deletions src/io/avro/mod.rs
@@ -1,23 +1,16 @@
//! Read and write from and to Apache Avro

pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;
pub mod write;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod write_async;
pub use avro_schema;

/// Valid compressions
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Compression {
/// Deflate
Deflate,
/// Snappy
Snappy,
impl From<avro_schema::error::Error> for crate::error::Error {
fn from(error: avro_schema::error::Error) -> Self {
Self::ExternalFormat(error.to_string())
}
}

pub mod read;
pub mod write;

// macros that can operate in sync and async code.
macro_rules! avro_decode {
($reader:ident $($_await:tt)*) => {
Expand Down Expand Up @@ -46,86 +39,4 @@ macro_rules! avro_decode {
}
}

macro_rules! read_header {
($reader:ident $($_await:tt)*) => {{
let mut items = ahash::AHashMap::new();

loop {
let len = zigzag_i64($reader)$($_await)*? as usize;
if len == 0 {
break Ok(items);
}

items.reserve(len);
for _ in 0..len {
let key = _read_binary($reader)$($_await)*?;
let key = String::from_utf8(key)
.map_err(|_| Error::ExternalFormat("Invalid Avro header".to_string()))?;
let value = _read_binary($reader)$($_await)*?;
items.insert(key, value);
}
}
}};
}

macro_rules! read_metadata {
($reader:ident $($_await:tt)*) => {{
let mut magic_number = [0u8; 4];
$reader.read_exact(&mut magic_number)$($_await)*?;

// see https://avro.apache.org/docs/current/spec.html#Object+Container+Files
if magic_number != [b'O', b'b', b'j', 1u8] {
return Err(Error::ExternalFormat(
"Avro header does not contain a valid magic number".to_string(),
));
}

let header = read_header($reader)$($_await)*?;

let (schema, compression) = deserialize_header(header)?;

let marker = read_file_marker($reader)$($_await)*?;

Ok((schema, compression, marker))
}};
}

pub(crate) use {avro_decode, read_header, read_metadata};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CompressedBlock {
/// The number of rows
pub number_of_rows: usize,
/// The compressed data
pub data: Vec<u8>,
}

impl CompressedBlock {
/// Creates a new CompressedBlock
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Block {
/// The number of rows
pub number_of_rows: usize,
/// The uncompressed data
pub data: Vec<u8>,
}

impl Block {
/// Creates a new Block
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}
pub(crate) use avro_decode;
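The `From<avro_schema::error::Error>` conversion added above is what lets the rest of the crate, and user code, bubble avro_schema errors up with `?`. A small sketch, assuming a local file path:

```rust
use arrow2::error::Result;
use arrow2::io::avro::avro_schema;

fn print_avro_metadata(path: &str) -> Result<()> {
    let mut reader = std::io::BufReader::new(std::fs::File::open(path)?);
    // read_metadata returns avro_schema's own error type; the `From` impl above
    // converts it into arrow2::error::Error at the `?`
    let metadata = avro_schema::read::read_metadata(&mut reader)?;
    println!("{:#?}", metadata);
    Ok(())
}
```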