diff --git a/src/array/fixed_size_list/mutable.rs b/src/array/fixed_size_list/mutable.rs index c638e84033b..0cba41a1e90 100644 --- a/src/array/fixed_size_list/mutable.rs +++ b/src/array/fixed_size_list/mutable.rs @@ -59,6 +59,11 @@ impl MutableFixedSizeListArray { } } + /// Returns the size (number of elements per slot) of this [`MutableFixedSizeListArray`]. + pub const fn size(&self) -> usize { + self.size + } + /// The inner values pub fn values(&self) -> &M { &self.values diff --git a/src/array/list/mutable.rs b/src/array/list/mutable.rs index 71283104fbc..b4f3c9d1d3b 100644 --- a/src/array/list/mutable.rs +++ b/src/array/list/mutable.rs @@ -1,10 +1,11 @@ use std::sync::Arc; use crate::{ - array::{Array, MutableArray, Offset, TryExtend, TryPush}, + array::{specification::try_check_offsets, Array, MutableArray, Offset, TryExtend, TryPush}, bitmap::MutableBitmap, datatypes::{DataType, Field}, error::{Error, Result}, + trusted_len::TrustedLen, }; use super::ListArray; @@ -152,6 +153,75 @@ impl MutableListArray { } } + /// Expand this array, using elements from the underlying backing array. + /// Assumes the expansion begins at the highest previous offset, or zero if + /// this [MutableListArray] is currently empty. + /// + /// Panics if: + /// - the new offsets are not in monotonic increasing order. + /// - any new offset is not in bounds of the backing array. + /// - the passed iterator has no upper bound.
+ pub(crate) fn extend_offsets(&mut self, expansion: II) + where + II: TrustedLen>, + { + let current_len = self.offsets.len(); + let (_, upper) = expansion.size_hint(); + let upper = upper.expect("iterator must have upper bound"); + if current_len == 0 && upper > 0 { + self.offsets.push(O::zero()); + } + // safety: checked below + unsafe { self.unsafe_extend_offsets(expansion) }; + if self.offsets.len() > current_len { + // check all inserted offsets + try_check_offsets(&self.offsets[current_len..], self.values.len()) + .expect("invalid offsets"); + } + // else expansion is empty, and this is trivially safe. + } + + /// Expand this array, using elements from the underlying backing array. + /// Assumes the expansion begins at the highest previous offset, or zero if + /// this [MutableListArray] is currently empty. + /// + /// # Safety + /// + /// Assumes that `offsets` are in order, and do not overrun the underlying + /// `values` backing array. + /// + /// Also assumes the expansion begins at the highest previous offset, or + /// zero if the array is currently empty. + /// + /// Panics if the passed iterator has no upper bound. 
+ pub(crate) unsafe fn unsafe_extend_offsets(&mut self, expansion: II) + where + II: TrustedLen>, + { + let (_, upper) = expansion.size_hint(); + let upper = upper.expect("iterator must have upper bound"); + let final_size = self.len() + upper; + self.offsets.reserve(upper); + + for item in expansion { + match item { + Some(offset) => { + self.offsets.push(offset); + if let Some(validity) = &mut self.validity { + validity.push(true); + } + } + None => self.push_null(), + } + + if let Some(validity) = &mut self.validity { + if validity.capacity() < final_size { + validity.reserve(final_size - validity.capacity()); + } + } + } + } + /// The values pub fn mut_values(&mut self) -> &mut M { &mut self.values @@ -209,11 +279,15 @@ impl MutableListArray { validity.shrink_to_fit() } } + + fn len(&self) -> usize { + self.offsets.len() - 1 + } } impl MutableArray for MutableListArray { fn len(&self) -> usize { - self.offsets.len() - 1 + MutableListArray::len(self) } fn validity(&self) -> Option<&MutableBitmap> { diff --git a/src/array/mod.rs b/src/array/mod.rs index 6a053932589..a9c58ab0a87 100644 --- a/src/array/mod.rs +++ b/src/array/mod.rs @@ -17,6 +17,7 @@ //! Most arrays contain a [`MutableArray`] counterpart that is neither clonable nor slicable, but //! can be operated in-place. use std::any::Any; +use std::sync::Arc; use crate::error::Result; use crate::{ @@ -113,6 +114,15 @@ pub trait Array: Send + Sync + dyn_clone::DynClone + 'static { dyn_clone::clone_trait_object!(Array); +/// A trait describing an array with a backing store that can be preallocated to +/// a given size. +pub(crate) trait Container { + /// Create this array with a given capacity. + fn with_capacity(capacity: usize) -> Self + where + Self: Sized; +} + /// A trait describing a mutable array; i.e. an array whose values can be changed. /// Mutable arrays cannot be cloned but can be mutated in place, /// thereby making them useful to perform numeric operations without allocations. 
@@ -170,6 +180,49 @@ pub trait MutableArray: std::fmt::Debug + Send + Sync { fn shrink_to_fit(&mut self); } +impl MutableArray for Box { + fn len(&self) -> usize { + self.as_ref().len() + } + + fn validity(&self) -> Option<&MutableBitmap> { + self.as_ref().validity() + } + + fn as_box(&mut self) -> Box { + self.as_mut().as_box() + } + + fn as_arc(&mut self) -> Arc { + self.as_mut().as_arc() + } + + fn data_type(&self) -> &DataType { + self.as_ref().data_type() + } + + fn as_any(&self) -> &dyn std::any::Any { + self.as_ref().as_any() + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self.as_mut().as_mut_any() + } + + #[inline] + fn push_null(&mut self) { + self.as_mut().push_null() + } + + fn shrink_to_fit(&mut self) { + self.as_mut().shrink_to_fit(); + } + + fn reserve(&mut self, additional: usize) { + self.as_mut().reserve(additional); + } +} + macro_rules! general_dyn { ($array:expr, $ty:ty, $f:expr) => {{ let array = $array.as_any().downcast_ref::<$ty>().unwrap(); diff --git a/src/io/json/read/deserialize.rs b/src/io/json/read/deserialize.rs index b7add12bc97..84ad5bea984 100644 --- a/src/io/json/read/deserialize.rs +++ b/src/io/json/read/deserialize.rs @@ -9,9 +9,10 @@ use json_deserializer::{Number, Value}; use crate::{ array::*, bitmap::MutableBitmap, - datatypes::{DataType, IntervalUnit}, + chunk::Chunk, + datatypes::{DataType, Field, IntervalUnit, Schema}, error::Error, - types::NativeType, + types::{f16, NativeType}, }; /// A function that converts a &Value into an optional tuple of a byte slice and a Value. 
@@ -55,12 +56,15 @@ fn build_extract(data_type: &DataType) -> Extract { } } -fn deserialize_boolean<'a, A: Borrow>>(rows: &[A]) -> BooleanArray { +fn deserialize_boolean_into<'a, A: Borrow>>( + target: &mut MutableBooleanArray, + rows: &[A], +) { let iter = rows.iter().map(|row| match row.borrow() { Value::Bool(v) => Some(v), _ => None, }); - BooleanArray::from_trusted_len_iter(iter) + target.extend_trusted_len(iter); } fn deserialize_int_single(number: Number) -> T @@ -153,32 +157,36 @@ where } } -fn deserialize_int<'a, T: NativeType + lexical_core::FromLexical + Pow10, A: Borrow>>( +fn deserialize_int_into< + 'a, + T: NativeType + lexical_core::FromLexical + Pow10, + A: Borrow>, +>( + target: &mut MutablePrimitiveArray, rows: &[A], - data_type: DataType, -) -> PrimitiveArray { +) { let iter = rows.iter().map(|row| match row.borrow() { Value::Number(number) => Some(deserialize_int_single(*number)), Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); - PrimitiveArray::from_trusted_len_iter(iter).to(data_type) + target.extend_trusted_len(iter); } -fn deserialize_float< +fn deserialize_float_into< 'a, T: NativeType + lexical_core::FromLexical + Powi10, A: Borrow>, >( + target: &mut MutablePrimitiveArray, rows: &[A], - data_type: DataType, -) -> PrimitiveArray { +) { let iter = rows.iter().map(|row| match row.borrow() { Value::Number(number) => Some(deserialize_float_single(number)), Value::Bool(number) => Some(if *number { T::one() } else { T::default() }), _ => None, }); - PrimitiveArray::from_trusted_len_iter(iter).to(data_type) + target.extend_trusted_len(iter); } fn deserialize_binary<'a, O: Offset, A: Borrow>>(rows: &[A]) -> BinaryArray { @@ -189,12 +197,14 @@ fn deserialize_binary<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Binary BinaryArray::from_trusted_len_iter(iter) } -fn deserialize_utf8<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Utf8Array { - let mut array = MutableUtf8Array::::with_capacity(rows.len()); +fn 
deserialize_utf8_into<'a, O: Offset, A: Borrow>>( + target: &mut MutableUtf8Array, + rows: &[A], +) { let mut scratch = vec![]; for row in rows { match row.borrow() { - Value::String(v) => array.push(Some(v.as_ref())), + Value::String(v) => target.push(Some(v.as_ref())), Value::Number(number) => match number { Number::Integer(number, exponent) | Number::Float(number, exponent) => { scratch.clear(); @@ -203,11 +213,10 @@ fn deserialize_utf8<'a, O: Offset, A: Borrow>>(rows: &[A]) -> Utf8Arra scratch.extend_from_slice(*exponent); } }, - Value::Bool(v) => array.push(Some(if *v { "true" } else { "false" })), - _ => array.push_null(), + Value::Bool(v) => target.push(Some(if *v { "true" } else { "false" })), + _ => target.push_null(), } } - array.into() } fn deserialize_list<'a, O: Offset, A: Borrow>>( @@ -243,6 +252,137 @@ fn deserialize_list<'a, O: Offset, A: Borrow>>( ListArray::::new(data_type, offsets.into(), values, validity.into()) } +// TODO: due to nesting, deduplicating this from the above is trickier than +// other `deserialize_xxx_into` functions. Punting on that for now. 
+fn deserialize_list_into<'a, O: Offset, A: Borrow>>( + target: &mut MutableListArray>, + rows: &[A], +) { + let start = { + let empty = vec![]; + let inner: Vec<_> = rows + .iter() + .flat_map(|row| match row.borrow() { + Value::Array(value) => value.iter(), + _ => empty.iter(), + }) + .collect(); + + let child = target.mut_values(); + let start_len = child.len(); + deserialize_into(child, &inner); + + // todo make this an Err + O::from_usize(start_len).expect("Child list size too large") + }; + + let mut position = start; + let arrays = rows.iter().map(|row| { + match row.borrow() { + Value::Array(value) => { + // todo make this an Err + position += O::from_usize(value.len()).expect("List offset is too large :/"); + Some(position) + } + _ => None, + } + }); + + // though this will always be safe, we cannot use unsafe_extend_offsets here + // due to `#![forbid(unsafe_code)]` on the io module + target.extend_offsets(arrays); +} + +fn deserialize_fixed_size_list_into<'a, A: Borrow>>( + target: &mut MutableFixedSizeListArray>, + rows: &[A], +) { + for row in rows { + match row.borrow() { + Value::Array(value) => { + if value.len() == target.size() { + { + let child = target.mut_values(); + deserialize_into(child, value); + } + // unless alignment is already off, the if above should + // prevent this from ever happening. 
+ target.try_push_valid().expect("unaligned backing array"); + } else { + target.push_null(); + } + } + _ => target.push_null(), + } + } +} + +fn deserialize_primitive_into<'a, A: Borrow>, T: NativeType>( + target: &mut Box, + rows: &[A], + deserialize_into: fn(&mut MutablePrimitiveArray, &[A]) -> (), +) { + generic_deserialize_into(target, rows, deserialize_into) +} + +fn generic_deserialize_into<'a, A: Borrow>, M: 'static>( + target: &mut Box, + rows: &[A], + deserialize_into: fn(&mut M, &[A]) -> (), +) { + deserialize_into(target.as_mut_any().downcast_mut::().unwrap(), rows); +} + +/// Deserialize `rows` by extending them into the given `target` +fn deserialize_into<'a, A: Borrow>>(target: &mut Box, rows: &[A]) { + match target.data_type() { + DataType::Boolean => generic_deserialize_into(target, rows, deserialize_boolean_into), + DataType::Float32 => { + deserialize_primitive_into::<_, f32>(target, rows, deserialize_float_into) + } + DataType::Float64 => { + deserialize_primitive_into::<_, f64>(target, rows, deserialize_float_into) + } + DataType::Int8 => deserialize_primitive_into::<_, i8>(target, rows, deserialize_int_into), + DataType::Int16 => deserialize_primitive_into::<_, i16>(target, rows, deserialize_int_into), + DataType::Int32 => deserialize_primitive_into::<_, i32>(target, rows, deserialize_int_into), + DataType::Int64 => deserialize_primitive_into::<_, i64>(target, rows, deserialize_int_into), + DataType::UInt8 => deserialize_primitive_into::<_, u8>(target, rows, deserialize_int_into), + DataType::UInt16 => { + deserialize_primitive_into::<_, u16>(target, rows, deserialize_int_into) + } + DataType::UInt32 => { + deserialize_primitive_into::<_, u32>(target, rows, deserialize_int_into) + } + DataType::UInt64 => { + deserialize_primitive_into::<_, u64>(target, rows, deserialize_int_into) + } + DataType::Utf8 => generic_deserialize_into::<_, MutableUtf8Array>( + target, + rows, + deserialize_utf8_into, + ), + DataType::LargeUtf8 => 
generic_deserialize_into::<_, MutableUtf8Array>( + target, + rows, + deserialize_utf8_into, + ), + DataType::FixedSizeList(_, _) => { + generic_deserialize_into(target, rows, deserialize_fixed_size_list_into) + } + DataType::List(_) => deserialize_list_into( + target + .as_mut_any() + .downcast_mut::>>() + .unwrap(), + rows, + ), + _ => { + todo!() + } + } +} + fn deserialize_struct<'a, A: Borrow>>(rows: &[A], data_type: DataType) -> StructArray { let fields = StructArray::get_fields(&data_type); @@ -315,20 +455,95 @@ fn deserialize_dictionary<'a, K: DictionaryKey, A: Borrow>>( DictionaryArray::::try_new(data_type, keys, values).unwrap() } +fn fill_array_from( + f: fn(&mut MutablePrimitiveArray, &[B]), + data_type: DataType, + rows: &[B], +) -> Box +where + T: NativeType, + A: From> + Array, +{ + let mut array = MutablePrimitiveArray::::with_capacity(rows.len()).to(data_type); + f(&mut array, rows); + Box::new(A::from(array)) +} + +/// A trait describing an array with a backing store that can be preallocated to +/// a given size. +pub(crate) trait Container { + /// Create this array with a given capacity. 
+ fn with_capacity(capacity: usize) -> Self + where + Self: Sized; +} + +impl Container for MutableBinaryArray { + fn with_capacity(capacity: usize) -> Self { + MutableBinaryArray::with_capacity(capacity) + } +} + +impl Container for MutableBooleanArray { + fn with_capacity(capacity: usize) -> Self { + MutableBooleanArray::with_capacity(capacity) + } +} + +impl Container for MutableFixedSizeBinaryArray { + fn with_capacity(capacity: usize) -> Self { + MutableFixedSizeBinaryArray::with_capacity(capacity, 0) + } +} + +impl Container for MutableListArray { + fn with_capacity(capacity: usize) -> Self { + MutableListArray::with_capacity(capacity) + } +} + +impl Container for MutablePrimitiveArray { + fn with_capacity(capacity: usize) -> Self { + MutablePrimitiveArray::with_capacity(capacity) + } +} + +impl Container for MutableUtf8Array { + fn with_capacity(capacity: usize) -> Self { + MutableUtf8Array::with_capacity(capacity) + } +} + +fn fill_generic_array_from(f: fn(&mut M, &[B]), rows: &[B]) -> Box +where + M: Container, + A: From + Array, +{ + let mut array = M::with_capacity(rows.len()); + f(&mut array, rows); + Box::new(A::from(array)) +} + pub(crate) fn _deserialize<'a, A: Borrow>>( rows: &[A], data_type: DataType, ) -> Box { match &data_type { DataType::Null => Box::new(NullArray::new(data_type, rows.len())), - DataType::Boolean => Box::new(deserialize_boolean(rows)), - DataType::Int8 => Box::new(deserialize_int::(rows, data_type)), - DataType::Int16 => Box::new(deserialize_int::(rows, data_type)), + DataType::Boolean => { + fill_generic_array_from::<_, _, BooleanArray>(deserialize_boolean_into, rows) + } + DataType::Int8 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::Int16 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } DataType::Int32 | DataType::Date32 | DataType::Time32(_) | DataType::Interval(IntervalUnit::YearMonth) => { - Box::new(deserialize_int::(rows, 
data_type)) + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) } DataType::Interval(IntervalUnit::DayTime) => { unimplemented!("There is no natural representation of DayTime in JSON.") @@ -337,16 +552,34 @@ pub(crate) fn _deserialize<'a, A: Borrow>>( | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt8 => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt16 => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt32 => Box::new(deserialize_int::(rows, data_type)), - DataType::UInt64 => Box::new(deserialize_int::(rows, data_type)), + | DataType::Duration(_) => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt8 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt16 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt32 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } + DataType::UInt64 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_int_into, data_type, rows) + } DataType::Float16 => unreachable!(), - DataType::Float32 => Box::new(deserialize_float::(rows, data_type)), - DataType::Float64 => Box::new(deserialize_float::(rows, data_type)), - DataType::Utf8 => Box::new(deserialize_utf8::(rows)), - DataType::LargeUtf8 => Box::new(deserialize_utf8::(rows)), + DataType::Float32 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_float_into, data_type, rows) + } + DataType::Float64 => { + fill_array_from::<_, _, PrimitiveArray>(deserialize_float_into, data_type, rows) + } + DataType::Utf8 => { + fill_generic_array_from::<_, _, Utf8Array>(deserialize_utf8_into, rows) + } + DataType::LargeUtf8 => { + fill_generic_array_from::<_, _, Utf8Array>(deserialize_utf8_into, rows) + } DataType::List(_) => 
Box::new(deserialize_list::(rows, data_type)), DataType::LargeList(_) => Box::new(deserialize_list::(rows, data_type)), DataType::Binary => Box::new(deserialize_binary::(rows)), @@ -383,3 +616,89 @@ pub fn deserialize(json: &Value, data_type: DataType) -> Result, _ => Err(Error::nyi("read an Array from a non-Array JSON")), } } + +fn allocate_array(f: &Field) -> Box { + match f.data_type() { + DataType::Int8 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int16 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int32 => Box::new(MutablePrimitiveArray::::new()), + DataType::Int64 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt8 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt16 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt32 => Box::new(MutablePrimitiveArray::::new()), + DataType::UInt64 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float16 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float32 => Box::new(MutablePrimitiveArray::::new()), + DataType::Float64 => Box::new(MutablePrimitiveArray::::new()), + DataType::FixedSizeList(inner, size) => Box::new(MutableFixedSizeListArray::<_>::new_from( + allocate_array(inner), + f.data_type().clone(), + *size, + )), + DataType::List(inner) => match inner.data_type() { + DataType::List(_) => Box::new(MutableListArray::::new_from( + allocate_array(inner), + inner.data_type().clone(), + 0, + )), + _ => allocate_array(inner), + }, + _ => todo!(), + } +} + +/// Deserializes a `json` [`Value`] serialized in Pandas record format into +/// a [`Chunk`]. +/// +/// Uses the `Schema` provided, which can be inferred from arbitrary JSON with +/// [`infer_records_schema`]. +/// +/// This is CPU-bounded. 
+/// +/// # Errors +/// +/// This function errors iff either: +/// +/// * `json` is not a [`Value::Array`] +/// * `schema` contains any incompatible types: +/// * [`DataType::Struct`] +/// * [`DataType::Dictionary`] +/// * [`DataType::LargeList`] +pub fn deserialize_records(json: &Value, schema: &Schema) -> Result>, Error> { + let mut results = schema + .fields + .iter() + .map(|f| (&f.name, allocate_array(f))) + .collect::>(); + + match json { + Value::Array(rows) => { + for row in rows.iter() { + match row { + Value::Object(record) => { + for (key, value) in record.iter() { + let arr = results.get_mut(key).ok_or_else(|| { + Error::ExternalFormat(format!("unexpected key: '{}'", key)) + })?; + deserialize_into(arr, &[value]); + } + } + _ => { + return Err(Error::ExternalFormat( + "each row must be an Object".to_string(), + )) + } + } + } + } + _ => { + return Err(Error::ExternalFormat( + "outer type must be an Array".to_string(), + )) + } + } + + Ok(Chunk::new( + results.into_values().map(|mut ma| ma.as_box()).collect(), + )) +} diff --git a/src/io/json/read/infer_schema.rs b/src/io/json/read/infer_schema.rs index 38c147728a1..1bab0251ac9 100644 --- a/src/io/json/read/infer_schema.rs +++ b/src/io/json/read/infer_schema.rs @@ -5,7 +5,7 @@ use indexmap::set::IndexSet as HashSet; use json_deserializer::{Number, Value}; use crate::datatypes::*; -use crate::error::Result; +use crate::error::{Error, Result}; const ITEM_NAME: &str = "item"; @@ -21,6 +21,46 @@ pub fn infer(json: &Value) -> Result { }) } +/// Infers [`Schema`] from JSON [`Value`] in (pandas-compatible) records format.
+pub fn infer_records_schema(json: &Value) -> Result { + let outer_array = match json { + Value::Array(array) => Ok(array), + _ => Err(Error::ExternalFormat( + "outer type is not an array".to_string(), + )), + }?; + + let fields = match outer_array.iter().next() { + Some(Value::Object(record)) => record + .iter() + .map(|(name, json)| { + let data_type = infer(json)?; + + Ok(Field { + name: name.clone(), + data_type: DataType::List(Box::new(Field { + name: format!("{}-records", name), + data_type, + is_nullable: true, + metadata: Metadata::default(), + })), + is_nullable: true, + metadata: Metadata::default(), + }) + }) + .collect::>>(), + None => Ok(vec![]), + _ => Err(Error::ExternalFormat( + "first element in array is not a record".to_string(), + )), + }?; + + Ok(Schema { + fields, + metadata: Metadata::default(), + }) +} + fn filter_map_nulls(dt: DataType) -> Option { if dt == DataType::Null { None diff --git a/src/io/json/read/mod.rs b/src/io/json/read/mod.rs index ed1ad17f103..686390df2b3 100644 --- a/src/io/json/read/mod.rs +++ b/src/io/json/read/mod.rs @@ -3,8 +3,8 @@ mod deserialize; mod infer_schema; pub(crate) use deserialize::_deserialize; -pub use deserialize::deserialize; +pub use deserialize::{deserialize, deserialize_records}; pub(crate) use infer_schema::coerce_data_type; -pub use infer_schema::infer; +pub use infer_schema::{infer, infer_records_schema}; pub use json_deserializer; diff --git a/src/io/json/write/mod.rs b/src/io/json/write/mod.rs index 2278a636f2c..53cc3f20e68 100644 --- a/src/io/json/write/mod.rs +++ b/src/io/json/write/mod.rs @@ -5,8 +5,11 @@ mod utf8; pub use fallible_streaming_iterator::*; pub(crate) use serialize::new_serializer; use serialize::serialize; +use std::io::Write; -use crate::{array::Array, error::Error}; +use crate::{ + array::Array, chunk::Chunk, datatypes::Schema, error::Error, io::iterator::StreamingIterator, +}; /// [`FallibleStreamingIterator`] that serializes an [`Array`] to bytes of valid JSON /// # 
Implementation @@ -59,6 +62,79 @@ where } } +/// [`FallibleStreamingIterator`] that serializes a [`Chunk`] into bytes of JSON +/// in a (pandas-compatible) record-oriented format. +/// +/// # Implementation +/// Advancing this iterator is CPU-bounded. +pub struct RecordSerializer<'a> { + schema: Schema, + index: usize, + end: usize, + iterators: Vec + Send + Sync + 'a>>, + buffer: Vec, +} + +impl<'a> RecordSerializer<'a> { + /// Creates a new [`RecordSerializer`]. + pub fn new(schema: Schema, chunk: &'a Chunk, buffer: Vec) -> Self + where + A: AsRef, + { + let end = chunk.len(); + let iterators = chunk + .arrays() + .iter() + .map(|arr| new_serializer(arr.as_ref())) + .collect(); + + Self { + schema, + index: 0, + end, + iterators, + buffer, + } + } +} + +impl<'a> FallibleStreamingIterator for RecordSerializer<'a> { + type Item = [u8]; + + type Error = Error; + + fn advance(&mut self) -> Result<(), Error> { + self.buffer.clear(); + if self.index == self.end { + return Ok(()); + } + + let mut is_first_row = true; + write!(&mut self.buffer, "{{")?; + for (f, ref mut it) in self.schema.fields.iter().zip(self.iterators.iter_mut()) { + if !is_first_row { + write!(&mut self.buffer, ",")?; + } + write!(&mut self.buffer, "\"{}\":", f.name)?; + + self.buffer.extend_from_slice(it.next().unwrap()); + is_first_row = false; + } + write!(&mut self.buffer, "}}")?; + + self.index += 1; + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + if !self.buffer.is_empty() { + Some(&self.buffer) + } else { + None + } + } +} + /// Writes valid JSON from an iterator of (assumed JSON-encoded) bytes to `writer` pub fn write(writer: &mut W, mut blocks: I) -> Result<(), Error> where diff --git a/src/io/json/write/serialize.rs b/src/io/json/write/serialize.rs index 1e2c8445eb2..ebdb2f3b81b 100644 --- a/src/io/json/write/serialize.rs +++ b/src/io/json/write/serialize.rs @@ -165,6 +165,34 @@ fn list_serializer<'a, O: Offset>( )) } +fn fixed_size_list_serializer<'a>( + array: &'a 
FixedSizeListArray, +) -> Box + 'a + Send + Sync> { + let mut serializer = new_serializer(array.values().as_ref()); + + Box::new(BufStreamingIterator::new( + ZipValidity::new(0..array.len(), array.validity().map(|x| x.iter())), + move |ix, buf| { + if ix.is_some() { + let length = array.size(); + buf.push(b'['); + let mut is_first_row = true; + for _ in 0..length { + if !is_first_row { + buf.push(b','); + } + is_first_row = false; + buf.extend(serializer.next().unwrap()); + } + buf.push(b']'); + } else { + buf.extend(b"null"); + } + }, + vec![], + )) +} + fn date_serializer<'a, T, F>( array: &'a PrimitiveArray, convert: F, @@ -226,6 +254,9 @@ pub(crate) fn new_serializer<'a>( DataType::Utf8 => utf8_serializer::(array.as_any().downcast_ref().unwrap()), DataType::LargeUtf8 => utf8_serializer::(array.as_any().downcast_ref().unwrap()), DataType::Struct(_) => struct_serializer(array.as_any().downcast_ref().unwrap()), + DataType::FixedSizeList(_, _) => { + fixed_size_list_serializer(array.as_any().downcast_ref().unwrap()) + } DataType::List(_) => list_serializer::(array.as_any().downcast_ref().unwrap()), DataType::LargeList(_) => list_serializer::(array.as_any().downcast_ref().unwrap()), DataType::Date32 => date_serializer(array.as_any().downcast_ref().unwrap(), date32_to_date), diff --git a/tests/it/io/json/mod.rs b/tests/it/io/json/mod.rs index e84971431ea..59a68f8dd86 100644 --- a/tests/it/io/json/mod.rs +++ b/tests/it/io/json/mod.rs @@ -2,6 +2,8 @@ mod read; mod write; use arrow2::array::*; +use arrow2::chunk::Chunk; +use arrow2::datatypes::Schema; use arrow2::error::Result; use arrow2::io::json::write as json_write; @@ -12,3 +14,11 @@ fn write_batch(array: Box) -> Result> { json_write::write(&mut buf, &mut serializer)?; Ok(buf) } + +fn write_record_batch>(schema: Schema, chunk: Chunk) -> Result> { + let mut serializer = json_write::RecordSerializer::new(schema, &chunk, vec![]); + + let mut buf = vec![]; + json_write::write(&mut buf, &mut serializer)?; + Ok(buf) +} 
diff --git a/tests/it/io/json/read.rs b/tests/it/io/json/read.rs index 11bd93cd264..f2fb8570b8c 100644 --- a/tests/it/io/json/read.rs +++ b/tests/it/io/json/read.rs @@ -33,3 +33,133 @@ fn read_json() -> Result<()> { Ok(()) } + +#[test] +fn read_json_records() -> Result<()> { + let data = br#"[ + { + "a": [ + [1.1, 2, 3], + [2, 3], + [4, 5, 6] + ], + "b": [1, 2, 3] + }, + { + "a": [ + [3, 2, 1], + [3, 2], + [6, 5, 4] + ] + }, + { + "b": [7, 8, 9] + } + ]"#; + + let a_iter = vec![ + vec![ + Some(vec![Some(1.1), Some(2.), Some(3.)]), + Some(vec![Some(2.), Some(3.)]), + Some(vec![Some(4.), Some(5.), Some(6.)]), + ], + vec![ + Some(vec![Some(3.), Some(2.), Some(1.)]), + Some(vec![Some(3.), Some(2.)]), + Some(vec![Some(6.), Some(5.), Some(4.)]), + ], + ]; + + let a_iter = a_iter.into_iter().map(Some); + let a_inner = MutableListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "item", + true, + ); + let mut a_outer = + MutableListArray::>>::new_with_field( + a_inner, "item", true, + ); + a_outer.try_extend(a_iter).unwrap(); + let a_expected: ListArray = a_outer.into(); + + let b_iter = vec![ + vec![Some(1), Some(2), Some(3)], + vec![Some(7), Some(8), Some(9)], + ]; + let b_iter = b_iter.into_iter().map(Some); + let mut b = MutableListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "item", + true, + ); + b.try_extend(b_iter).unwrap(); + let b_expected: ListArray = b.into(); + + let json = json_deserializer::parse(data)?; + + let schema = read::infer_records_schema(&json)?; + let actual = read::deserialize_records(&json, &schema)?; + + for (f, arr) in schema.fields.iter().zip(actual.arrays().iter()) { + let (expected, actual) = if f.name == "a" { + (&a_expected, arr.as_ref()) + } else if f.name == "b" { + (&b_expected, arr.as_ref()) + } else { + panic!("unexpected field found: {}", f.name); + }; + + assert_eq!(expected.to_boxed().as_ref(), actual); + } + + Ok(()) +} + +#[test] +fn read_json_fixed_size_records() -> Result<()> { + let data = br#"[ + 
{ + "a": [1, 2.2, 3, 4] + }, + { + "a": [5, 6, 7, 8] + }, + { + "a": [7, 8, 9] + } + ]"#; + + let a_iter = vec![ + Some(vec![Some(1.), Some(2.2), Some(3.), Some(4.)]), + Some(vec![Some(5.), Some(6.), Some(7.), Some(8.)]), + None, + ]; + + let a_iter = a_iter.into_iter(); + let mut a = MutableFixedSizeListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "inner", + true, + 4, + ); + a.try_extend(a_iter).unwrap(); + let a_expected: FixedSizeListArray = a.into(); + + let json = json_deserializer::parse(data)?; + + let schema: Schema = vec![Field::new("a", a_expected.data_type().clone(), true)].into(); + let actual = read::deserialize_records(&json, &schema)?; + + for (f, arr) in schema.fields.iter().zip(actual.arrays().iter()) { + let (expected, actual) = if f.name == "a" { + (&a_expected, arr.as_ref()) + } else { + panic!("unexpected field found: {}", f.name); + }; + + assert_eq!(expected.to_boxed().as_ref(), actual); + } + + Ok(()) +} diff --git a/tests/it/io/json/write.rs b/tests/it/io/json/write.rs index f98ef1fc71f..8c7830f8ef7 100644 --- a/tests/it/io/json/write.rs +++ b/tests/it/io/json/write.rs @@ -2,7 +2,7 @@ use arrow2::{ array::*, bitmap::Bitmap, buffer::Buffer, - datatypes::{DataType, Field, TimeUnit}, + datatypes::{DataType, Field, Metadata, Schema, TimeUnit}, error::Result, }; @@ -211,6 +211,80 @@ fn nested_list() -> Result<()> { test!(array, expected) } +#[test] +fn nested_list_records() -> Result<()> { + let iter = vec![ + vec![Some(vec![Some(1), Some(2)]), Some(vec![Some(3)])], + vec![], + vec![Some(vec![Some(4), Some(5), Some(6)])], + ]; + + let iter = iter.into_iter().map(Some); + + let inner = MutableListArray::>::new_with_field( + MutablePrimitiveArray::::new(), + "b", + false, + ); + let mut c1 = + MutableListArray::>>::new_with_field( + inner, "c1", false, + ); + c1.try_extend(iter).unwrap(); + let c1: ListArray = c1.into(); + + let c2 = Utf8Array::::from(&vec![Some("foo"), Some("bar"), None]); + + let schema: Schema = vec![ + 
Field::new("c1", c1.data_type().clone(), true), + Field::new("c2", c2.data_type().clone(), true), + ] + .into(); + + let arrays: Vec> = vec![Box::new(c1), Box::new(c2)]; + let chunk = Chunk::new(arrays); + + let expected = + r#"[{"c1":[[1,2],[3]],"c2":"foo"},{"c1":[],"c2":"bar"},{"c1":[[4,5,6]],"c2":null}]"#; + + let buf = write_record_batch(schema, chunk)?; + assert_eq!(String::from_utf8(buf).unwrap(), expected); + Ok(()) +} + +#[test] +fn fixed_size_list_records() -> Result<()> { + let iter = vec![ + vec![Some(1), Some(2), Some(3)], + vec![Some(4), Some(5), Some(6)], + ]; + + let iter = iter.into_iter().map(Some); + + let mut inner = MutableFixedSizeListArray::>::new_with_field( + MutablePrimitiveArray::new(), + "vs", + false, + 3, + ); + inner.try_extend(iter).unwrap(); + let inner: FixedSizeListArray = inner.into(); + + let schema = Schema { + fields: vec![Field::new("vs", inner.data_type().clone(), true)], + metadata: Metadata::default(), + }; + + let arrays: Vec> = vec![Box::new(inner)]; + let chunk = Chunk::new(arrays); + + let expected = r#"[{"vs":[1,2,3]},{"vs":[4,5,6]}]"#; + + let buf = write_record_batch(schema, chunk)?; + assert_eq!(String::from_utf8(buf).unwrap(), expected); + Ok(()) +} + #[test] fn list_of_struct() -> Result<()> { let inner = vec![Field::new("c121", DataType::Utf8, false)];