Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read avro structs (#826)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Feb 9, 2022
1 parent d8717fc commit a026b66
Show file tree
Hide file tree
Showing 4 changed files with 178 additions and 46 deletions.
44 changes: 40 additions & 4 deletions src/io/avro/read/deserialize.rs
@@ -1,6 +1,7 @@
use std::convert::TryInto;
use std::sync::Arc;

use avro_schema::Record;
use avro_schema::{Enum, Schema as AvroSchema};

use crate::array::*;
Expand Down Expand Up @@ -45,17 +46,27 @@ fn make_mutable(
_ => match data_type {
DataType::List(inner) => {
let values = make_mutable(inner.data_type(), None, 0)?;
Box::new(DynMutableListArray::<i32>::new_with_capacity(
values, capacity,
Box::new(DynMutableListArray::<i32>::new_from(
values,
data_type.clone(),
capacity,
)) as Box<dyn MutableArray>
}
DataType::FixedSizeBinary(size) => Box::new(MutableFixedSizeBinaryArray::with_capacity(
*size as usize,
capacity,
)) as Box<dyn MutableArray>,
DataType::Struct(fields) => {
let values = fields
.iter()
.map(|field| make_mutable(field.data_type(), None, capacity))
.collect::<Result<Vec<_>>>()?;
Box::new(DynMutableStructArray::new(values, data_type.clone()))
as Box<dyn MutableArray>
}
other => {
return Err(ArrowError::NotYetImplemented(format!(
"Deserializing type {:?} is still not implemented",
"Deserializing type {:#?} is still not implemented",
other
)))
}
Expand Down Expand Up @@ -96,6 +107,7 @@ fn deserialize_value<'a>(
let data_type = array.data_type();
match data_type {
DataType::List(inner) => {
let is_nullable = inner.is_nullable;
let avro_inner = match avro_field {
AvroSchema::Array(inner) => inner.as_ref(),
AvroSchema::Union(u) => match &u.as_slice() {
Expand All @@ -107,7 +119,6 @@ fn deserialize_value<'a>(
_ => unreachable!(),
};

let is_nullable = inner.is_nullable;
let array = array
.as_mut_any()
.downcast_mut::<DynMutableListArray<i32>>()
Expand Down Expand Up @@ -144,6 +155,31 @@ fn deserialize_value<'a>(
.unwrap();
array.push(Some(value))
}
DataType::Struct(inner_fields) => {
let fields = match avro_field {
AvroSchema::Record(Record { fields, .. }) => fields,
AvroSchema::Union(u) => match &u.as_slice() {
&[AvroSchema::Record(Record { fields, .. }), _]
| &[_, AvroSchema::Record(Record { fields, .. })] => fields,
_ => unreachable!(),
},
_ => unreachable!(),
};

let is_nullable = inner_fields
.iter()
.map(|x| x.is_nullable)
.collect::<Vec<_>>();
let array = array
.as_mut_any()
.downcast_mut::<DynMutableStructArray>()
.unwrap();

for (index, (field, is_nullable)) in fields.iter().zip(is_nullable.iter()).enumerate() {
let values = array.mut_values(index);
block = deserialize_item(values, *is_nullable, &field.schema, block)?;
}
}
_ => match data_type.to_physical_type() {
PhysicalType::Boolean => {
let is_valid = block[0] == 1;
Expand Down
97 changes: 91 additions & 6 deletions src/io/avro/read/nested.rs
Expand Up @@ -28,12 +28,6 @@ impl<O: Offset> DynMutableListArray<O> {
}
}

/// Creates a new [`MutableListArray`] from a [`MutableArray`] and capacity.
pub fn new_with_capacity(values: Box<dyn MutableArray>, capacity: usize) -> Self {
let data_type = ListArray::<O>::default_datatype(values.data_type().clone());
Self::new_from(values, data_type, capacity)
}

/// The values
pub fn mut_values(&mut self) -> &mut dyn MutableArray {
self.values.as_mut()
Expand Down Expand Up @@ -199,3 +193,94 @@ impl MutableArray for FixedItemsUtf8Dictionary {
todo!();
}
}

/// Auxiliary [`MutableArray`] for struct values whose fields are only known at run time.
///
/// Every field of the struct is backed by its own boxed [`MutableArray`] stored in `values`.
/// `validity` stays `None` until the first null row is pushed, at which point it is
/// materialized with all previous rows marked valid.
///
/// NOTE(review): nothing in this type appends a `true` bit to `validity` when a valid row is
/// written through [`DynMutableStructArray::mut_values`]; callers that mix null and valid rows
/// presumably need to keep the bitmap in sync themselves - confirm against the deserializer.
#[derive(Debug)]
pub struct DynMutableStructArray {
    data_type: DataType,
    values: Vec<Box<dyn MutableArray>>,
    validity: Option<MutableBitmap>,
}

impl DynMutableStructArray {
    /// Creates a new [`DynMutableStructArray`] from one child array per field and the
    /// [`DataType`] reported by the array. No validity bitmap is allocated.
    pub fn new(values: Vec<Box<dyn MutableArray>>, data_type: DataType) -> Self {
        Self {
            data_type,
            values,
            validity: None,
        }
    }

    /// The mutable child array at position `field`.
    ///
    /// # Panics
    /// Panics if `field` is out of bounds of the children.
    pub fn mut_values(&mut self, field: usize) -> &mut dyn MutableArray {
        self.values[field].as_mut()
    }

    /// Appends a null row to the struct.
    #[inline]
    fn push_null(&mut self) {
        // A null row still occupies one slot in every child; without this the children
        // would fall behind the validity bitmap and the length of the array would not grow.
        self.values.iter_mut().for_each(|child| child.push_null());
        match &mut self.validity {
            Some(validity) => validity.push(false),
            None => self.init_validity(),
        }
    }

    /// Materializes the validity bitmap on the first null: every row before the one that was
    /// just pushed is valid, and the last row is null.
    fn init_validity(&mut self) {
        // Length of the array is the length of its children (0 when there are none).
        let len = self.values.first().map_or(0, |child| child.len());

        let mut validity = MutableBitmap::new();
        // `saturating_sub` + `push` instead of `set(len - 1, ..)`: no underflow when `len == 0`.
        validity.extend_constant(len.saturating_sub(1), true);
        validity.push(false);
        self.validity = Some(validity)
    }
}

impl MutableArray for DynMutableStructArray {
fn len(&self) -> usize {
self.values[0].len()
}

fn validity(&self) -> Option<&MutableBitmap> {
self.validity.as_ref()
}

fn as_box(&mut self) -> Box<dyn Array> {
let values = self.values.iter_mut().map(|x| x.as_arc()).collect();

Box::new(StructArray::from_data(
self.data_type.clone(),
values,
std::mem::take(&mut self.validity).map(|x| x.into()),
))
}

fn as_arc(&mut self) -> Arc<dyn Array> {
let values = self.values.iter_mut().map(|x| x.as_arc()).collect();

Arc::new(StructArray::from_data(
self.data_type.clone(),
values,
std::mem::take(&mut self.validity).map(|x| x.into()),
))
}

fn data_type(&self) -> &DataType {
&self.data_type
}

fn as_any(&self) -> &dyn std::any::Any {
self
}

fn as_mut_any(&mut self) -> &mut dyn std::any::Any {
self
}

#[inline]
fn push_null(&mut self) {
self.push_null()
}

fn shrink_to_fit(&mut self) {
todo!();
}
}
58 changes: 22 additions & 36 deletions src/io/avro/read/schema.rs
Expand Up @@ -19,36 +19,29 @@ fn external_props(schema: &AvroSchema) -> Metadata {
props
}

/// Maps an Avro Schema into a [`Schema`].
/// Maps an [`AvroSchema`] into a [`Schema`].
pub fn convert_schema(schema: &AvroSchema) -> Result<Schema> {
let mut schema_fields = vec![];
match schema {
AvroSchema::Record(Record { fields, .. }) => {
for field in fields {
schema_fields.push(schema_to_field(
if let AvroSchema::Record(Record { fields, .. }) = schema {
Ok(fields
.iter()
.map(|field| {
schema_to_field(
&field.schema,
Some(&field.name),
false,
external_props(&field.schema),
)?)
}
}
other => {
return Err(ArrowError::OutOfSpec(format!(
"An avro Schema must be of type Record - it is of type {:?}",
other
)))
}
};
Ok(schema_fields.into())
)
})
.collect::<Result<Vec<_>>>()?
.into())
} else {
Err(ArrowError::OutOfSpec(
"An avro Schema must be of type Record".to_string(),
))
}
}

fn schema_to_field(
schema: &AvroSchema,
name: Option<&str>,
mut nullable: bool,
props: Metadata,
) -> Result<Field> {
fn schema_to_field(schema: &AvroSchema, name: Option<&str>, props: Metadata) -> Result<Field> {
let mut nullable = false;
let data_type = match schema {
AvroSchema::Null => DataType::Null,
AvroSchema::Boolean => DataType::Boolean,
Expand Down Expand Up @@ -91,7 +84,6 @@ fn schema_to_field(
AvroSchema::Array(item_schema) => DataType::List(Box::new(schema_to_field(
item_schema,
Some("item"), // default name for list items
false,
Metadata::default(),
)?)),
AvroSchema::Map(_) => todo!("Avro maps are mapped to MapArrays"),
Expand All @@ -104,9 +96,7 @@ fn schema_to_field(
.iter()
.find(|&schema| !matches!(schema, AvroSchema::Null))
{
schema_to_field(schema, None, has_nullable, Metadata::default())?
.data_type()
.clone()
schema_to_field(schema, None, Metadata::default())?.data_type
} else {
return Err(ArrowError::NotYetImplemented(format!(
"Can't read avro union {:?}",
Expand All @@ -116,31 +106,27 @@ fn schema_to_field(
} else {
let fields = schemas
.iter()
.map(|s| schema_to_field(s, None, has_nullable, Metadata::default()))
.map(|s| schema_to_field(s, None, Metadata::default()))
.collect::<Result<Vec<Field>>>()?;
DataType::Union(fields, None, UnionMode::Dense)
}
}
AvroSchema::Record(Record { name, fields, .. }) => {
let fields: Result<Vec<Field>> = fields
let fields = fields
.iter()
.map(|field| {
let mut props = Metadata::new();
if let Some(doc) = &field.doc {
props.insert("avro::doc".to_string(), doc.clone());
}
/*if let Some(aliases) = fields.aliases {
props.insert("aliases", aliases);
}*/
schema_to_field(
&field.schema,
Some(&format!("{}.{}", name, field.name)),
false,
props,
)
})
.collect();
DataType::Struct(fields?)
.collect::<Result<_>>()?;
DataType::Struct(fields)
}
AvroSchema::Enum { .. } => {
return Ok(Field::new(
Expand Down
25 changes: 25 additions & 0 deletions tests/it/io/avro/read.rs
Expand Up @@ -36,6 +36,13 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
"default": null
}
}},
{"name": "i", "type": {
"type": "record",
"name": "bla",
"fields": [
{"name": "e", "type": "double"}
]
}},
{"name": "enum", "type": {
"type": "enum",
"name": "",
Expand All @@ -59,6 +66,11 @@ pub(super) fn schema() -> (AvroSchema, Schema) {
DataType::List(Box::new(Field::new("item", DataType::Int32, true))),
false,
),
Field::new(
"i",
DataType::Struct(vec![Field::new("bla.e", DataType::Float64, false)]),
false,
),
Field::new(
"enum",
DataType::Dictionary(i32::KEY_TYPE, Box::new(DataType::Utf8), false),
Expand Down Expand Up @@ -88,6 +100,11 @@ pub(super) fn data() -> Chunk<Arc<dyn Array>> {
Arc::new(BooleanArray::from_slice([true, false])),
Arc::new(Utf8Array::<i32>::from([Some("foo"), None])),
array.into_arc(),
Arc::new(StructArray::from_data(
DataType::Struct(vec![Field::new("bla.e", DataType::Float64, false)]),
vec![Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0]))],
None,
)),
Arc::new(DictionaryArray::<i32>::from_data(
Int32Array::from_slice([1, 0]),
Arc::new(Utf8Array::<i32>::from_slice(["SPADES", "HEARTS"])),
Expand Down Expand Up @@ -120,6 +137,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Value::Union(Box::new(Value::Int(3))),
]),
);
record.put(
"i",
Value::Record(vec![("e".to_string(), Value::Double(1.0f64))]),
);
record.put("enum", Value::Enum(1, "HEARTS".to_string()));
record.put(
"duration",
Expand All @@ -144,6 +165,10 @@ pub(super) fn write_avro(codec: Codec) -> std::result::Result<Vec<u8>, avro_rs::
Value::Union(Box::new(Value::Int(3))),
]),
);
record.put(
"i",
Value::Record(vec![("e".to_string(), Value::Double(2.0f64))]),
);
record.put("enum", Value::Enum(0, "SPADES".to_string()));
writer.append(record)?;
Ok(writer.into_inner().unwrap())
Expand Down

0 comments on commit a026b66

Please sign in to comment.