This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Migrated Avro code to avro-schema repo #1199

Merged
merged 1 commit on Aug 4, 2022
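The migration is mostly mechanical: file-level Avro handling (metadata, blocks, compression) now comes from the re-exported `avro-schema` crate, while arrow2 keeps schema inference and (de)serialization to and from Arrow. Below is a minimal sketch of the post-migration synchronous read path, assembled from the example changes in this diff; the input file name is illustrative.

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema::read::read_metadata;
use arrow2::io::avro::read;

fn main() -> Result<()> {
    // any Avro object-container file works here; the name is hypothetical
    let file = &mut BufReader::new(File::open("sample.avro")?);

    // file-level metadata (record schema, compression, sync marker) now comes
    // from the re-exported avro-schema crate
    let metadata = read_metadata(file)?;

    // arrow2 still maps the Avro record to an Arrow schema
    let schema = read::infer_schema(&metadata.record)?;

    // the reader is built from the reader and metadata directly, replacing the
    // old BlockStreamIterator/Decompressor composition
    let reader = read::Reader::new(file, metadata, schema.fields, None);

    for maybe_chunk in reader {
        let chunk = maybe_chunk?;
        println!("read a chunk with {} rows", chunk.len());
    }
    Ok(())
}
```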
32 changes: 14 additions & 18 deletions Cargo.toml
@@ -21,14 +21,19 @@ num-traits = "0.2"
dyn-clone = "1"
bytemuck = { version = "1", features = ["derive"] }
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.6", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }

# We need to hash values before sending them to a hasher. This
# crate provides a HashMap that assumes pre-hashed values.
hash_hasher = "^2.0.3"
# For SIMD utf8 validation
simdutf8 = "0.1.3"
# faster hashing
ahash = { version = "0.7" }

# for timezone support
chrono-tz = { version = "0.6", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }

# for csv io
csv = { version = "^1.1", optional = true }
@@ -64,20 +69,14 @@ base64 = { version = "0.13.0", optional = true }
# to write to parquet as a stream
futures = { version = "0.3", optional = true }

# for faster hashing
ahash = { version = "0.7" }
# to read IPC as a stream
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.14.0", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }
avro-schema = { version = "0.3", optional = true }

# ORC support
orc-format = { version = "0.3.0", optional = true }
@@ -161,14 +160,11 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]

io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro = ["avro-schema", "streaming-iterator"]
io_avro_compression = [
"libflate",
"snap",
"crc",
"avro-schema/compression",
]
io_avro_async = ["io_avro", "futures", "async-stream"]
io_avro_async = ["avro-schema/async"]

io_orc = [ "orc-format" ]

14 changes: 4 additions & 10 deletions benches/avro_read.rs
@@ -4,6 +4,7 @@ use avro_rs::types::Record;
use criterion::*;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema::read::read_metadata;
use arrow2::io::avro::read;
use avro_rs::*;
use avro_rs::{Codec, Schema as AvroSchema};
@@ -42,17 +43,10 @@ fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
let mut file = Cursor::new(buffer);

let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;
let metadata = read_metadata(&mut file)?;
let schema = read::infer_schema(&metadata.record)?;

let reader = read::Reader::new(
read::Decompressor::new(
read::BlockStreamIterator::new(&mut file, file_marker),
codec,
),
avro_schema,
schema.fields,
None,
);
let reader = read::Reader::new(file, metadata, schema.fields, None);

let mut rows = 0;
for maybe_batch in reader {
10 changes: 6 additions & 4 deletions examples/avro_kafka.rs
@@ -2,6 +2,8 @@ use arrow2::{
datatypes::{DataType, Field},
error::Error,
io::avro,
io::avro::avro_schema::file::Block,
io::avro::avro_schema::schema::{Field as AvroField, Schema},
};

fn read_schema_id<R: std::io::Read>(reader: &mut R) -> Result<u32, Error> {
@@ -16,7 +18,7 @@ fn read_schema_id<R: std::io::Read>(reader: &mut R) -> Result<u32, Error> {
Ok(u32::from_be_bytes(header[1..].try_into().unwrap()))
}

fn read_block<R: std::io::Read>(reader: &mut R, block: &mut avro::Block) -> Result<(), Error> {
fn read_block<R: std::io::Read>(reader: &mut R, block: &mut Block) -> Result<(), Error> {
// (we could lump multiple records together by extending the block instead of clearing)
block.data.clear();
reader.read_to_end(&mut block.data)?;
@@ -34,8 +36,8 @@ fn main() -> Result<(), Error> {

// say that from the registry we concluded that this schema has fields
let avro_fields = vec![
avro_schema::Schema::String(None),
avro_schema::Schema::String(None),
AvroField::new("first_name", Schema::String(None)),
AvroField::new("last_name", Schema::String(None)),
];
// (which we map to arrow fields as)
let fields = vec![
@@ -44,7 +46,7 @@ ];
];

// the below allows us to read it into arrow (a chunk of a single element)
let mut block = avro::Block::default();
let mut block = Block::default();
read_block(&mut stream, &mut block)?;

let chunk = avro::read::deserialize(&block, &fields, &avro_fields, &[true, true])?;
14 changes: 6 additions & 8 deletions examples/avro_read.rs
@@ -2,6 +2,7 @@ use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::avro::avro_schema;
use arrow2::io::avro::read;

fn main() -> Result<()> {
@@ -12,16 +13,13 @@ fn main() -> Result<()> {

let file = &mut BufReader::new(File::open(path)?);

let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
let metadata = avro_schema::read::read_metadata(file)?;

println!("{:#?}", avro_schema);
let schema = read::infer_schema(&metadata.record)?;

let reader = read::Reader::new(
read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
avro_schema,
schema.fields,
None,
);
println!("{:#?}", metadata);

let reader = read::Reader::new(file, metadata, schema.fields, None);

for maybe_chunk in reader {
let columns = maybe_chunk?;
27 changes: 18 additions & 9 deletions examples/avro_read_async.rs
@@ -1,11 +1,14 @@
use std::sync::Arc;

use futures::pin_mut;
use futures::StreamExt;
use tokio::fs::File;
use tokio_util::compat::*;

use arrow2::error::Result;
use arrow2::io::avro::read::{decompress_block, deserialize};
use arrow2::io::avro::read_async::*;
use arrow2::io::avro::avro_schema::file::Block;
use arrow2::io::avro::avro_schema::read_async::{block_stream, decompress_block, read_metadata};
use arrow2::io::avro::read::{deserialize, infer_schema};

#[tokio::main(flavor = "current_thread")]
async fn main() -> Result<()> {
@@ -16,22 +19,28 @@ async fn main() -> Result<()> {

let mut reader = File::open(file_path).await?.compat();

let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
let avro_schemas = Box::new(avro_schemas);
let projection = Box::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
let metadata = read_metadata(&mut reader).await?;
let schema = infer_schema(&metadata.record)?;
let metadata = Arc::new(metadata);
let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());

let blocks = block_stream(&mut reader, marker).await;
let blocks = block_stream(&mut reader, metadata.marker).await;

pin_mut!(blocks);
while let Some(mut block) = blocks.next().await.transpose()? {
let schema = schema.clone();
let avro_schemas = avro_schemas.clone();
let metadata = metadata.clone();
let projection = projection.clone();
// the content here is CPU-bound. It should run on a dedicated thread pool
let handle = tokio::task::spawn_blocking(move || {
let mut decompressed = Block::new(0, vec![]);
decompress_block(&mut block, &mut decompressed, compression)?;
deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
decompress_block(&mut block, &mut decompressed, metadata.compression)?;
deserialize(
&decompressed,
&schema.fields,
&metadata.record.fields,
&projection,
)
});
let chunk = handle.await.unwrap()?;
assert!(!chunk.is_empty());
18 changes: 10 additions & 8 deletions examples/avro_write.rs
@@ -4,33 +4,35 @@ use arrow2::{
array::{Array, Int32Array},
datatypes::{Field, Schema},
error::Result,
io::avro::avro_schema,
io::avro::write,
};

fn write_avro<W: std::io::Write>(
file: &mut W,
arrays: &[&dyn Array],
schema: &Schema,
compression: Option<write::Compression>,
compression: Option<avro_schema::file::Compression>,
) -> Result<()> {
let avro_fields = write::to_avro_schema(schema)?;
let record = write::to_record(schema)?;

let mut serializers = arrays
.iter()
.zip(avro_fields.iter())
.zip(record.fields.iter())
.map(|(array, field)| write::new_serializer(*array, &field.schema))
.collect::<Vec<_>>();
let mut block = write::Block::new(arrays[0].len(), vec![]);
let mut block = avro_schema::file::Block::new(arrays[0].len(), vec![]);

write::serialize(&mut serializers, &mut block);

let mut compressed_block = write::CompressedBlock::default();
let mut compressed_block = avro_schema::file::CompressedBlock::default();

let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?;
let _was_compressed =
avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;

write::write_metadata(file, avro_fields.clone(), compression)?;
avro_schema::write::write_metadata(file, record, compression)?;

write::write_block(file, &compressed_block)?;
avro_schema::write::write_block(file, &compressed_block)?;

Ok(())
}
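Because the old and new lines are interleaved in the diff above, here is the new write path consolidated into one piece, a sketch based on the updated example; the column name, sample values, and output file name are illustrative.

```rust
use arrow2::{
    array::{Array, Int32Array},
    datatypes::{DataType, Field, Schema},
    error::Result,
    io::avro::avro_schema,
    io::avro::write,
};

fn main() -> Result<()> {
    // hypothetical single-column input
    let array = Int32Array::from_slice(&[1, 2, 3]);
    let schema = Schema::from(vec![Field::new("a", DataType::Int32, false)]);
    let arrays: Vec<&dyn Array> = vec![&array];

    // map the Arrow schema to an Avro record (previously `to_avro_schema`)
    let record = write::to_record(&schema)?;

    // serialize each array with its matching Avro field schema
    let mut serializers = arrays
        .iter()
        .zip(record.fields.iter())
        .map(|(array, field)| write::new_serializer(*array, &field.schema))
        .collect::<Vec<_>>();
    let mut block = avro_schema::file::Block::new(arrays[0].len(), vec![]);
    write::serialize(&mut serializers, &mut block);

    // compression, metadata and block writing now live in avro_schema::write
    let mut compressed = avro_schema::file::CompressedBlock::default();
    let _was_compressed = avro_schema::write::compress(&mut block, &mut compressed, None)?;

    let mut file = std::fs::File::create("sample.avro")?;
    avro_schema::write::write_metadata(&mut file, record, None)?;
    avro_schema::write::write_block(&mut file, &compressed)?;
    Ok(())
}
```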
107 changes: 9 additions & 98 deletions src/io/avro/mod.rs
@@ -1,23 +1,16 @@
//! Read and write from and to Apache Avro

pub mod read;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod read_async;
pub mod write;
#[cfg(feature = "io_avro_async")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
pub mod write_async;
pub use avro_schema;

/// Valid compressions
#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
pub enum Compression {
/// Deflate
Deflate,
/// Snappy
Snappy,
impl From<avro_schema::error::Error> for crate::error::Error {
fn from(error: avro_schema::error::Error) -> Self {
Self::ExternalFormat(error.to_string())
}
}

pub mod read;
pub mod write;

// macros that can operate in sync and async code.
macro_rules! avro_decode {
($reader:ident $($_await:tt)*) => {
@@ -46,86 +39,4 @@ macro_rules! avro_decode {
}
}

macro_rules! read_header {
($reader:ident $($_await:tt)*) => {{
let mut items = ahash::AHashMap::new();

loop {
let len = zigzag_i64($reader)$($_await)*? as usize;
if len == 0 {
break Ok(items);
}

items.reserve(len);
for _ in 0..len {
let key = _read_binary($reader)$($_await)*?;
let key = String::from_utf8(key)
.map_err(|_| Error::ExternalFormat("Invalid Avro header".to_string()))?;
let value = _read_binary($reader)$($_await)*?;
items.insert(key, value);
}
}
}};
}

macro_rules! read_metadata {
($reader:ident $($_await:tt)*) => {{
let mut magic_number = [0u8; 4];
$reader.read_exact(&mut magic_number)$($_await)*?;

// see https://avro.apache.org/docs/current/spec.html#Object+Container+Files
if magic_number != [b'O', b'b', b'j', 1u8] {
return Err(Error::ExternalFormat(
"Avro header does not contain a valid magic number".to_string(),
));
}

let header = read_header($reader)$($_await)*?;

let (schema, compression) = deserialize_header(header)?;

let marker = read_file_marker($reader)$($_await)*?;

Ok((schema, compression, marker))
}};
}

pub(crate) use {avro_decode, read_header, read_metadata};

/// A compressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct CompressedBlock {
/// The number of rows
pub number_of_rows: usize,
/// The compressed data
pub data: Vec<u8>,
}

impl CompressedBlock {
/// Creates a new CompressedBlock
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}

/// An uncompressed Avro block.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct Block {
/// The number of rows
pub number_of_rows: usize,
/// The uncompressed data
pub data: Vec<u8>,
}

impl Block {
/// Creates a new Block
pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
Self {
number_of_rows,
data,
}
}
}
pub(crate) use avro_decode;