Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for writing interval to avro (#734)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Jan 6, 2022
1 parent 9b7118b commit e001ba5
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 3 deletions.
47 changes: 44 additions & 3 deletions src/io/avro/write/serialize.rs
@@ -1,6 +1,7 @@
use avro_schema::Schema as AvroSchema;

use crate::datatypes::{PhysicalType, PrimitiveType};
use crate::datatypes::{IntervalUnit, PhysicalType, PrimitiveType};
use crate::types::months_days_ns;
use crate::{array::*, datatypes::DataType};

use super::super::super::iterator::*;
Expand Down Expand Up @@ -179,12 +180,52 @@ pub fn new_serializer<'a>(array: &'a dyn Array, schema: &AvroSchema) -> BoxSeria
vec![],
))
}
_ => todo!(),
(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Fixed(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<months_days_ns>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.values().iter(),
interval_write,
vec![],
))
}
(PhysicalType::Primitive(PrimitiveType::MonthDayNano), AvroSchema::Union(_)) => {
let values = array
.as_any()
.downcast_ref::<PrimitiveArray<months_days_ns>>()
.unwrap();
Box::new(BufStreamingIterator::new(
values.iter(),
|x, buf| {
util::zigzag_encode(x.is_some() as i64, buf).unwrap();
if let Some(x) = x {
interval_write(x, buf)
}
},
vec![],
))
}
(a, b) => todo!("{:?} -> {:?} not supported", a, b),
}
}

/// Whether [`new_serializer`] supports `data_type`.
pub fn can_serialize(data_type: &DataType) -> bool {
use DataType::*;
matches!(data_type, Boolean | Int32 | Int64 | Utf8 | Binary)
matches!(
data_type,
Boolean | Int32 | Int64 | Utf8 | Binary | Interval(IntervalUnit::MonthDayNano)
)
}

#[inline]
fn interval_write(x: &months_days_ns, buf: &mut Vec<u8>) {
// https://avro.apache.org/docs/current/spec.html#Duration
// 12 bytes, months, days, millis in LE
buf.reserve(12);
buf.extend(x.months().to_le_bytes());
buf.extend(x.days().to_le_bytes());
buf.extend(((x.ns() / 1_000_000) as i32).to_le_bytes());
}
6 changes: 6 additions & 0 deletions tests/it/io/avro/write.rs
Expand Up @@ -5,6 +5,7 @@ use arrow2::chunk::Chunk;
use arrow2::datatypes::*;
use arrow2::error::Result;
use arrow2::io::avro::write;
use arrow2::types::months_days_ns;

fn schema() -> Schema {
Schema::from(vec![
Expand All @@ -16,6 +17,7 @@ fn schema() -> Schema {
Field::new("e", DataType::Float64, false),
Field::new("f", DataType::Boolean, false),
Field::new("g", DataType::Utf8, true),
Field::new("h", DataType::Interval(IntervalUnit::MonthDayNano), true),
])
}

Expand All @@ -29,6 +31,10 @@ fn data() -> Chunk<Arc<dyn Array>> {
Arc::new(PrimitiveArray::<f64>::from_slice([1.0, 2.0])) as Arc<dyn Array>,
Arc::new(BooleanArray::from_slice([true, false])) as Arc<dyn Array>,
Arc::new(Utf8Array::<i32>::from([Some("foo"), None])) as Arc<dyn Array>,
Arc::new(PrimitiveArray::<months_days_ns>::from([
Some(months_days_ns::new(1, 1, 10 * 1_000_000)), // 10 millis
None,
])) as Arc<dyn Array>,
];

Chunk::try_new(columns).unwrap()
Expand Down

0 comments on commit e001ba5

Please sign in to comment.