From a026b66a6500bf55a7bb59b5375717861ecf5e6c Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Wed, 9 Feb 2022 06:30:24 +0100 Subject: [PATCH] Added support to read avro structs (#826) --- src/io/avro/read/deserialize.rs | 44 +++++++++++++-- src/io/avro/read/nested.rs | 97 +++++++++++++++++++++++++++++++-- src/io/avro/read/schema.rs | 58 ++++++++------------ tests/it/io/avro/read.rs | 25 +++++++++ 4 files changed, 178 insertions(+), 46 deletions(-) diff --git a/src/io/avro/read/deserialize.rs b/src/io/avro/read/deserialize.rs index 28f2d4e6d6f..cc636cc5b77 100644 --- a/src/io/avro/read/deserialize.rs +++ b/src/io/avro/read/deserialize.rs @@ -1,6 +1,7 @@ use std::convert::TryInto; use std::sync::Arc; +use avro_schema::Record; use avro_schema::{Enum, Schema as AvroSchema}; use crate::array::*; @@ -45,17 +46,27 @@ fn make_mutable( _ => match data_type { DataType::List(inner) => { let values = make_mutable(inner.data_type(), None, 0)?; - Box::new(DynMutableListArray::::new_with_capacity( - values, capacity, + Box::new(DynMutableListArray::::new_from( + values, + data_type.clone(), + capacity, )) as Box } DataType::FixedSizeBinary(size) => Box::new(MutableFixedSizeBinaryArray::with_capacity( *size as usize, capacity, )) as Box, + DataType::Struct(fields) => { + let values = fields + .iter() + .map(|field| make_mutable(field.data_type(), None, capacity)) + .collect::>>()?; + Box::new(DynMutableStructArray::new(values, data_type.clone())) + as Box + } other => { return Err(ArrowError::NotYetImplemented(format!( - "Deserializing type {:?} is still not implemented", + "Deserializing type {:#?} is still not implemented", other ))) } @@ -96,6 +107,7 @@ fn deserialize_value<'a>( let data_type = array.data_type(); match data_type { DataType::List(inner) => { + let is_nullable = inner.is_nullable; let avro_inner = match avro_field { AvroSchema::Array(inner) => inner.as_ref(), AvroSchema::Union(u) => match &u.as_slice() { @@ -107,7 +119,6 @@ fn deserialize_value<'a>( _ => unreachable!(), }; - let is_nullable = inner.is_nullable; let array = array .as_mut_any() .downcast_mut::>() @@ -144,6 +155,31 @@ fn deserialize_value<'a>( .unwrap(); array.push(Some(value)) } + DataType::Struct(inner_fields) => { + let fields = match avro_field { + AvroSchema::Record(Record { fields, .. }) => fields, + AvroSchema::Union(u) => match &u.as_slice() { + &[AvroSchema::Record(Record { fields, .. }), _] + | &[_, AvroSchema::Record(Record { fields, .. })] => fields, + _ => unreachable!(), + }, + _ => unreachable!(), + }; + + let is_nullable = inner_fields + .iter() + .map(|x| x.is_nullable) + .collect::>(); + let array = array + .as_mut_any() + .downcast_mut::() + .unwrap(); + + for (index, (field, is_nullable)) in fields.iter().zip(is_nullable.iter()).enumerate() { + let values = array.mut_values(index); + block = deserialize_item(values, *is_nullable, &field.schema, block)?; + } + } _ => match data_type.to_physical_type() { PhysicalType::Boolean => { let is_valid = block[0] == 1; diff --git a/src/io/avro/read/nested.rs b/src/io/avro/read/nested.rs index d0e2ebf81bf..cf588899921 100644 --- a/src/io/avro/read/nested.rs +++ b/src/io/avro/read/nested.rs @@ -28,12 +28,6 @@ impl DynMutableListArray { } } - /// Creates a new [`MutableListArray`] from a [`MutableArray`] and capacity. - pub fn new_with_capacity(values: Box, capacity: usize) -> Self { - let data_type = ListArray::::default_datatype(values.data_type().clone()); - Self::new_from(values, data_type, capacity) - } - /// The values pub fn mut_values(&mut self) -> &mut dyn MutableArray { self.values.as_mut() @@ -199,3 +193,94 @@ impl MutableArray for FixedItemsUtf8Dictionary { todo!(); } } + +/// Auxiliary struct +#[derive(Debug)] +pub struct DynMutableStructArray { + data_type: DataType, + values: Vec>, + validity: Option, +} + +impl DynMutableStructArray { + pub fn new(values: Vec>, data_type: DataType) -> Self { + Self { + data_type, + values, + validity: None, + } + } + + /// The values + pub fn mut_values(&mut self, field: usize) -> &mut dyn MutableArray { + self.values[field].as_mut() + } + + #[inline] + fn push_null(&mut self) { + match &mut self.validity { + Some(validity) => validity.push(false), + None => self.init_validity(), + } + } + + fn init_validity(&mut self) { + let len = self.len(); + + let mut validity = MutableBitmap::new(); + validity.extend_constant(len, true); + validity.set(len - 1, false); + self.validity = Some(validity) + } +} + +impl MutableArray for DynMutableStructArray { + fn len(&self) -> usize { + self.values[0].len() + } + + fn validity(&self) -> Option<&MutableBitmap> { + self.validity.as_ref() + } + + fn as_box(&mut self) -> Box { + let values = self.values.iter_mut().map(|x| x.as_arc()).collect(); + + Box::new(StructArray::from_data( + self.data_type.clone(), + values, + std::mem::take(&mut self.validity).map(|x| x.into()), + )) + } + + fn as_arc(&mut self) -> Arc { + let values = self.values.iter_mut().map(|x| x.as_arc()).collect(); + + Arc::new(StructArray::from_data( + self.data_type.clone(), + values, + std::mem::take(&mut self.validity).map(|x| x.into()), + )) + } + + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + #[inline] + fn push_null(&mut self) { + self.push_null() + } + + fn shrink_to_fit(&mut self) { + todo!(); + } +} diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index c9a19823841..75e65afa3d8 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -19,36 +19,29 @@ fn external_props(schema: &AvroSchema) -> Metadata { props } -/// Maps an Avro Schema into a [`Schema`]. +/// Maps an [`AvroSchema`] into a [`Schema`]. pub fn convert_schema(schema: &AvroSchema) -> Result { - let mut schema_fields = vec![]; - match schema { - AvroSchema::Record(Record { fields, .. }) => { - for field in fields { - schema_fields.push(schema_to_field( + if let AvroSchema::Record(Record { fields, .. }) = schema { + Ok(fields + .iter() + .map(|field| { + schema_to_field( &field.schema, Some(&field.name), - false, external_props(&field.schema), - )?) - } - } - other => { - return Err(ArrowError::OutOfSpec(format!( - "An avro Schema must be of type Record - it is of type {:?}", - other - ))) - } - }; - Ok(schema_fields.into()) + ) + }) + .collect::>>()? + .into()) + } else { + Err(ArrowError::OutOfSpec( + "An avro Schema must be of type Record".to_string(), + )) + } } -fn schema_to_field( - schema: &AvroSchema, - name: Option<&str>, - mut nullable: bool, - props: Metadata, -) -> Result { +fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> Result { + let mut nullable = false; let data_type = match schema { AvroSchema::Null => DataType::Null, AvroSchema::Boolean => DataType::Boolean, @@ -91,7 +84,6 @@ fn schema_to_field( AvroSchema::Array(item_schema) => DataType::List(Box::new(schema_to_field( item_schema, Some("item"), // default name for list items - false, Metadata::default(), )?)), AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"), @@ -104,9 +96,7 @@ fn schema_to_field( .iter() .find(|&schema| !matches!(schema, AvroSchema::Null)) { - schema_to_field(schema, None, has_nullable, Metadata::default())? - .data_type() - .clone() + schema_to_field(schema, None, Metadata::default())?.data_type } else { return Err(ArrowError::NotYetImplemented(format!( "Can't read avro union {:?}", @@ -116,31 +106,27 @@ fn schema_to_field( } else { let fields = schemas .iter() - .map(|s| schema_to_field(s, None, has_nullable, Metadata::default())) + .map(|s| schema_to_field(s, None, Metadata::default())) .collect::>>()?; DataType::Union(fields, None, UnionMode::Dense) } } AvroSchema::Record(Record { name, fields, .. }) => { - let fields: Result> = fields + let fields = fields .iter() .map(|field| { let mut props = Metadata::new(); if let Some(doc) = &field.doc { props.insert("avro::doc".to_string(), doc.clone()); } - /*if let Some(aliases) = fields.aliases { - props.insert("aliases", aliases); - }*/ schema_to_field( &field.schema, Some(&format!("{}.{}", name, field.name)), - false, props, ) }) - .collect(); - DataType::Struct(fields?) + .collect::>()?; + DataType::Struct(fields) } AvroSchema::Enum { .. } => { return Ok(Field::new( diff --git a/tests/it/io/avro/read.rs b/tests/it/io/avro/read.rs index 769b783d136..8ed77f03d38 100644 --- a/tests/it/io/avro/read.rs +++ b/tests/it/io/avro/read.rs @@ -36,6 +36,13 @@ pub(super) fn schema() -> (AvroSchema, Schema) { "default": null } }}, + {"name": "i", "type": { + "type": "record", + "name": "bla", + "fields": [ + {"name": "e", "type": "double"} + ] + }}, {"name": "enum", "type": { "type": "enum", "name": "", @@ -59,6 +66,11 @@ pub(super) fn schema() -> (AvroSchema, Schema) { DataType::List(Box::new(Field::new("item", DataType::Int32, true))), false, ), + Field::new( + "i", + DataType::Struct(vec![Field::new("bla.e", DataType::Float64, false)]), + false, + ), Field::new( "enum", DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8), false), @@ -88,6 +100,11 @@ pub(super) fn data() -> Chunk> { Arc::new(BooleanArray::from_slice([true, false])), Arc::new(Utf8Array::::from([Some("foo"), None])), array.into_arc(), + Arc::new(StructArray::from_data( + DataType::Struct(vec![Field::new("bla.e", DataType::Float64, false)]), + vec![Arc::new(PrimitiveArray::::from_slice([1.0, 2.0]))], + None, + )), Arc::new(DictionaryArray::::from_data( Int32Array::from_slice([1, 0]), Arc::new(Utf8Array::::from_slice(["SPADES", "HEARTS"])), @@ -120,6 +137,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs:: Value::Union(Box::new(Value::Int(3))), ]), ); + record.put( + "i", + Value::Record(vec![("e".to_string(), Value::Double(1.0f64))]), + ); record.put("enum", Value::Enum(1, "HEARTS".to_string())); record.put( "duration", @@ -144,6 +165,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result, avro_rs:: Value::Union(Box::new(Value::Int(3))), ]), ); + record.put( + "i", + Value::Record(vec![("e".to_string(), Value::Double(2.0f64))]), + ); record.put("enum", Value::Enum(0, "SPADES".to_string())); writer.append(record)?; Ok(writer.into_inner().unwrap())