diff --git a/Cargo.toml b/Cargo.toml
index 3fe3cb06c78..825ac461276 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,14 +21,19 @@ num-traits = "0.2"
 dyn-clone = "1"
 bytemuck = { version = "1", features = ["derive"] }
 chrono = { version = "0.4", default_features = false, features = ["std"] }
-chrono-tz = { version = "0.6", optional = true }
-# To efficiently cast numbers to strings
-lexical-core = { version = "0.8", optional = true }
+
 # We need to Hash values before sending them to an hasher. This
 # crate provides HashMap that assumes pre-hashed values.
 hash_hasher = "^2.0.3"
 # For SIMD utf8 validation
 simdutf8 = "0.1.3"
+# faster hashing
+ahash = { version = "0.7" }
+
+# for timezone support
+chrono-tz = { version = "0.6", optional = true }
+# To efficiently cast numbers to strings
+lexical-core = { version = "0.8", optional = true }
 
 # for csv io
 csv = { version = "^1.1", optional = true }
@@ -64,20 +69,15 @@ base64 = { version = "0.13.0", optional = true }
 
 # to write to parquet as a stream
 futures = { version = "0.3", optional = true }
-# for faster hashing
-ahash = { version = "0.7" }
+# to read IPC as a stream
+async-stream = { version = "0.3.2", optional = true }
 
 # parquet support
 parquet2 = { version = "0.14.0", optional = true, default_features = false }
 
 # avro support
-avro-schema = { version = "0.2", optional = true }
-# compression of avro
-libflate = { version = "1.1.1", optional = true }
-snap = { version = "1", optional = true }
-crc = { version = "2", optional = true }
-# async avro
-async-stream = { version = "0.3.2", optional = true }
+#avro-schema = { version = "0.2", optional = true }
+avro-schema = { git = "https://github.com/DataEngineeringLabs/avro-schema", branch = "more", optional = true }
 
 # ORC support
 orc-format = { version = "0.3.0", optional = true }
@@ -161,14 +161,11 @@ io_parquet_compression = [
    "parquet2/lz4",
    "parquet2/brotli",
 ]
-
-io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
+io_avro = ["avro-schema", "streaming-iterator"]
 io_avro_compression = [
-    "libflate",
-    "snap",
-    "crc",
+    "avro-schema/compression",
 ]
-io_avro_async = ["io_avro", "futures", "async-stream"]
+io_avro_async = ["avro-schema/async"]
 io_orc = [
    "orc-format"
 ]
diff --git a/benches/avro_read.rs b/benches/avro_read.rs
index 9143db9e7e0..37088492df8 100644
--- a/benches/avro_read.rs
+++ b/benches/avro_read.rs
@@ -4,6 +4,7 @@ use avro_rs::types::Record;
 use criterion::*;
 
 use arrow2::error::Result;
+use arrow2::io::avro::avro_schema::read::read_metadata;
 use arrow2::io::avro::read;
 use avro_rs::*;
 use avro_rs::{Codec, Schema as AvroSchema};
@@ -42,17 +43,10 @@ fn write(size: usize, has_codec: bool) -> Result<Vec<u8>> {
 fn read_batch(buffer: &[u8], size: usize) -> Result<()> {
     let mut file = Cursor::new(buffer);
 
-    let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?;
+    let metadata = read_metadata(&mut file)?;
+    let schema = read::infer_schema(&metadata.record)?;
 
-    let reader = read::Reader::new(
-        read::Decompressor::new(
-            read::BlockStreamIterator::new(&mut file, file_marker),
-            codec,
-        ),
-        avro_schema,
-        schema.fields,
-        None,
-    );
+    let reader = read::Reader::new(file, metadata, schema.fields, None);
 
     let mut rows = 0;
     for maybe_batch in reader {
diff --git a/examples/avro_kafka.rs b/examples/avro_kafka.rs
index 5fb392fd93a..7645024939e 100644
--- a/examples/avro_kafka.rs
+++ b/examples/avro_kafka.rs
@@ -2,6 +2,8 @@ use arrow2::{
     datatypes::{DataType, Field},
     error::Error,
     io::avro,
+    io::avro::avro_schema::file::Block,
+    io::avro::avro_schema::schema::{Field as AvroField, Schema},
 };
 
 fn read_schema_id<R: std::io::Read>(reader: &mut R) -> Result<u32, Error> {
@@ -16,7 +18,7 @@ fn read_schema_id<R: std::io::Read>(reader: &mut R) -> Result<u32, Error> {
     Ok(u32::from_be_bytes(header[1..].try_into().unwrap()))
 }
 
-fn read_block<R: std::io::Read>(reader: &mut R, block: &mut avro::Block) -> Result<(), Error> {
+fn read_block<R: std::io::Read>(reader: &mut R, block: &mut Block) -> Result<(), Error> {
     // (we could lump multiple records together by extending the block instead of clearing)
     block.data.clear();
     reader.read_to_end(&mut block.data)?;
@@ -34,8 +36,8 @@ fn main() -> Result<(), Error> {
 
     // say that from the registry we concluded that this schema has fields
     let avro_fields = vec![
-        avro_schema::Schema::String(None),
-        avro_schema::Schema::String(None),
+        AvroField::new("first_name", Schema::String(None)),
+        AvroField::new("last_name", Schema::String(None)),
     ];
     // (which we map to arrow fields as)
     let fields = vec![
@@ -44,7 +46,7 @@ fn main() -> Result<(), Error> {
     ];
 
     // the below allow us to read it to arrow (a chunk of a single element)
-    let mut block = avro::Block::default();
+    let mut block = Block::default();
     read_block(&mut stream, &mut block)?;
     let chunk = avro::read::deserialize(&block, &fields, &avro_fields, &[true, true])?;
diff --git a/examples/avro_read.rs b/examples/avro_read.rs
index 2d0148cf9d9..749cf3c50db 100644
--- a/examples/avro_read.rs
+++ b/examples/avro_read.rs
@@ -2,6 +2,7 @@ use std::fs::File;
 use std::io::BufReader;
 
 use arrow2::error::Result;
+use arrow2::io::avro::avro_schema;
 use arrow2::io::avro::read;
 
 fn main() -> Result<()> {
@@ -12,16 +13,13 @@ fn main() -> Result<()> {
 
     let file = &mut BufReader::new(File::open(path)?);
 
-    let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?;
+    let metadata = avro_schema::read::read_metadata(file)?;
 
-    println!("{:#?}", avro_schema);
+    let schema = read::infer_schema(&metadata.record)?;
 
-    let reader = read::Reader::new(
-        read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec),
-        avro_schema,
-        schema.fields,
-        None,
-    );
+    println!("{:#?}", metadata);
+
+    let reader = read::Reader::new(file, metadata, schema.fields, None);
 
     for maybe_chunk in reader {
         let columns = maybe_chunk?;
diff --git a/examples/avro_read_async.rs b/examples/avro_read_async.rs
index 9c9c3649da7..ac7ad0b6450 100644
--- a/examples/avro_read_async.rs
+++ b/examples/avro_read_async.rs
@@ -1,11 +1,14 @@
+use std::sync::Arc;
+
 use futures::pin_mut;
 use futures::StreamExt;
 use tokio::fs::File;
 use tokio_util::compat::*;
 
 use arrow2::error::Result;
-use arrow2::io::avro::read::{decompress_block, deserialize};
-use arrow2::io::avro::read_async::*;
+use arrow2::io::avro::avro_schema::file::Block;
+use arrow2::io::avro::avro_schema::read_async::{block_stream, decompress_block, read_metadata};
+use arrow2::io::avro::read::{deserialize, infer_schema};
 
 #[tokio::main(flavor = "current_thread")]
 async fn main() -> Result<()> {
@@ -16,22 +19,28 @@ async fn main() -> Result<()> {
 
     let mut reader = File::open(file_path).await?.compat();
 
-    let (avro_schemas, schema, compression, marker) = read_metadata(&mut reader).await?;
-    let avro_schemas = Box::new(avro_schemas);
-    let projection = Box::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
+    let metadata = read_metadata(&mut reader).await?;
+    let schema = infer_schema(&metadata.record)?;
+    let metadata = Arc::new(metadata);
+    let projection = Arc::new(schema.fields.iter().map(|_| true).collect::<Vec<_>>());
 
-    let blocks = block_stream(&mut reader, marker).await;
+    let blocks = block_stream(&mut reader, metadata.marker).await;
 
     pin_mut!(blocks);
     while let Some(mut block) = blocks.next().await.transpose()? {
         let schema = schema.clone();
-        let avro_schemas = avro_schemas.clone();
+        let metadata = metadata.clone();
         let projection = projection.clone();
         // the content here is CPU-bounded. It should run on a dedicated thread pool
         let handle = tokio::task::spawn_blocking(move || {
             let mut decompressed = Block::new(0, vec![]);
-            decompress_block(&mut block, &mut decompressed, compression)?;
-            deserialize(&decompressed, &schema.fields, &avro_schemas, &projection)
+            decompress_block(&mut block, &mut decompressed, metadata.compression)?;
+            deserialize(
+                &decompressed,
+                &schema.fields,
+                &metadata.record.fields,
+                &projection,
+            )
         });
         let chunk = handle.await.unwrap()?;
         assert!(!chunk.is_empty());
diff --git a/examples/avro_write.rs b/examples/avro_write.rs
index 5e8224c12cb..6042172913a 100644
--- a/examples/avro_write.rs
+++ b/examples/avro_write.rs
@@ -4,6 +4,7 @@ use arrow2::{
     array::{Array, Int32Array},
     datatypes::{Field, Schema},
     error::Result,
+    io::avro::avro_schema,
     io::avro::write,
 };
 
@@ -11,26 +12,27 @@ fn write_avro<W: std::io::Write>(
     file: &mut W,
     arrays: &[&dyn Array],
     schema: &Schema,
-    compression: Option<write::Compression>,
+    compression: Option<avro_schema::file::Compression>,
 ) -> Result<()> {
-    let avro_fields = write::to_avro_schema(schema)?;
+    let record = write::to_record(schema)?;
 
     let mut serializers = arrays
         .iter()
-        .zip(avro_fields.iter())
+        .zip(record.fields.iter())
         .map(|(array, field)| write::new_serializer(*array, &field.schema))
         .collect::<Vec<_>>();
-    let mut block = write::Block::new(arrays[0].len(), vec![]);
+    let mut block = avro_schema::file::Block::new(arrays[0].len(), vec![]);
 
     write::serialize(&mut serializers, &mut block);
 
-    let mut compressed_block = write::CompressedBlock::default();
+    let mut compressed_block = avro_schema::file::CompressedBlock::default();
 
-    let _was_compressed = write::compress(&mut block, &mut compressed_block, compression)?;
+    let _was_compressed =
+        avro_schema::write::compress(&mut block, &mut compressed_block, compression)?;
 
-    write::write_metadata(file, avro_fields.clone(), compression)?;
+    avro_schema::write::write_metadata(file, record, compression)?;
 
-    write::write_block(file, &compressed_block)?;
+    avro_schema::write::write_block(file, &compressed_block)?;
 
     Ok(())
 }
diff --git a/src/io/avro/mod.rs b/src/io/avro/mod.rs
index 5addae95e4b..bf7bda85f19 100644
--- a/src/io/avro/mod.rs
+++ b/src/io/avro/mod.rs
@@ -1,23 +1,16 @@
 //! Read and write from and to Apache Avro
 
-pub mod read;
-#[cfg(feature = "io_avro_async")]
-#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
-pub mod read_async;
-pub mod write;
-#[cfg(feature = "io_avro_async")]
-#[cfg_attr(docsrs, doc(cfg(feature = "io_avro_async")))]
-pub mod write_async;
+pub use avro_schema;
 
-/// Valid compressions
-#[derive(Debug, Clone, Copy, Hash, PartialEq, Eq)]
-pub enum Compression {
-    /// Deflate
-    Deflate,
-    /// Snappy
-    Snappy,
+impl From<avro_schema::error::Error> for crate::error::Error {
+    fn from(error: avro_schema::error::Error) -> Self {
+        Self::ExternalFormat(error.to_string())
+    }
 }
 
+pub mod read;
+pub mod write;
+
 // macros that can operate in sync and async code.
 macro_rules! avro_decode {
     ($reader:ident $($_await:tt)*) => {
@@ -46,86 +39,4 @@ macro_rules! avro_decode {
     }
 }
 
-macro_rules! read_header {
-    ($reader:ident $($_await:tt)*) => {{
-        let mut items = ahash::AHashMap::new();
-
-        loop {
-            let len = zigzag_i64($reader)$($_await)*? as usize;
-            if len == 0 {
-                break Ok(items);
-            }
-
-            items.reserve(len);
-            for _ in 0..len {
-                let key = _read_binary($reader)$($_await)*?;
-                let key = String::from_utf8(key)
-                    .map_err(|_| Error::ExternalFormat("Invalid Avro header".to_string()))?;
-                let value = _read_binary($reader)$($_await)*?;
-                items.insert(key, value);
-            }
-        }
-    }};
-}
-
-macro_rules! read_metadata {
-    ($reader:ident $($_await:tt)*) => {{
-        let mut magic_number = [0u8; 4];
-        $reader.read_exact(&mut magic_number)$($_await)*?;
-
-        // see https://avro.apache.org/docs/current/spec.html#Object+Container+Files
-        if magic_number != [b'O', b'b', b'j', 1u8] {
-            return Err(Error::ExternalFormat(
-                "Avro header does not contain a valid magic number".to_string(),
-            ));
-        }
-
-        let header = read_header($reader)$($_await)*?;
-
-        let (schema, compression) = deserialize_header(header)?;
-
-        let marker = read_file_marker($reader)$($_await)*?;
-
-        Ok((schema, compression, marker))
-    }};
-}
-
-pub(crate) use {avro_decode, read_header, read_metadata};
-
-/// A compressed Avro block.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
-pub struct CompressedBlock {
-    /// The number of rows
-    pub number_of_rows: usize,
-    /// The compressed data
-    pub data: Vec<u8>,
-}
-
-impl CompressedBlock {
-    /// Creates a new CompressedBlock
-    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
-        Self {
-            number_of_rows,
-            data,
-        }
-    }
-}
-
-/// An uncompressed Avro block.
-#[derive(Debug, Clone, Default, PartialEq, Eq)]
-pub struct Block {
-    /// The number of rows
-    pub number_of_rows: usize,
-    /// The uncompressed data
-    pub data: Vec<u8>,
-}
-
-impl Block {
-    /// Creates a new Block
-    pub fn new(number_of_rows: usize, data: Vec<u8>) -> Self {
-        Self {
-            number_of_rows,
-            data,
-        }
-    }
-}
+pub(crate) use avro_decode;
diff --git a/src/io/avro/read/block.rs b/src/io/avro/read/block.rs
deleted file mode 100644
index 62cc5a88efc..00000000000
--- a/src/io/avro/read/block.rs
+++ /dev/null
@@ -1,101 +0,0 @@
-//! APIs to read from Avro format to arrow.
-use std::io::Read;
-
-use fallible_streaming_iterator::FallibleStreamingIterator;
-
-use crate::error::{Error, Result};
-
-use super::super::CompressedBlock;
-use super::util;
-
-fn read_size<R: Read>(reader: &mut R) -> Result<(usize, usize)> {
-    let rows = match util::zigzag_i64(reader) {
-        Ok(a) => a,
-        Err(Error::Io(io_err)) => {
-            if let std::io::ErrorKind::UnexpectedEof = io_err.kind() {
-                // end
-                return Ok((0, 0));
-            } else {
-                return Err(Error::Io(io_err));
-            }
-        }
-        Err(other) => return Err(other),
-    };
-    let bytes = util::zigzag_i64(reader)?;
-    Ok((rows as usize, bytes as usize))
-}
-
-/// Reads a [`CompressedBlock`] from the `reader`.
-/// # Error -/// This function errors iff either the block cannot be read or the sync marker does not match -fn read_block( - reader: &mut R, - block: &mut CompressedBlock, - file_marker: [u8; 16], -) -> Result<()> { - let (rows, bytes) = read_size(reader)?; - block.number_of_rows = rows; - if rows == 0 { - return Ok(()); - }; - - block.data.clear(); - block.data.try_reserve(bytes)?; - reader.take(bytes as u64).read_to_end(&mut block.data)?; - - let mut marker = [0u8; 16]; - reader.read_exact(&mut marker)?; - - if marker != file_marker { - return Err(Error::ExternalFormat( - "Avro: the sync marker in the block does not correspond to the file marker".to_string(), - )); - } - Ok(()) -} - -/// [`FallibleStreamingIterator`] of compressed avro blocks -pub struct BlockStreamIterator { - buf: CompressedBlock, - reader: R, - file_marker: [u8; 16], -} - -impl BlockStreamIterator { - /// Creates a new [`BlockStreamIterator`]. - pub fn new(reader: R, file_marker: [u8; 16]) -> Self { - Self { - reader, - file_marker, - buf: CompressedBlock::new(0, vec![]), - } - } - - /// The buffer of [`BlockStreamIterator`]. - pub fn buffer(&mut self) -> &mut CompressedBlock { - &mut self.buf - } - - /// Deconstructs itself - pub fn into_inner(self) -> (R, Vec) { - (self.reader, self.buf.data) - } -} - -impl FallibleStreamingIterator for BlockStreamIterator { - type Error = Error; - type Item = CompressedBlock; - - fn advance(&mut self) -> Result<()> { - read_block(&mut self.reader, &mut self.buf, self.file_marker)?; - Ok(()) - } - - fn get(&self) -> Option<&Self::Item> { - if self.buf.number_of_rows > 0 { - Some(&self.buf) - } else { - None - } - } -} diff --git a/src/io/avro/read/decompress.rs b/src/io/avro/read/decompress.rs deleted file mode 100644 index bc5fcbd5825..00000000000 --- a/src/io/avro/read/decompress.rs +++ /dev/null @@ -1,119 +0,0 @@ -//! APIs to read from Avro format to arrow. -use std::io::Read; - -use fallible_streaming_iterator::FallibleStreamingIterator; - -use crate::error::{Error, Result}; - -use super::super::{Block, CompressedBlock}; -use super::BlockStreamIterator; -use super::Compression; - -#[cfg(feature = "io_avro_compression")] -const CRC_TABLE: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); - -/// Decompresses an Avro block. -/// Returns whether the buffers where swapped. 
-pub fn decompress_block( - block: &mut CompressedBlock, - decompressed: &mut Block, - compression: Option, -) -> Result { - decompressed.number_of_rows = block.number_of_rows; - let block = &mut block.data; - let decompressed = &mut decompressed.data; - - match compression { - None => { - std::mem::swap(block, decompressed); - Ok(true) - } - #[cfg(feature = "io_avro_compression")] - Some(Compression::Deflate) => { - decompressed.clear(); - let mut decoder = libflate::deflate::Decoder::new(&block[..]); - decoder.read_to_end(decompressed)?; - Ok(false) - } - #[cfg(feature = "io_avro_compression")] - Some(Compression::Snappy) => { - let crc = &block[block.len() - 4..]; - let block = &block[..block.len() - 4]; - - let len = snap::raw::decompress_len(block) - .map_err(|e| Error::ExternalFormat(e.to_string()))?; - decompressed.clear(); - decompressed.resize(len, 0); - snap::raw::Decoder::new() - .decompress(block, decompressed) - .map_err(|e| Error::ExternalFormat(e.to_string()))?; - - let expected_crc = u32::from_be_bytes([crc[0], crc[1], crc[2], crc[3]]); - - let actual_crc = CRC_TABLE.checksum(decompressed); - if expected_crc != actual_crc { - return Err(Error::ExternalFormat( - "The crc of snap-compressed block does not match".to_string(), - )); - } - Ok(false) - } - #[cfg(not(feature = "io_avro_compression"))] - Some(Compression::Deflate) => Err(Error::InvalidArgumentError( - "The avro file is deflate-encoded but feature 'io_avro_compression' is not active." - .to_string(), - )), - #[cfg(not(feature = "io_avro_compression"))] - Some(Compression::Snappy) => Err(Error::InvalidArgumentError( - "The avro file is snappy-encoded but feature 'io_avro_compression' is not active." - .to_string(), - )), - } -} - -/// [`FallibleStreamingIterator`] of decompressed Avro blocks -pub struct Decompressor { - blocks: BlockStreamIterator, - codec: Option, - buf: Block, - was_swapped: bool, -} - -impl Decompressor { - /// Creates a new [`Decompressor`]. 
- pub fn new(blocks: BlockStreamIterator, codec: Option) -> Self { - Self { - blocks, - codec, - buf: Block::new(0, vec![]), - was_swapped: false, - } - } - - /// Deconstructs itself into its internal reader - pub fn into_inner(self) -> R { - self.blocks.into_inner().0 - } -} - -impl FallibleStreamingIterator for Decompressor { - type Error = Error; - type Item = Block; - - fn advance(&mut self) -> Result<()> { - if self.was_swapped { - std::mem::swap(&mut self.blocks.buffer().data, &mut self.buf.data); - } - self.blocks.advance()?; - self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf, self.codec)?; - Ok(()) - } - - fn get(&self) -> Option<&Self::Item> { - if self.buf.number_of_rows > 0 { - Some(&self.buf) - } else { - None - } - } -} diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 1bce0fb52e0..9fa9f152544 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -1,7 +1,8 @@ use std::convert::TryInto; -use avro_schema::Record; -use avro_schema::{Enum, Schema as AvroSchema}; +use avro_schema::file::Block; +use avro_schema::schema::Record; +use avro_schema::schema::{Enum, Field as AvroField, Schema as AvroSchema}; use crate::array::*; use crate::chunk::Chunk; @@ -10,13 +11,12 @@ use crate::error::Error; use crate::error::Result; use crate::types::months_days_ns; -use super::super::Block; use super::nested::*; use super::util; fn make_mutable( data_type: &DataType, - avro_schema: Option<&AvroSchema>, + avro_field: Option<&AvroSchema>, capacity: usize, ) -> Result> { Ok(match data_type.to_physical_type() { @@ -34,7 +34,7 @@ fn make_mutable( Box::new(MutableUtf8Array::::with_capacity(capacity)) as Box } PhysicalType::Dictionary(_) => { - if let Some(AvroSchema::Enum(Enum { symbols, .. })) = avro_schema { + if let Some(AvroSchema::Enum(Enum { symbols, .. })) = avro_field { let values = Utf8Array::::from_slice(symbols); Box::new(FixedItemsUtf8Dictionary::with_capacity(values, capacity)) as Box @@ -437,24 +437,30 @@ fn skip_item<'a>(field: &Field, avro_field: &AvroSchema, mut block: &'a [u8]) -> Ok(block) } -/// Deserializes a [`Block`] into [`Chunk`], projected +/// Deserializes a [`Block`] assumed to be encoded according to [`AvroField`] into [`Chunk`], +/// using `projection` to ignore `avro_fields`. +/// # Panics +/// `fields`, `avro_fields` and `projection` must have the same length. 
pub fn deserialize( block: &Block, fields: &[Field], - avro_schemas: &[AvroSchema], + avro_fields: &[AvroField], projection: &[bool], ) -> Result>> { + assert_eq!(fields.len(), avro_fields.len()); + assert_eq!(fields.len(), projection.len()); + let rows = block.number_of_rows; let mut block = block.data.as_ref(); // create mutables, one per field let mut arrays: Vec> = fields .iter() - .zip(avro_schemas.iter()) + .zip(avro_fields.iter()) .zip(projection.iter()) - .map(|((field, avro_schema), projection)| { + .map(|((field, avro_field), projection)| { if *projection { - make_mutable(&field.data_type, Some(avro_schema), rows) + make_mutable(&field.data_type, Some(&avro_field.schema), rows) } else { // just something; we are not going to use it make_mutable(&DataType::Int32, None, 0) @@ -467,14 +473,14 @@ pub fn deserialize( let iter = arrays .iter_mut() .zip(fields.iter()) - .zip(avro_schemas.iter()) + .zip(avro_fields.iter()) .zip(projection.iter()); for (((array, field), avro_field), projection) in iter { block = if *projection { - deserialize_item(array.as_mut(), field.is_nullable, avro_field, block) + deserialize_item(array.as_mut(), field.is_nullable, &avro_field.schema, block) } else { - skip_item(field, avro_field, block) + skip_item(field, &avro_field.schema, block) }? } } diff --git a/src/io/avro/read/header.rs b/src/io/avro/read/header.rs deleted file mode 100644 index c985ce25084..00000000000 --- a/src/io/avro/read/header.rs +++ /dev/null @@ -1,29 +0,0 @@ -use ahash::AHashMap; -use avro_schema::Schema; -use serde_json; - -use crate::error::{Error, Result}; - -use super::Compression; - -/// Deserializes the Avro header into an Avro [`Schema`] and optional [`Compression`]. -pub(crate) fn deserialize_header( - header: AHashMap>, -) -> Result<(Schema, Option)> { - let schema = header - .get("avro.schema") - .ok_or_else(|| Error::ExternalFormat("Avro schema must be present".to_string())) - .and_then(|bytes| { - serde_json::from_slice(bytes.as_ref()).map_err(|e| Error::ExternalFormat(e.to_string())) - })?; - - let compression = header.get("avro.codec").and_then(|bytes| { - let bytes: &[u8] = bytes.as_ref(); - match bytes { - b"snappy" => Some(Compression::Snappy), - b"deflate" => Some(Compression::Deflate), - _ => None, - } - }); - Ok((schema, compression)) -} diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index f02af4052ee..5014499c12a 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -1,51 +1,28 @@ //! APIs to read from Avro format to arrow. use std::io::Read; -use avro_schema::{Record, Schema as AvroSchema}; -use fallible_streaming_iterator::FallibleStreamingIterator; +use avro_schema::file::FileMetadata; +use avro_schema::read::fallible_streaming_iterator::FallibleStreamingIterator; +use avro_schema::read::{block_iterator, BlockStreamingIterator}; +use avro_schema::schema::Field as AvroField; -mod block; -mod decompress; -pub use block::BlockStreamIterator; -pub use decompress::{decompress_block, Decompressor}; mod deserialize; pub use deserialize::deserialize; -mod header; mod nested; mod schema; mod util; -pub(super) use header::deserialize_header; -pub(super) use schema::infer_schema; +pub use schema::infer_schema; use crate::array::Array; use crate::chunk::Chunk; -use crate::datatypes::{Field, Schema}; +use crate::datatypes::Field; use crate::error::Result; -use super::Compression; - -/// Reads the avro metadata from `reader` into a [`Schema`], [`Compression`] and magic marker. 
-#[allow(clippy::type_complexity)] -pub fn read_metadata( - reader: &mut R, -) -> Result<(Vec, Schema, Option, [u8; 16])> { - let (avro_schema, codec, marker) = util::read_schema(reader)?; - let schema = infer_schema(&avro_schema)?; - - let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema { - fields.into_iter().map(|x| x.schema).collect() - } else { - panic!() - }; - - Ok((avro_schema, schema, codec, marker)) -} - /// Single threaded, blocking reader of Avro; [`Iterator`] of [`Chunk`]. pub struct Reader { - iter: Decompressor, - avro_schemas: Vec, + iter: BlockStreamingIterator, + avro_fields: Vec, fields: Vec, projection: Vec, } @@ -53,15 +30,16 @@ pub struct Reader { impl Reader { /// Creates a new [`Reader`]. pub fn new( - iter: Decompressor, - avro_schemas: Vec, + reader: R, + metadata: FileMetadata, fields: Vec, projection: Option>, ) -> Self { let projection = projection.unwrap_or_else(|| fields.iter().map(|_| true).collect()); + Self { - iter, - avro_schemas, + iter: block_iterator(reader, metadata.compression, metadata.marker), + avro_fields: metadata.record.fields, fields, projection, } @@ -78,12 +56,12 @@ impl Iterator for Reader { fn next(&mut self) -> Option { let fields = &self.fields[..]; - let avro_schemas = &self.avro_schemas; + let avro_fields = &self.avro_fields; let projection = &self.projection; self.iter .next() .transpose() - .map(|maybe_block| deserialize(maybe_block?, fields, avro_schemas, projection)) + .map(|maybe_block| deserialize(maybe_block?, fields, avro_fields, projection)) } } diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index 13408efd9c2..b45fc95c0ea 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -1,4 +1,4 @@ -use avro_schema::{Enum, Fixed, Record, Schema as AvroSchema}; +use avro_schema::schema::{Enum, Fixed, Record, Schema as AvroSchema}; use crate::datatypes::*; use crate::error::{Error, Result}; @@ -19,26 +19,21 @@ fn external_props(schema: &AvroSchema) -> Metadata { props } -/// Infers an [`Schema`] from the root [`AvroSchema`]. +/// Infers an [`Schema`] from the root [`Record`]. /// This -pub fn infer_schema(schema: &AvroSchema) -> Result { - if let AvroSchema::Record(Record { fields, .. }) = schema { - Ok(fields - .iter() - .map(|field| { - schema_to_field( - &field.schema, - Some(&field.name), - external_props(&field.schema), - ) - }) - .collect::>>()? - .into()) - } else { - Err(Error::OutOfSpec( - "The root AvroSchema must be of type Record".to_string(), - )) - } +pub fn infer_schema(record: &Record) -> Result { + Ok(record + .fields + .iter() + .map(|field| { + schema_to_field( + &field.schema, + Some(&field.name), + external_props(&field.schema), + ) + }) + .collect::>>()? 
+ .into()) } fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> Result { @@ -48,24 +43,24 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> AvroSchema::Boolean => DataType::Boolean, AvroSchema::Int(logical) => match logical { Some(logical) => match logical { - avro_schema::IntLogical::Date => DataType::Date32, - avro_schema::IntLogical::Time => DataType::Time32(TimeUnit::Millisecond), + avro_schema::schema::IntLogical::Date => DataType::Date32, + avro_schema::schema::IntLogical::Time => DataType::Time32(TimeUnit::Millisecond), }, None => DataType::Int32, }, AvroSchema::Long(logical) => match logical { Some(logical) => match logical { - avro_schema::LongLogical::Time => DataType::Time64(TimeUnit::Microsecond), - avro_schema::LongLogical::TimestampMillis => { + avro_schema::schema::LongLogical::Time => DataType::Time64(TimeUnit::Microsecond), + avro_schema::schema::LongLogical::TimestampMillis => { DataType::Timestamp(TimeUnit::Millisecond, Some("00:00".to_string())) } - avro_schema::LongLogical::TimestampMicros => { + avro_schema::schema::LongLogical::TimestampMicros => { DataType::Timestamp(TimeUnit::Microsecond, Some("00:00".to_string())) } - avro_schema::LongLogical::LocalTimestampMillis => { + avro_schema::schema::LongLogical::LocalTimestampMillis => { DataType::Timestamp(TimeUnit::Millisecond, None) } - avro_schema::LongLogical::LocalTimestampMicros => { + avro_schema::schema::LongLogical::LocalTimestampMicros => { DataType::Timestamp(TimeUnit::Microsecond, None) } }, @@ -75,7 +70,7 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> AvroSchema::Double => DataType::Float64, AvroSchema::Bytes(logical) => match logical { Some(logical) => match logical { - avro_schema::BytesLogical::Decimal(precision, scale) => { + avro_schema::schema::BytesLogical::Decimal(precision, scale) => { DataType::Decimal(*precision, *scale) } }, @@ -134,10 +129,10 @@ fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> } AvroSchema::Fixed(Fixed { size, logical, .. }) => match logical { Some(logical) => match logical { - avro_schema::FixedLogical::Decimal(precision, scale) => { + avro_schema::schema::FixedLogical::Decimal(precision, scale) => { DataType::Decimal(*precision, *scale) } - avro_schema::FixedLogical::Duration => { + avro_schema::schema::FixedLogical::Duration => { DataType::Interval(IntervalUnit::MonthDayNano) } }, diff --git a/src/io/avro/read/util.rs b/src/io/avro/read/util.rs index b14870a3905..f3cb07c6b04 100644 --- a/src/io/avro/read/util.rs +++ b/src/io/avro/read/util.rs @@ -1,12 +1,8 @@ -use ahash::AHashMap; use std::io::Read; -use avro_schema::Schema; - use crate::error::{Error, Result}; -use super::super::{avro_decode, read_header, read_metadata}; -use super::{deserialize_header, Compression}; +use super::super::avro_decode; pub fn zigzag_i64(reader: &mut R) -> Result { let z = decode_variable(reader)?; @@ -20,28 +16,3 @@ pub fn zigzag_i64(reader: &mut R) -> Result { fn decode_variable(reader: &mut R) -> Result { avro_decode!(reader) } - -fn _read_binary(reader: &mut R) -> Result> { - let len: usize = zigzag_i64(reader)? 
as usize; - let mut buf = vec![]; - buf.try_reserve(len)?; - reader.take(len as u64).read_to_end(&mut buf)?; - Ok(buf) -} - -fn read_header(reader: &mut R) -> Result>> { - read_header!(reader) -} - -fn read_file_marker(reader: &mut R) -> Result<[u8; 16]> { - let mut marker = [0u8; 16]; - reader.read_exact(&mut marker)?; - Ok(marker) -} - -/// Reads the schema from `reader`, returning the file's [`Schema`] and [`Compression`]. -/// # Error -/// This function errors iff the header is not a valid avro file header. -pub fn read_schema(reader: &mut R) -> Result<(Schema, Option, [u8; 16])> { - read_metadata!(reader) -} diff --git a/src/io/avro/read_async/block.rs b/src/io/avro/read_async/block.rs deleted file mode 100644 index ff92d6a97e7..00000000000 --- a/src/io/avro/read_async/block.rs +++ /dev/null @@ -1,77 +0,0 @@ -//! APIs to read from Avro format to arrow. -use async_stream::try_stream; -use futures::AsyncRead; -use futures::AsyncReadExt; -use futures::Stream; - -use crate::error::{Error, Result}; - -use super::CompressedBlock; - -use super::utils::zigzag_i64; - -async fn read_size(reader: &mut R) -> Result<(usize, usize)> { - let rows = match zigzag_i64(reader).await { - Ok(a) => a, - Err(Error::Io(io_err)) => { - if let std::io::ErrorKind::UnexpectedEof = io_err.kind() { - // end - return Ok((0, 0)); - } else { - return Err(Error::Io(io_err)); - } - } - Err(other) => return Err(other), - }; - let bytes = zigzag_i64(reader).await?; - Ok((rows as usize, bytes as usize)) -} - -/// Reads a [`CompressedBlock`] from the `reader`. -/// # Error -/// This function errors iff either the block cannot be read or the sync marker does not match -async fn read_block( - reader: &mut R, - block: &mut CompressedBlock, - file_marker: [u8; 16], -) -> Result<()> { - let (rows, bytes) = read_size(reader).await?; - block.number_of_rows = rows; - if rows == 0 { - return Ok(()); - }; - - block.data.clear(); - block.data.try_reserve(bytes)?; - reader - .take(bytes as u64) - .read_to_end(&mut block.data) - .await?; - - let mut marker = [0u8; 16]; - reader.read_exact(&mut marker).await?; - - if marker != file_marker { - return Err(Error::ExternalFormat( - "Avro: the sync marker in the block does not correspond to the file marker".to_string(), - )); - } - Ok(()) -} - -/// Returns a fallible [`Stream`] of Avro blocks bound to `reader` -pub async fn block_stream( - reader: &mut R, - file_marker: [u8; 16], -) -> impl Stream> + '_ { - try_stream! { - loop { - let mut block = CompressedBlock::new(0, vec![]); - read_block(reader, &mut block, file_marker).await?; - if block.number_of_rows == 0 { - break - } - yield block - } - } -} diff --git a/src/io/avro/read_async/metadata.rs b/src/io/avro/read_async/metadata.rs deleted file mode 100644 index 213428e0f0b..00000000000 --- a/src/io/avro/read_async/metadata.rs +++ /dev/null @@ -1,60 +0,0 @@ -//! Async Avro -use ahash::AHashMap; -use avro_schema::{Record, Schema as AvroSchema}; -use futures::AsyncRead; -use futures::AsyncReadExt; - -use crate::datatypes::Schema; -use crate::error::{Error, Result}; - -use super::super::read::deserialize_header; -use super::super::read::infer_schema; -use super::super::Compression; -use super::super::{read_header, read_metadata}; -use super::utils::zigzag_i64; - -/// Reads Avro's metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker. 
-#[allow(clippy::type_complexity)] -async fn read_metadata_async( - reader: &mut R, -) -> Result<(AvroSchema, Option, [u8; 16])> { - read_metadata!(reader.await) -} - -/// Reads the avro metadata from `reader` into a [`AvroSchema`], [`Compression`] and magic marker. -#[allow(clippy::type_complexity)] -pub async fn read_metadata( - reader: &mut R, -) -> Result<(Vec, Schema, Option, [u8; 16])> { - let (avro_schema, codec, marker) = read_metadata_async(reader).await?; - let schema = infer_schema(&avro_schema)?; - - let avro_schema = if let AvroSchema::Record(Record { fields, .. }) = avro_schema { - fields.into_iter().map(|x| x.schema).collect() - } else { - panic!() - }; - - Ok((avro_schema, schema, codec, marker)) -} - -/// Reads the file marker asynchronously -async fn read_file_marker(reader: &mut R) -> Result<[u8; 16]> { - let mut marker = [0u8; 16]; - reader.read_exact(&mut marker).await?; - Ok(marker) -} - -async fn _read_binary(reader: &mut R) -> Result> { - let len: usize = zigzag_i64(reader).await? as usize; - let mut buf = vec![]; - buf.try_reserve(len)?; - reader.take(len as u64).read_to_end(&mut buf).await?; - Ok(buf) -} - -async fn read_header( - reader: &mut R, -) -> Result>> { - read_header!(reader.await) -} diff --git a/src/io/avro/read_async/mod.rs b/src/io/avro/read_async/mod.rs deleted file mode 100644 index dc3d7c276ba..00000000000 --- a/src/io/avro/read_async/mod.rs +++ /dev/null @@ -1,9 +0,0 @@ -//! Async read Avro - -mod block; -mod metadata; -pub(self) mod utils; - -pub use super::{Block, CompressedBlock}; -pub use block::block_stream; -pub use metadata::read_metadata; diff --git a/src/io/avro/read_async/utils.rs b/src/io/avro/read_async/utils.rs deleted file mode 100644 index 25ed898d09f..00000000000 --- a/src/io/avro/read_async/utils.rs +++ /dev/null @@ -1,19 +0,0 @@ -use futures::AsyncRead; -use futures::AsyncReadExt; - -use crate::error::{Error, Result}; - -use super::super::avro_decode; - -pub async fn zigzag_i64(reader: &mut R) -> Result { - let z = decode_variable(reader).await?; - Ok(if z & 0x1 == 0 { - (z >> 1) as i64 - } else { - !(z >> 1) as i64 - }) -} - -async fn decode_variable(reader: &mut R) -> Result { - avro_decode!(reader.await) -} diff --git a/src/io/avro/write/block.rs b/src/io/avro/write/block.rs deleted file mode 100644 index 0758c95c4ac..00000000000 --- a/src/io/avro/write/block.rs +++ /dev/null @@ -1,19 +0,0 @@ -use std::io::Write; - -use crate::error::Result; - -use super::super::CompressedBlock; -use super::{util::zigzag_encode, SYNC_NUMBER}; - -/// Writes a [`CompressedBlock`] to `writer` -pub fn write_block(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> { - // write size and rows - zigzag_encode(compressed_block.number_of_rows as i64, writer)?; - zigzag_encode(compressed_block.data.len() as i64, writer)?; - - writer.write_all(&compressed_block.data)?; - - writer.write_all(&SYNC_NUMBER)?; - - Ok(()) -} diff --git a/src/io/avro/write/compress.rs b/src/io/avro/write/compress.rs deleted file mode 100644 index 5121003a2b2..00000000000 --- a/src/io/avro/write/compress.rs +++ /dev/null @@ -1,62 +0,0 @@ -//! APIs to read from Avro format to arrow. - -use crate::error::Result; - -use super::Compression; -use super::{Block, CompressedBlock}; - -#[cfg(feature = "io_avro_compression")] -const CRC_TABLE: crc::Crc = crc::Crc::::new(&crc::CRC_32_ISO_HDLC); - -/// Compresses a [`Block`] to a [`CompressedBlock`]. 
-pub fn compress( - block: &mut Block, - compressed: &mut CompressedBlock, - compression: Option, -) -> Result { - compressed.number_of_rows = block.number_of_rows; - let block = &mut block.data; - let compressed = &mut compressed.data; - - match compression { - None => { - std::mem::swap(block, compressed); - Ok(true) - } - #[cfg(feature = "io_avro_compression")] - Some(Compression::Deflate) => { - use std::io::Write; - compressed.clear(); - let mut encoder = libflate::deflate::Encoder::new(compressed); - encoder.write_all(block)?; - encoder.finish(); - Ok(false) - } - #[cfg(feature = "io_avro_compression")] - Some(Compression::Snappy) => { - use snap::raw::{max_compress_len, Encoder}; - - compressed.clear(); - - let required_len = max_compress_len(block.len()); - compressed.resize(required_len, 0); - let compressed_bytes = Encoder::new() - .compress(block, compressed) - .map_err(|e| crate::error::Error::ExternalFormat(e.to_string()))?; - compressed.truncate(compressed_bytes); - - compressed.extend(CRC_TABLE.checksum(block).to_be_bytes()); - Ok(false) - } - #[cfg(not(feature = "io_avro_compression"))] - Some(Compression::Deflate) => Err(crate::error::Error::InvalidArgumentError( - "Trying to compress Avro with deflate but feature 'io_avro_compression' is not active." - .to_string(), - )), - #[cfg(not(feature = "io_avro_compression"))] - Some(Compression::Snappy) => Err(crate::error::Error::InvalidArgumentError( - "Trying to compress Avro with snappy but feature 'io_avro_compression' is not active." - .to_string(), - )), - } -} diff --git a/src/io/avro/write/header.rs b/src/io/avro/write/header.rs deleted file mode 100644 index 370a0532b5a..00000000000 --- a/src/io/avro/write/header.rs +++ /dev/null @@ -1,28 +0,0 @@ -use ahash::AHashMap; -use avro_schema::Schema; -use serde_json; - -use crate::error::{Error, Result}; - -use super::Compression; - -/// Serializes an [`Schema`] and optional [`Compression`] into an avro header. -pub(crate) fn serialize_header( - schema: &Schema, - compression: Option, -) -> Result>> { - let schema = serde_json::to_string(schema).map_err(|e| Error::ExternalFormat(e.to_string()))?; - - let mut header = AHashMap::>::default(); - - header.insert("avro.schema".to_string(), schema.into_bytes()); - if let Some(compression) = compression { - let value = match compression { - Compression::Snappy => b"snappy".to_vec(), - Compression::Deflate => b"deflate".to_vec(), - }; - header.insert("avro.codec".to_string(), value); - }; - - Ok(header) -} diff --git a/src/io/avro/write/mod.rs b/src/io/avro/write/mod.rs index 158ec571a8d..f821f0dd1eb 100644 --- a/src/io/avro/write/mod.rs +++ b/src/io/avro/write/mod.rs @@ -1,46 +1,10 @@ //! APIs to write to Avro format. -use avro_schema::{Field as AvroField, Record, Schema as AvroSchema}; +use avro_schema::file::Block; -use crate::error::Result; - -pub use super::Compression; - -mod header; -pub(super) use header::serialize_header; mod schema; -pub use schema::to_avro_schema; +pub use schema::to_record; mod serialize; pub use serialize::{can_serialize, new_serializer, BoxSerializer}; -mod block; -pub use block::*; -mod compress; -pub(super) mod util; -pub use compress::compress; - -pub use super::{Block, CompressedBlock}; - -pub(super) const SYNC_NUMBER: [u8; 16] = [1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4, 1, 2, 3, 4]; -// * Four bytes, ASCII 'O', 'b', 'j', followed by 1. -pub(super) const AVRO_MAGIC: [u8; 4] = [b'O', b'b', b'j', 1u8]; - -/// Writes Avro's metadata to `writer`. 
-pub fn write_metadata( - writer: &mut W, - fields: Vec, - compression: Option, -) -> Result<()> { - writer.write_all(&AVRO_MAGIC)?; - - // * file metadata, including the schema. - let schema = AvroSchema::Record(Record::new("", fields)); - - write_schema(writer, &schema, compression)?; - - // The 16-byte, randomly-generated sync marker for this file. - writer.write_all(&SYNC_NUMBER)?; - - Ok(()) -} /// consumes a set of [`BoxSerializer`] into an [`Block`]. /// # Panics @@ -62,19 +26,3 @@ pub fn serialize<'a>(serializers: &mut [BoxSerializer<'a>], block: &mut Block) { } } } - -pub(super) fn write_schema( - writer: &mut W, - schema: &AvroSchema, - compression: Option, -) -> Result<()> { - let header = serialize_header(schema, compression)?; - - util::zigzag_encode(header.len() as i64, writer)?; - for (name, item) in header { - util::write_binary(name.as_bytes(), writer)?; - util::write_binary(&item, writer)?; - } - writer.write_all(&[0])?; - Ok(()) -} diff --git a/src/io/avro/write/schema.rs b/src/io/avro/write/schema.rs index 26a14db8b3e..94673f45a48 100644 --- a/src/io/avro/write/schema.rs +++ b/src/io/avro/write/schema.rs @@ -1,4 +1,4 @@ -use avro_schema::{ +use avro_schema::schema::{ BytesLogical, Field as AvroField, Fixed, FixedLogical, IntLogical, LongLogical, Record, Schema as AvroSchema, }; @@ -6,9 +6,20 @@ use avro_schema::{ use crate::datatypes::*; use crate::error::{Error, Result}; -/// Converts a [`Schema`] to a vector of [`AvroField`] with it. -pub fn to_avro_schema(schema: &Schema) -> Result> { - schema.fields.iter().map(field_to_field).collect() +/// Converts a [`Schema`] to an Avro [`Record`]. +pub fn to_record(schema: &Schema) -> Result { + let fields = schema + .fields + .iter() + .map(field_to_field) + .collect::>()?; + Ok(Record { + name: "".to_string(), + namespace: None, + doc: None, + aliases: vec![], + fields, + }) } fn field_to_field(field: &Field) -> Result { diff --git a/src/io/avro/write/serialize.rs b/src/io/avro/write/serialize.rs index 443848ab615..7fbcfdb7760 100644 --- a/src/io/avro/write/serialize.rs +++ b/src/io/avro/write/serialize.rs @@ -1,4 +1,5 @@ -use avro_schema::{Record, Schema as AvroSchema}; +use avro_schema::schema::{Record, Schema as AvroSchema}; +use avro_schema::write::encode; use crate::bitmap::utils::zip_validity; use crate::datatypes::{IntervalUnit, PhysicalType, PrimitiveType}; @@ -6,7 +7,6 @@ use crate::types::months_days_ns; use crate::{array::*, datatypes::DataType}; use super::super::super::iterator::*; -use super::util; // Zigzag representation of false and true respectively. 
const IS_NULL: u8 = 0; @@ -20,7 +20,7 @@ fn utf8_required(array: &Utf8Array) -> BoxSerializer { Box::new(BufStreamingIterator::new( array.values_iter(), |x, buf| { - util::zigzag_encode(x.len() as i64, buf).unwrap(); + encode::zigzag_encode(x.len() as i64, buf).unwrap(); buf.extend_from_slice(x.as_bytes()); }, vec![], @@ -33,7 +33,7 @@ fn utf8_optional(array: &Utf8Array) -> BoxSerializer { |x, buf| { if let Some(x) = x { buf.push(IS_VALID); - util::zigzag_encode(x.len() as i64, buf).unwrap(); + encode::zigzag_encode(x.len() as i64, buf).unwrap(); buf.extend_from_slice(x.as_bytes()); } else { buf.push(IS_NULL); @@ -47,7 +47,7 @@ fn binary_required(array: &BinaryArray) -> BoxSerializer { Box::new(BufStreamingIterator::new( array.values_iter(), |x, buf| { - util::zigzag_encode(x.len() as i64, buf).unwrap(); + encode::zigzag_encode(x.len() as i64, buf).unwrap(); buf.extend_from_slice(x); }, vec![], @@ -60,7 +60,7 @@ fn binary_optional(array: &BinaryArray) -> BoxSerializer { |x, buf| { if let Some(x) = x { buf.push(IS_VALID); - util::zigzag_encode(x.len() as i64, buf).unwrap(); + encode::zigzag_encode(x.len() as i64, buf).unwrap(); buf.extend_from_slice(x); } else { buf.push(IS_NULL); @@ -105,13 +105,13 @@ fn list_required<'a, O: Offset>(array: &'a ListArray, schema: &AvroSchema) -> Box::new(BufStreamingIterator::new( lengths, move |length, buf| { - util::zigzag_encode(length, buf).unwrap(); + encode::zigzag_encode(length, buf).unwrap(); let mut rows = 0; while let Some(item) = inner.next() { buf.extend_from_slice(item); rows += 1; if rows == length { - util::zigzag_encode(0, buf).unwrap(); + encode::zigzag_encode(0, buf).unwrap(); break; } } @@ -133,13 +133,13 @@ fn list_optional<'a, O: Offset>(array: &'a ListArray, schema: &AvroSchema) -> move |length, buf| { if let Some(length) = length { buf.push(IS_VALID); - util::zigzag_encode(length, buf).unwrap(); + encode::zigzag_encode(length, buf).unwrap(); let mut rows = 0; while let Some(item) = inner.next() { buf.extend_from_slice(item); rows += 1; if rows == length { - util::zigzag_encode(0, buf).unwrap(); + encode::zigzag_encode(0, buf).unwrap(); break; } } @@ -278,7 +278,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria |x, buf| { if let Some(x) = x { buf.push(IS_VALID); - util::zigzag_encode(*x as i64, buf).unwrap(); + encode::zigzag_encode(*x as i64, buf).unwrap(); } else { buf.push(IS_NULL); } @@ -294,7 +294,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria Box::new(BufStreamingIterator::new( values.values().iter(), |x, buf| { - util::zigzag_encode(*x as i64, buf).unwrap(); + encode::zigzag_encode(*x as i64, buf).unwrap(); }, vec![], )) @@ -309,7 +309,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria |x, buf| { if let Some(x) = x { buf.push(IS_VALID); - util::zigzag_encode(*x, buf).unwrap(); + encode::zigzag_encode(*x, buf).unwrap(); } else { buf.push(IS_NULL); } @@ -325,7 +325,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria Box::new(BufStreamingIterator::new( values.values().iter(), |x, buf| { - util::zigzag_encode(*x, buf).unwrap(); + encode::zigzag_encode(*x, buf).unwrap(); }, vec![], )) @@ -401,7 +401,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria values.values().iter(), |x, buf| { let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize; - util::zigzag_encode((16 - len) as i64, buf).unwrap(); + encode::zigzag_encode((16 - len) as i64, 
buf).unwrap(); buf.extend_from_slice(&x.to_be_bytes()[len..]); }, vec![], @@ -419,7 +419,7 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria buf.push(IS_VALID); let len = ((x.leading_zeros() / 8) - ((x.leading_zeros() / 8) % 2)) as usize; - util::zigzag_encode((16 - len) as i64, buf).unwrap(); + encode::zigzag_encode((16 - len) as i64, buf).unwrap(); buf.extend_from_slice(&x.to_be_bytes()[len..]); } else { buf.push(IS_NULL); diff --git a/src/io/avro/write/util.rs b/src/io/avro/write/util.rs deleted file mode 100644 index 52d13c026e9..00000000000 --- a/src/io/avro/write/util.rs +++ /dev/null @@ -1,26 +0,0 @@ -use crate::error::Result; - -#[inline] -pub fn zigzag_encode(n: i64, writer: &mut W) -> Result<()> { - _zigzag_encode(((n << 1) ^ (n >> 63)) as u64, writer) -} - -#[inline] -fn _zigzag_encode(mut z: u64, writer: &mut W) -> Result<()> { - loop { - if z <= 0x7F { - writer.write_all(&[(z & 0x7F) as u8])?; - break; - } else { - writer.write_all(&[(0x80 | (z & 0x7F)) as u8])?; - z >>= 7; - } - } - Ok(()) -} - -pub(crate) fn write_binary(bytes: &[u8], writer: &mut W) -> Result<()> { - zigzag_encode(bytes.len() as i64, writer)?; - writer.write_all(bytes)?; - Ok(()) -} diff --git a/src/io/avro/write_async/block.rs b/src/io/avro/write_async/block.rs deleted file mode 100644 index 6eba0a7fda4..00000000000 --- a/src/io/avro/write_async/block.rs +++ /dev/null @@ -1,26 +0,0 @@ -use futures::{AsyncWrite, AsyncWriteExt}; - -use crate::error::Result; - -use super::super::write::{util::zigzag_encode, SYNC_NUMBER}; -use super::super::CompressedBlock; - -/// Writes a [`CompressedBlock`] to `writer` -pub async fn write_block(writer: &mut W, compressed_block: &CompressedBlock) -> Result<()> -where - W: AsyncWrite + Unpin, -{ - // write size and rows - let mut scratch = Vec::with_capacity(10); - zigzag_encode(compressed_block.number_of_rows as i64, &mut scratch)?; - writer.write_all(&scratch).await?; - scratch.clear(); - zigzag_encode(compressed_block.data.len() as i64, &mut scratch)?; - writer.write_all(&scratch).await?; - - writer.write_all(&compressed_block.data).await?; - - writer.write_all(&SYNC_NUMBER).await?; - - Ok(()) -} diff --git a/src/io/avro/write_async/mod.rs b/src/io/avro/write_async/mod.rs deleted file mode 100644 index 81340125d4f..00000000000 --- a/src/io/avro/write_async/mod.rs +++ /dev/null @@ -1,38 +0,0 @@ -//! Async write Avro -mod block; -pub use block::write_block; - -use avro_schema::{Field as AvroField, Record, Schema as AvroSchema}; -use futures::{AsyncWrite, AsyncWriteExt}; - -use crate::error::Result; - -use super::{ - write::{write_schema, AVRO_MAGIC, SYNC_NUMBER}, - Compression, -}; - -/// Writes Avro's metadata to `writer`. -pub async fn write_metadata( - writer: &mut W, - fields: Vec, - compression: Option, -) -> Result<()> -where - W: AsyncWrite + Unpin, -{ - writer.write_all(&AVRO_MAGIC).await?; - - // * file metadata, including the schema. - let schema = AvroSchema::Record(Record::new("", fields)); - - let mut scratch = vec![]; - write_schema(&mut scratch, &schema, compression)?; - - writer.write_all(&scratch).await?; - - // The 16-byte, randomly-generated sync marker for this file. 
- writer.write_all(&SYNC_NUMBER).await?; - - Ok(()) -} diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index deb8ffd802f..5c301f7d1ba 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -6,6 +6,7 @@ use avro_rs::{Days, Decimal, Duration, Millis, Months, Schema as AvroSchema}; use arrow2::array::*; use arrow2::datatypes::*; use arrow2::error::Result; +use arrow2::io::avro::avro_schema::read::read_metadata; use arrow2::io::avro::read; pub(super) fn schema() -> (AvroSchema, Schema) { @@ -196,14 +197,10 @@ pub(super) fn read_avro( ) -> Result<(Chunk>, Schema)> { let file = &mut avro; - let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?; + let metadata = read_metadata(file)?; + let schema = read::infer_schema(&metadata.record)?; - let mut reader = read::Reader::new( - read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec), - avro_schema, - schema.fields.clone(), - projection.clone(), - ); + let mut reader = read::Reader::new(file, metadata, schema.fields.clone(), projection.clone()); let schema = if let Some(projection) = projection { let fields = schema diff --git a/tests/it/io/avro/read_async.rs b/tests/it/io/avro/read_async.rs index 3dd04cdb479..6a6b09ac9f6 100644 --- a/tests/it/io/avro/read_async.rs +++ b/tests/it/io/avro/read_async.rs @@ -4,7 +4,8 @@ use futures::pin_mut; use futures::StreamExt; use arrow2::error::Result; -use arrow2::io::avro::read_async::*; +use arrow2::io::avro::avro_schema::read_async::{block_stream, read_metadata}; +use arrow2::io::avro::read; use super::read::{schema, write_avro}; @@ -14,11 +15,12 @@ async fn test(codec: Codec) -> Result<()> { let mut reader = &mut &avro_data[..]; - let (_, schema, _, marker) = read_metadata(&mut reader).await?; + let metadata = read_metadata(&mut reader).await?; + let schema = read::infer_schema(&metadata.record)?; assert_eq!(schema, expected_schema); - let blocks = block_stream(&mut reader, marker).await; + let blocks = block_stream(&mut reader, metadata.marker).await; pin_mut!(blocks); while let Some(block) = blocks.next().await.transpose()? 
{ diff --git a/tests/it/io/avro/write.rs b/tests/it/io/avro/write.rs index d4b7e410c13..091da09c75b 100644 --- a/tests/it/io/avro/write.rs +++ b/tests/it/io/avro/write.rs @@ -2,7 +2,9 @@ use arrow2::array::*; use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; -use arrow2::io::avro::{write, CompressedBlock}; +use arrow2::io::avro::avro_schema::file::{Block, CompressedBlock, Compression}; +use arrow2::io::avro::avro_schema::write::{compress, write_block, write_metadata}; +use arrow2::io::avro::write; use arrow2::types::months_days_ns; use super::read::read_avro; @@ -107,24 +109,24 @@ pub(super) fn data() -> Chunk> { pub(super) fn serialize_to_block>( columns: &Chunk, schema: &Schema, - compression: Option, + compression: Option, ) -> Result { - let avro_fields = write::to_avro_schema(schema)?; + let record = write::to_record(schema)?; let mut serializers = columns .arrays() .iter() .map(|x| x.as_ref()) - .zip(avro_fields.iter()) + .zip(record.fields.iter()) .map(|(array, field)| write::new_serializer(array, &field.schema)) .collect::>(); - let mut block = write::Block::new(columns.len(), vec![]); + let mut block = Block::new(columns.len(), vec![]); write::serialize(&mut serializers, &mut block); - let mut compressed_block = write::CompressedBlock::default(); + let mut compressed_block = CompressedBlock::default(); - write::compress(&mut block, &mut compressed_block, compression)?; + compress(&mut block, &mut compressed_block, compression)?; Ok(compressed_block) } @@ -132,21 +134,21 @@ pub(super) fn serialize_to_block>( fn write_avro>( columns: &Chunk, schema: &Schema, - compression: Option, + compression: Option, ) -> Result> { let compressed_block = serialize_to_block(columns, schema, compression)?; - let avro_fields = write::to_avro_schema(schema)?; + let avro_fields = write::to_record(schema)?; let mut file = vec![]; - write::write_metadata(&mut file, avro_fields, compression)?; + write_metadata(&mut file, avro_fields, compression)?; - write::write_block(&mut file, &compressed_block)?; + write_block(&mut file, &compressed_block)?; Ok(file) } -fn roundtrip(compression: Option) -> Result<()> { +fn roundtrip(compression: Option) -> Result<()> { let expected = data(); let expected_schema = schema(); @@ -169,13 +171,13 @@ fn no_compression() -> Result<()> { #[cfg(feature = "io_avro_compression")] #[test] fn snappy() -> Result<()> { - roundtrip(Some(write::Compression::Snappy)) + roundtrip(Some(Compression::Snappy)) } #[cfg(feature = "io_avro_compression")] #[test] fn deflate() -> Result<()> { - roundtrip(Some(write::Compression::Deflate)) + roundtrip(Some(Compression::Deflate)) } fn large_format_schema() -> Schema { diff --git a/tests/it/io/avro/write_async.rs b/tests/it/io/avro/write_async.rs index 1c679c564c7..b98071accbc 100644 --- a/tests/it/io/avro/write_async.rs +++ b/tests/it/io/avro/write_async.rs @@ -2,8 +2,9 @@ use arrow2::array::*; use arrow2::chunk::Chunk; use arrow2::datatypes::*; use arrow2::error::Result; +use arrow2::io::avro::avro_schema::file::Compression; +use arrow2::io::avro::avro_schema::write_async::{write_block, write_metadata}; use arrow2::io::avro::write; -use arrow2::io::avro::write_async; use super::read::read_avro; use super::write::{data, schema, serialize_to_block}; @@ -11,22 +12,22 @@ use super::write::{data, schema, serialize_to_block}; async fn write_avro>( columns: &Chunk, schema: &Schema, - compression: Option, + compression: Option, ) -> Result> { // usually done on a different thread pool let compressed_block = 
serialize_to_block(columns, schema, compression)?; - let avro_fields = write::to_avro_schema(schema)?; + let record = write::to_record(schema)?; let mut file = vec![]; - write_async::write_metadata(&mut file, avro_fields.clone(), compression).await?; + write_metadata(&mut file, record, compression).await?; - write_async::write_block(&mut file, &compressed_block).await?; + write_block(&mut file, &compressed_block).await?; Ok(file) } -async fn roundtrip(compression: Option) -> Result<()> { +async fn roundtrip(compression: Option) -> Result<()> { let expected = data(); let expected_schema = schema();