diff --git a/README.md b/README.md
index c739732b364..9292eb07c5b 100644
--- a/README.md
+++ b/README.md
@@ -65,6 +65,7 @@ venv/bin/python parquet_integration/write_parquet.py
* `MutableArray` API to work in-memory in-place.
* faster IPC reader (different design that avoids an extra copy of all data)
* IPC supports 2.0 (compression)
+* Extension types supported
* All implemented arrow types pass FFI integration tests against pyarrow / C++
* All implemented arrow types pass IPC integration tests against other implementations

@@ -83,7 +84,7 @@ venv/bin/python parquet_integration/write_parquet.py
## Features in the original not available in this crate

* Parquet read and write of struct and nested lists.
-* Map types
+* Map type

## Features in this crate not in pyarrow

diff --git a/examples/extension.rs b/examples/extension.rs
new file mode 100644
index 00000000000..25210214aa9
--- /dev/null
+++ b/examples/extension.rs
@@ -0,0 +1,52 @@
+use std::io::{Cursor, Seek, Write};
+use std::sync::Arc;
+
+use arrow2::array::*;
+use arrow2::datatypes::*;
+use arrow2::error::Result;
+use arrow2::io::ipc::read;
+use arrow2::io::ipc::write;
+use arrow2::record_batch::RecordBatch;
+
+fn main() -> Result<()> {
+    // declare an extension.
+    let extension_type =
+        DataType::Extension("date16".to_string(), Box::new(DataType::UInt16), None);
+
+    // initialize an array with it.
+    let array = UInt16Array::from_slice([1, 2]).to(extension_type.clone());
+
+    // from here on, it works as usual
+    let mut buffer = Cursor::new(vec![]);
+
+    // write to IPC
+    write_ipc(&mut buffer, array)?;
+
+    // read it back
+    let batch = read_ipc(&buffer.into_inner())?;
+
+    // and verify that the datatype is preserved.
+    let array = &batch.columns()[0];
+    assert_eq!(array.data_type(), &extension_type);
+
+    // see https://arrow.apache.org/docs/format/Columnar.html#extension-types
+    // for how other implementations consume extension types.
+    Ok(())
+}
+
+fn write_ipc<W: Write + Seek>(writer: &mut W, array: impl Array + 'static) -> Result<()> {
+    let schema = Schema::new(vec![Field::new("a", array.data_type().clone(), false)]);
+
+    let mut writer = write::FileWriter::try_new(writer, &schema)?;
+
+    let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array)])?;
+
+    writer.write(&batch)
+}
+
+fn read_ipc(reader: &[u8]) -> Result<RecordBatch> {
+    let mut reader = Cursor::new(reader);
+    let metadata = read::read_file_metadata(&mut reader)?;
+    let mut reader = read::FileReader::new(&mut reader, metadata, None);
+    reader.next().unwrap()
+}
diff --git a/guide/src/SUMMARY.md b/guide/src/SUMMARY.md
index 6ba760bec79..8158f21788f 100644
--- a/guide/src/SUMMARY.md
+++ b/guide/src/SUMMARY.md
@@ -7,6 +7,7 @@
- [Compute](./compute.md)
- [Metadata](./metadata.md)
- [Foreign interfaces](./ffi.md)
+- [Extension](./extension.md)
- [IO](./io/README.md)
  - [Read CSV](./io/csv_reader.md)
  - [Write CSV](./io/csv_write.md)
diff --git a/guide/src/extension.md b/guide/src/extension.md
new file mode 100644
index 00000000000..fe85860bed3
--- /dev/null
+++ b/guide/src/extension.md
@@ -0,0 +1,8 @@
+# Extension types
+
+This crate supports Arrow's ["extension types"](https://arrow.apache.org/docs/format/Columnar.html#extension-types), used to declare, use, and share custom logical types.
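+
+An extension type wraps an existing Arrow type (its "storage type") under a custom name, optionally carrying serialized metadata. Over IPC, it travels in the field's custom metadata under the `ARROW:extension:name` and `ARROW:extension:metadata` keys.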
+
+The following example shows how to declare one:
+
+```rust
+{{#include ../../../examples/extension.rs}}
+```
diff --git a/src/array/dictionary/mod.rs b/src/array/dictionary/mod.rs
index acab539d6cb..f9d5668d969 100644
--- a/src/array/dictionary/mod.rs
+++ b/src/array/dictionary/mod.rs
@@ -43,25 +43,19 @@ pub struct DictionaryArray<K: DictionaryKey> {
impl<K: DictionaryKey> DictionaryArray<K> {
    /// Returns a new empty [`DictionaryArray`].
    pub fn new_empty(data_type: DataType) -> Self {
-        if let DataType::Dictionary(_, values) = data_type {
-            let values = new_empty_array(values.as_ref().clone()).into();
-            Self::from_data(PrimitiveArray::<K>::new_empty(K::DATA_TYPE), values)
-        } else {
-            panic!("DictionaryArray must be initialized with DataType::Dictionary");
-        }
+        let values = Self::get_child(&data_type);
+        let values = new_empty_array(values.clone()).into();
+        Self::from_data(PrimitiveArray::<K>::new_empty(K::DATA_TYPE), values)
    }

    /// Returns a [`DictionaryArray`] whose elements are all null
    #[inline]
    pub fn new_null(data_type: DataType, length: usize) -> Self {
-        if let DataType::Dictionary(_, values) = data_type {
-            Self::from_data(
-                PrimitiveArray::<K>::new_null(K::DATA_TYPE, length),
-                new_empty_array(values.as_ref().clone()).into(),
-            )
-        } else {
-            panic!("DictionaryArray must be initialized with DataType::Dictionary");
-        }
+        let values = Self::get_child(&data_type);
+        Self::from_data(
+            PrimitiveArray::<K>::new_null(K::DATA_TYPE, length),
+            new_empty_array(values.clone()).into(),
+        )
    }

    /// The canonical method to create a new [`DictionaryArray`].
@@ -112,10 +106,10 @@ impl<K: DictionaryKey> DictionaryArray<K> {

impl<K: DictionaryKey> DictionaryArray<K> {
    pub(crate) fn get_child(data_type: &DataType) -> &DataType {
-        if let DataType::Dictionary(_, values) = data_type {
-            values.as_ref()
-        } else {
-            panic!("Wrong DataType")
+        match data_type {
+            DataType::Dictionary(_, values) => values.as_ref(),
+            DataType::Extension(_, inner, _) => Self::get_child(inner),
+            _ => panic!("DictionaryArray must be initialized with DataType::Dictionary"),
        }
    }
}
diff --git a/src/array/display.rs b/src/array/display.rs
index 7a216993b5a..ecf6036e84f 100644
--- a/src/array/display.rs
+++ b/src/array/display.rs
@@ -234,6 +234,7 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box<dyn Fn(usize) -> String + 'a> {
                displays[field](index)
            })
        }
+        Extension(_, _, _) => todo!(),
    }
}
diff --git a/src/array/fixed_size_binary/mod.rs b/src/array/fixed_size_binary/mod.rs
index 2a91db09ce6..401b24e8ec3 100644
--- a/src/array/fixed_size_binary/mod.rs
+++ b/src/array/fixed_size_binary/mod.rs
@@ -99,6 +99,7 @@ impl FixedSizeBinaryArray {
    pub(crate) fn get_size(data_type: &DataType) -> &i32 {
        match data_type {
            DataType::FixedSizeBinary(size) => size,
+            DataType::Extension(_, child, _) => Self::get_size(child),
            _ => panic!("Wrong DataType"),
        }
    }
diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs
index 1376a56386b..cfb020d9072 100644
--- a/src/array/fixed_size_binary/mutable.rs
+++ b/src/array/fixed_size_binary/mutable.rs
@@ -35,10 +35,11 @@ impl From<MutableFixedSizeBinaryArray> for FixedSizeBinaryArray {
impl MutableFixedSizeBinaryArray {
    /// Canonical method to create a new [`MutableFixedSizeBinaryArray`].
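    /// `data_type` must be `DataType::FixedSizeBinary` (or an extension wrapping it);
    /// the size of each value is derived from it, and `values.len()` must be a multiple of that size.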
    pub fn from_data(
-        size: usize,
+        data_type: DataType,
        values: MutableBuffer<u8>,
        validity: Option<MutableBitmap>,
    ) -> Self {
+        let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
        assert_eq!(
            values.len() % size,
            0,
@@ -52,7 +53,7 @@ impl MutableFixedSizeBinaryArray {
            );
        }
        Self {
-            data_type: DataType::FixedSizeBinary(size as i32),
+            data_type,
            size,
            values,
            validity,
@@ -67,7 +68,7 @@ impl MutableFixedSizeBinaryArray {
    /// Creates a new [`MutableFixedSizeBinaryArray`] with capacity for `capacity` entries.
    pub fn with_capacity(size: usize, capacity: usize) -> Self {
        Self::from_data(
-            size,
+            DataType::FixedSizeBinary(size as i32),
            MutableBuffer::<u8>::with_capacity(capacity * size),
            None,
        )
diff --git a/src/array/fixed_size_list/mod.rs b/src/array/fixed_size_list/mod.rs
index 6b5e1e4e7b1..6c8b2f34834 100644
--- a/src/array/fixed_size_list/mod.rs
+++ b/src/array/fixed_size_list/mod.rs
@@ -96,12 +96,12 @@ impl FixedSizeListArray {
    pub(crate) fn get_child_and_size(data_type: &DataType) -> (&Field, &i32) {
        match data_type {
            DataType::FixedSizeList(child, size) => (child.as_ref(), size),
+            DataType::Extension(_, child, _) => Self::get_child_and_size(child),
            _ => panic!("Wrong DataType"),
        }
    }

    /// Returns a [`DataType`] consistent with this Array.
-    #[inline]
    pub fn default_datatype(data_type: DataType, size: usize) -> DataType {
        let field = Box::new(Field::new("item", data_type, true));
        DataType::FixedSizeList(field, size as i32)
diff --git a/src/array/list/mod.rs b/src/array/list/mod.rs
index 95808e2c191..670822b2aa6 100644
--- a/src/array/list/mod.rs
+++ b/src/array/list/mod.rs
@@ -120,7 +120,6 @@ impl<O: Offset> ListArray<O> {
}

impl<O: Offset> ListArray<O> {
-    #[inline]
    pub fn default_datatype(data_type: DataType) -> DataType {
        let field = Box::new(Field::new("item", data_type, true));
        if O::is_large() {
@@ -130,22 +129,22 @@ impl<O: Offset> ListArray<O> {
        }
    }

-    #[inline]
    pub fn get_child_field(data_type: &DataType) -> &Field {
        if O::is_large() {
            match data_type {
                DataType::LargeList(child) => child.as_ref(),
+                DataType::Extension(_, child, _) => Self::get_child_field(child),
                _ => panic!("Wrong DataType"),
            }
        } else {
            match data_type {
                DataType::List(child) => child.as_ref(),
+                DataType::Extension(_, child, _) => Self::get_child_field(child),
                _ => panic!("Wrong DataType"),
            }
        }
    }

-    #[inline]
    pub fn get_child_type(data_type: &DataType) -> &DataType {
        Self::get_child_field(data_type).data_type()
    }
diff --git a/src/array/struct_.rs b/src/array/struct_.rs
index a18ee3bdd56..d773b51e1d9 100644
--- a/src/array/struct_.rs
+++ b/src/array/struct_.rs
@@ -70,20 +70,17 @@ impl StructArray {
        values: Vec<Arc<dyn Array>>,
        validity: Option<Bitmap>,
    ) -> Self {
-        if let DataType::Struct(fields) = &data_type {
-            assert!(!fields.is_empty());
-            assert_eq!(fields.len(), values.len());
-            assert!(values.iter().all(|x| x.len() == values[0].len()));
-            if let Some(ref validity) = validity {
-                assert_eq!(values[0].len(), validity.len());
-            }
-            Self {
-                data_type,
-                values,
-                validity,
-            }
-        } else {
-            panic!("StructArray must be initialized with DataType::Struct");
+        let fields = Self::get_fields(&data_type);
+        assert!(!fields.is_empty());
+        assert_eq!(fields.len(), values.len());
+        assert!(values.iter().all(|x| x.len() == values[0].len()));
+        if let Some(ref validity) = validity {
+            assert_eq!(values[0].len(), validity.len());
+        }
+        Self {
+            data_type,
+            values,
+            validity,
        }
    }

@@ -134,10 +131,10 @@ impl StructArray {

impl StructArray {
    /// Returns the fields of `DataType::Struct`.
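    /// Recurses through `DataType::Extension` so that an extension type backed by a struct
    /// resolves to the fields of its storage type.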
    pub fn get_fields(data_type: &DataType) -> &[Field] {
-        if let DataType::Struct(fields) = data_type {
-            fields
-        } else {
-            panic!("Wrong datatype passed to Struct.")
+        match data_type {
+            DataType::Struct(fields) => fields,
+            DataType::Extension(_, inner, _) => Self::get_fields(inner),
+            _ => panic!("Wrong datatype passed to Struct."),
        }
    }
}
diff --git a/src/array/union/mod.rs b/src/array/union/mod.rs
index f688656e2f3..f00dcdff945 100644
--- a/src/array/union/mod.rs
+++ b/src/array/union/mod.rs
@@ -89,33 +89,30 @@ impl UnionArray {
        fields: Vec<Arc<dyn Array>>,
        offsets: Option<Buffer<i32>>,
    ) -> Self {
-        let fields_hash = if let DataType::Union(f, ids, is_sparse) = &data_type {
-            if f.len() != fields.len() {
-                panic!(
-                    "The number of `fields` must equal the number of fields in the Union DataType"
-                )
-            };
-            let same_data_types = f
-                .iter()
-                .zip(fields.iter())
-                .all(|(f, array)| f.data_type() == array.data_type());
-            if !same_data_types {
-                panic!("All fields' datatype in the union must equal the datatypes on the fields.")
-            }
-            if offsets.is_none() != *is_sparse {
-                panic!("Sparsness flag must equal to noness of offsets in UnionArray")
-            }
-            ids.as_ref().map(|ids| {
-                ids.iter()
-                    .map(|x| *x as i8)
-                    .enumerate()
-                    .zip(fields.iter().cloned())
-                    .map(|((i, type_), field)| (type_, (i, field)))
-                    .collect()
-            })
-        } else {
-            panic!("Union struct must be created with the corresponding Union DataType")
-        };
+        let (f, ids, is_sparse) = Self::get_all(&data_type);
+
+        if f.len() != fields.len() {
+            panic!("The number of `fields` must equal the number of fields in the Union DataType")
+        };
+        let same_data_types = f
+            .iter()
+            .zip(fields.iter())
+            .all(|(f, array)| f.data_type() == array.data_type());
+        if !same_data_types {
+            panic!("All fields' datatypes in the union must equal the datatypes of the fields.")
+        }
+        if offsets.is_none() != is_sparse {
+            panic!("The sparseness flag must match the absence of offsets in UnionArray")
+        }
+        let fields_hash = ids.as_ref().map(|ids| {
+            ids.iter()
+                .map(|x| *x as i8)
+                .enumerate()
+                .zip(fields.iter().cloned())
+                .map(|((i, type_), field)| (type_, (i, field)))
+                .collect()
+        });
+
        // not validated:
        // * `offsets` is valid
        // * max id < fields.len()
@@ -218,20 +215,22 @@ impl Array for UnionArray {
}

impl UnionArray {
-    pub fn get_fields(data_type: &DataType) -> &[Field] {
-        if let DataType::Union(fields, _, _) = data_type {
-            fields
-        } else {
-            panic!("Wrong datatype passed to UnionArray.")
+    fn get_all(data_type: &DataType) -> (&[Field], Option<&[i32]>, bool) {
+        match data_type {
+            DataType::Union(fields, ids, is_sparse) => {
+                (fields, ids.as_ref().map(|x| x.as_ref()), *is_sparse)
+            }
+            DataType::Extension(_, inner, _) => Self::get_all(inner),
+            _ => panic!("Wrong datatype passed to UnionArray."),
        }
    }

+    pub fn get_fields(data_type: &DataType) -> &[Field] {
+        Self::get_all(data_type).0
+    }
+
    pub fn is_sparse(data_type: &DataType) -> bool {
-        if let DataType::Union(_, _, is_sparse) = data_type {
-            *is_sparse
-        } else {
-            panic!("Wrong datatype passed to UnionArray.")
-        }
+        Self::get_all(data_type).2
    }
}
diff --git a/src/datatypes/field.rs b/src/datatypes/field.rs
index 3927a878f21..d5a66a52507 100644
--- a/src/datatypes/field.rs
+++ b/src/datatypes/field.rs
@@ -23,7 +23,7 @@ use super::DataType;

/// A logical [`DataType`] and its associated metadata per
/// [Arrow specification](https://arrow.apache.org/docs/cpp/api/datatype.html)
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Field {
    /// Its name
    pub name: String,
@@ -255,6 +255,7 @@ impl Field {
            | DataType::FixedSizeBinary(_)
            | DataType::Utf8
            | DataType::LargeUtf8
+            | DataType::Extension(_, _, _)
            | DataType::Decimal(_, _) => {
                if self.data_type != from.data_type {
                    return Err(ArrowError::Schema(
diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs
index fe39d1e03f2..9b406e4a208 100644
--- a/src/datatypes/mod.rs
+++ b/src/datatypes/mod.rs
@@ -22,7 +22,7 @@ pub use schema::Schema;
/// Nested types can themselves be nested within other arrays.
/// For more information on these types please see
/// [the physical memory layout of Apache Arrow](https://arrow.apache.org/docs/format/Columnar.html#physical-memory-layout).
-#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum DataType {
    /// Null type, representing an array without values or validity, only a length.
    Null,
@@ -116,6 +116,8 @@ pub enum DataType {
    /// scale is the number of decimal places.
    /// The number 999.99 has a precision of 5 and scale of 2.
    Decimal(usize, usize),
+    /// Extension type: a name, the storage type, and optional metadata.
+    Extension(String, Box<DataType>, Option<String>),
}

impl std::fmt::Display for DataType {
@@ -206,6 +208,7 @@ impl DataType {
            Struct(_) => PhysicalType::Struct,
            Union(_, _, _) => PhysicalType::Union,
            Dictionary(key, _) => PhysicalType::Dictionary(to_dictionary_index_type(key.as_ref())),
+            Extension(_, key, _) => key.to_physical_type(),
        }
    }
}
diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs
index 75a06650004..083e006beed 100644
--- a/src/ffi/schema.rs
+++ b/src/ffi/schema.rs
@@ -356,6 +356,7 @@ fn to_format(data_type: &DataType) -> String {
            r
        }
        DataType::Dictionary(index, _) => to_format(index.as_ref()),
+        DataType::Extension(_, inner, _) => to_format(inner.as_ref()),
    }
}
diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs
index fc8082fa7b9..4f218189966 100644
--- a/src/io/ipc/convert.rs
+++ b/src/io/ipc/convert.rs
@@ -32,6 +32,9 @@ use std::collections::{BTreeMap, HashMap};

use DataType::*;

+type Metadata = Option<BTreeMap<String, String>>;
+type Extension = Option<(String, Option<String>)>;
+
pub fn schema_to_fb_offset<'a>(
    fbb: &mut FlatBufferBuilder<'a>,
    schema: &Schema,
@@ -67,36 +70,54 @@ pub fn schema_to_fb_offset<'a>(
    builder.finish()
}

+fn read_metadata(field: &ipc::Field) -> Metadata {
+    if let Some(list) = field.custom_metadata() {
+        let mut metadata_map = BTreeMap::default();
+        for kv in list {
+            if let (Some(k), Some(v)) = (kv.key(), kv.value()) {
+                metadata_map.insert(k.to_string(), v.to_string());
+            }
+        }
+        Some(metadata_map)
+    } else {
+        None
+    }
+}
+
+pub(crate) fn get_extension(metadata: &Metadata) -> Extension {
+    if let Some(metadata) = metadata {
+        if let Some(name) = metadata.get("ARROW:extension:name") {
+            let metadata = metadata.get("ARROW:extension:metadata").cloned();
+            Some((name.clone(), metadata))
+        } else {
+            None
+        }
+    } else {
+        None
+    }
+}
+
/// Convert an IPC Field to Arrow Field
impl<'a> From<ipc::Field<'a>> for Field {
    fn from(field: ipc::Field) -> Field {
+        let metadata = read_metadata(&field);
+
+        let extension = get_extension(&metadata);
+
+        let data_type = get_data_type(field, extension, true);
+
        let mut arrow_field = if let Some(dictionary) = field.dictionary() {
            Field::new_dict(
                field.name().unwrap(),
-                get_data_type(field, true),
+                data_type,
                field.nullable(),
                dictionary.id(),
                dictionary.isOrdered(),
            )
        } else {
-            Field::new(
-                field.name().unwrap(),
-                get_data_type(field, true),
-                field.nullable(),
-            )
+            Field::new(field.name().unwrap(), data_type, field.nullable())
        };

-        let mut metadata = None;
-        if let Some(list) = field.custom_metadata() {
-
let mut metadata_map = BTreeMap::default(); - for kv in list { - if let (Some(k), Some(v)) = (kv.key(), kv.value()) { - metadata_map.insert(k.to_string(), v.to_string()); - } - } - metadata = Some(metadata_map); - } - arrow_field.set_metadata(metadata); arrow_field } @@ -132,7 +153,7 @@ pub fn fb_to_schema(fb: ipc::Schema) -> (Schema, bool) { } /// Get the Arrow data type from the flatbuffer Field table -pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataType { +fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: bool) -> DataType { if let Some(dictionary) = field.dictionary() { if may_be_dictionary { let int = dictionary.indexType().unwrap(); @@ -149,11 +170,17 @@ pub(crate) fn get_data_type(field: ipc::Field, may_be_dictionary: bool) -> DataT }; return DataType::Dictionary( Box::new(index_type), - Box::new(get_data_type(field, false)), + Box::new(get_data_type(field, extension, false)), ); } } + if let Some(extension) = extension { + let (name, metadata) = extension; + let data_type = get_data_type(field, None, false); + return DataType::Extension(name, Box::new(data_type), metadata); + } + match field.type_type() { ipc::Type::Null => DataType::Null, ipc::Type::Bool => DataType::Boolean, @@ -303,27 +330,58 @@ pub(crate) struct FbFieldType<'b> { pub(crate) children: Option>>>>, } +fn write_metadata<'a>( + fbb: &mut FlatBufferBuilder<'a>, + metadata: &BTreeMap, + kv_vec: &mut Vec>>, +) { + for (k, v) in metadata { + if k != "ARROW:extension:name" && k != "ARROW:extension:metadata" { + let kv_args = ipc::KeyValueArgs { + key: Some(fbb.create_string(k.as_str())), + value: Some(fbb.create_string(v.as_str())), + }; + kv_vec.push(ipc::KeyValue::create(fbb, &kv_args)); + } + } +} + /// Create an IPC Field from an Arrow Field pub(crate) fn build_field<'a>( fbb: &mut FlatBufferBuilder<'a>, field: &Field, ) -> WIPOffset> { - // Optional custom metadata. - let mut fb_metadata = None; + // custom metadata. + let mut kv_vec = vec![]; + if let DataType::Extension(name, _, metadata) = field.data_type() { + // append extension information. 
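+        // Per the Arrow spec, an extension type is serialized through the field's
+        // custom metadata: the key "ARROW:extension:name" (required) and the key
+        // "ARROW:extension:metadata" (optional), both appended below.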
+ + // metadata + if let Some(metadata) = metadata { + let kv_args = ipc::KeyValueArgs { + key: Some(fbb.create_string("ARROW:extension:metadata")), + value: Some(fbb.create_string(metadata.as_str())), + }; + kv_vec.push(ipc::KeyValue::create(fbb, &kv_args)); + } + + // name + let kv_args = ipc::KeyValueArgs { + key: Some(fbb.create_string("ARROW:extension:name")), + value: Some(fbb.create_string(name.as_str())), + }; + kv_vec.push(ipc::KeyValue::create(fbb, &kv_args)); + } if let Some(metadata) = field.metadata() { if !metadata.is_empty() { - let mut kv_vec = vec![]; - for (k, v) in metadata { - let kv_args = ipc::KeyValueArgs { - key: Some(fbb.create_string(k.as_str())), - value: Some(fbb.create_string(v.as_str())), - }; - let kv_offset = ipc::KeyValue::create(fbb, &kv_args); - kv_vec.push(kv_offset); - } - fb_metadata = Some(fbb.create_vector(&kv_vec)); + write_metadata(fbb, metadata, &mut kv_vec); } }; + let fb_metadata = if !kv_vec.is_empty() { + Some(fbb.create_vector(&kv_vec)) + } else { + None + }; let fb_field_name = fbb.create_string(field.name().as_str()); let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); @@ -386,6 +444,7 @@ fn type_to_field_type(data_type: &DataType) -> ipc::Type { Union(_, _, _) => ipc::Type::Union, Struct(_) => ipc::Type::Struct_, Dictionary(_, v) => type_to_field_type(v), + Extension(_, v, _) => type_to_field_type(v), } } @@ -625,6 +684,7 @@ pub(crate) fn get_fb_field_type<'a>( // type in the DictionaryEncoding metadata in the parent field get_fb_field_type(value_type, is_nullable, fbb) } + Extension(_, value_type, _) => get_fb_field_type(value_type, is_nullable, fbb), Decimal(precision, scale) => { let mut builder = ipc::DecimalBuilder::new(fbb); builder.add_precision(*precision as i32); diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index f6b347534c9..49e22fc18e0 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -13,6 +13,7 @@ mod compression; mod convert; pub use convert::fb_to_schema; +pub(crate) use convert::get_extension; pub use gen::Message::root_as_message; pub mod read; pub mod write; diff --git a/src/io/json_integration/read.rs b/src/io/json_integration/read.rs index 7810967ffc1..cba5271985c 100644 --- a/src/io/json_integration/read.rs +++ b/src/io/json_integration/read.rs @@ -24,7 +24,7 @@ use crate::{ array::*, bitmap::{Bitmap, MutableBitmap}, buffer::Buffer, - datatypes::{DataType, Field, IntervalUnit, Schema}, + datatypes::{DataType, PhysicalType, PrimitiveType, Schema}, error::{ArrowError, Result}, record_batch::RecordBatch, types::{days_ms, NativeType}, @@ -181,7 +181,12 @@ fn to_list( let child_field = ListArray::::get_child_field(&data_type); let children = &json_col.children.as_ref().unwrap()[0]; - let values = to_array(child_field, children, dictionaries)?; + let values = to_array( + child_field.data_type().clone(), + child_field.dict_id(), + children, + dictionaries, + )?; let offsets = to_offsets::(json_col.offset.as_ref()); Ok(Arc::new(ListArray::::from_data( data_type, offsets, values, validity, @@ -189,13 +194,11 @@ fn to_list( } fn to_dictionary( - field: &Field, + data_type: DataType, + dict_id: i64, json_col: &ArrowJsonColumn, dictionaries: &HashMap, ) -> Result> { - let dict_id = field - .dict_id() - .ok_or_else(|| ArrowError::Ipc(format!("Unable to find dict_id for field {:?}", field)))?; // find dictionary let dictionary = dictionaries .get(&dict_id) @@ -204,27 +207,28 @@ fn to_dictionary( let keys = to_primitive(json_col, K::DATA_TYPE); // todo: make DataType::Dictionary hold a Field 
so that it can hold dictionary_id - let data_type = DictionaryArray::::get_child(field.data_type()); - // note: not enough info on nullability of dictionary - let value_field = Field::new("value", data_type.clone(), true); - let values = to_array(&value_field, &dictionary.data.columns[0], &HashMap::new())?; + let inner_data_type = DictionaryArray::::get_child(&data_type); + let values = to_array( + inner_data_type.clone(), + None, // this should not be None: we need to propagate the id as dicts can be nested. + &dictionary.data.columns[0], + dictionaries, + )?; Ok(Arc::new(DictionaryArray::::from_data(keys, values))) } /// Construct an [`Array`] from the JSON integration format pub fn to_array( - field: &Field, + data_type: DataType, + dict_id: Option, json_col: &ArrowJsonColumn, dictionaries: &HashMap, ) -> Result> { - let data_type = field.data_type(); - match data_type { - DataType::Null => Ok(Arc::new(NullArray::from_data( - data_type.clone(), - json_col.count, - ))), - DataType::Boolean => { + use PhysicalType::*; + match data_type.to_physical_type() { + Null => Ok(Arc::new(NullArray::from_data(data_type, json_col.count))), + Boolean => { let validity = to_validity(&json_col.validity); let values = json_col .data @@ -234,39 +238,28 @@ pub fn to_array( .map(|value| value.as_bool().unwrap()) .collect::(); Ok(Arc::new(BooleanArray::from_data( - data_type.clone(), - values, - validity, + data_type, values, validity, ))) } - DataType::Int8 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::Int16 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::Int32 - | DataType::Date32 - | DataType::Time32(_) - | DataType::Interval(IntervalUnit::YearMonth) => { - Ok(Arc::new(to_primitive::(json_col, data_type.clone()))) - } - DataType::Int64 - | DataType::Date64 - | DataType::Time64(_) - | DataType::Timestamp(_, _) - | DataType::Duration(_) => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::Interval(IntervalUnit::DayTime) => { - Ok(Arc::new(to_primitive_interval(json_col, data_type.clone()))) + Primitive(PrimitiveType::Int8) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::Int16) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::Int32) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::Int64) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::Int128) => Ok(Arc::new(to_decimal(json_col, data_type))), + Primitive(PrimitiveType::DaysMs) => { + Ok(Arc::new(to_primitive_interval(json_col, data_type))) } - DataType::Decimal(_, _) => Ok(Arc::new(to_decimal(json_col, data_type.clone()))), - DataType::UInt8 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::UInt16 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::UInt32 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::UInt64 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::Float32 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::Float64 => Ok(Arc::new(to_primitive::(json_col, data_type.clone()))), - DataType::Binary => Ok(to_binary::(json_col, data_type.clone())), - DataType::LargeBinary => Ok(to_binary::(json_col, data_type.clone())), - DataType::Utf8 => Ok(to_utf8::(json_col, data_type.clone())), - DataType::LargeUtf8 => Ok(to_utf8::(json_col, data_type.clone())), - DataType::FixedSizeBinary(_) => { + Primitive(PrimitiveType::UInt8) => 
Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::UInt16) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::UInt32) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::UInt64) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::Float32) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Primitive(PrimitiveType::Float64) => Ok(Arc::new(to_primitive::(json_col, data_type))), + Binary => Ok(to_binary::(json_col, data_type)), + LargeBinary => Ok(to_binary::(json_col, data_type)), + Utf8 => Ok(to_utf8::(json_col, data_type)), + LargeUtf8 => Ok(to_utf8::(json_col, data_type)), + FixedSizeBinary => { let validity = to_validity(&json_col.validity); let values = json_col @@ -278,50 +271,60 @@ pub fn to_array( .flatten() .collect(); Ok(Arc::new(FixedSizeBinaryArray::from_data( - data_type.clone(), - values, - validity, + data_type, values, validity, ))) } - - DataType::List(_) => to_list::(json_col, data_type.clone(), dictionaries), - DataType::LargeList(_) => to_list::(json_col, data_type.clone(), dictionaries), - - DataType::FixedSizeList(child_field, _) => { + List => to_list::(json_col, data_type, dictionaries), + LargeList => to_list::(json_col, data_type, dictionaries), + FixedSizeList => { let validity = to_validity(&json_col.validity); + let (child_field, _) = FixedSizeListArray::get_child_and_size(&data_type); + let children = &json_col.children.as_ref().unwrap()[0]; - let values = to_array(child_field, children, dictionaries)?; + let values = to_array( + child_field.data_type().clone(), + child_field.dict_id(), + children, + dictionaries, + )?; Ok(Arc::new(FixedSizeListArray::from_data( - data_type.clone(), - values, - validity, + data_type, values, validity, ))) } - DataType::Struct(fields) => { + Struct => { let validity = to_validity(&json_col.validity); + let fields = StructArray::get_fields(&data_type); + let values = fields .iter() .zip(json_col.children.as_ref().unwrap()) - .map(|(field, col)| to_array(field, col, dictionaries)) + .map(|(field, col)| to_array(field.data_type().clone(), None, col, dictionaries)) .collect::>>()?; - let array = StructArray::from_data(data_type.clone(), values, validity); + let array = StructArray::from_data(data_type, values, validity); Ok(Arc::new(array)) } - DataType::Dictionary(key_type, _) => { - with_match_dictionary_key_type!(key_type.as_ref(), |$T| { - to_dictionary::<$T>(field, json_col, dictionaries) + Dictionary(key_type) => { + with_match_physical_dictionary_key_type!(key_type, |$T| { + to_dictionary::<$T>(data_type, dict_id.unwrap(), json_col, dictionaries) }) } - DataType::Float16 => unreachable!(), - DataType::Union(fields, _, _) => { + Union => { + let fields = UnionArray::get_fields(&data_type); let fields = fields .iter() .zip(json_col.children.as_ref().unwrap()) - .map(|(field, col)| to_array(field, col, dictionaries)) + .map(|(field, col)| { + to_array( + field.data_type().clone(), + field.dict_id(), + col, + dictionaries, + ) + }) .collect::>>()?; let types = json_col @@ -359,7 +362,7 @@ pub fn to_array( }) .unwrap_or_default(); - let array = UnionArray::from_data(data_type.clone(), types, fields, offsets); + let array = UnionArray::from_data(data_type, types, fields, offsets); Ok(Arc::new(array)) } } @@ -374,7 +377,14 @@ pub fn to_record_batch( .fields() .iter() .zip(&json_batch.columns) - .map(|(field, json_col)| to_array(field, json_col, json_dictionaries)) + .map(|(field, json_col)| { + to_array( + 
field.data_type().clone(), + field.dict_id(), + json_col, + json_dictionaries, + ) + }) .collect::>>()?; RecordBatch::try_new(Arc::new(schema.clone()), columns) diff --git a/src/io/json_integration/schema.rs b/src/io/json_integration/schema.rs index b6c85bc22df..839e27550c6 100644 --- a/src/io/json_integration/schema.rs +++ b/src/io/json_integration/schema.rs @@ -26,6 +26,7 @@ use serde_json::{json, Value}; use crate::error::{ArrowError, Result}; use crate::datatypes::{DataType, Field, IntervalUnit, Schema, TimeUnit}; +use crate::io::ipc::get_extension; pub trait ToJson { /// Generate a JSON representation @@ -114,6 +115,7 @@ impl ToJson for DataType { DataType::Decimal(precision, scale) => { json!({"name": "decimal", "precision": precision, "scale": scale}) } + DataType::Extension(_, inner_data_type, _) => inner_data_type.to_json(), } } } @@ -176,6 +178,63 @@ fn children(children: Option<&Value>) -> Result> { .unwrap_or_else(|| Ok(vec![])) } +fn read_metadata(metadata: &Value) -> Result> { + match metadata { + Value::Array(ref values) => { + let mut res: BTreeMap = BTreeMap::new(); + for value in values { + match value.as_object() { + Some(map) => { + if map.len() != 2 { + return Err(ArrowError::Schema( + "Field 'metadata' must have exact two entries for each key-value map".to_string(), + )); + } + if let (Some(k), Some(v)) = (map.get("key"), map.get("value")) { + if let (Some(k_str), Some(v_str)) = (k.as_str(), v.as_str()) { + res.insert(k_str.to_string().clone(), v_str.to_string().clone()); + } else { + return Err(ArrowError::Schema( + "Field 'metadata' must have map value of string type" + .to_string(), + )); + } + } else { + return Err(ArrowError::Schema( + "Field 'metadata' lacks map keys named \"key\" or \"value\"" + .to_string(), + )); + } + } + _ => { + return Err(ArrowError::Schema( + "Field 'metadata' contains non-object key-value pair".to_string(), + )); + } + } + } + Ok(res) + } + Value::Object(ref values) => { + let mut res: BTreeMap = BTreeMap::new(); + for (k, v) in values { + if let Some(str_value) = v.as_str() { + res.insert(k.clone(), str_value.to_string().clone()); + } else { + return Err(ArrowError::Schema(format!( + "Field 'metadata' contains non-string value for key {}", + k + ))); + } + } + Ok(res) + } + _ => Err(ArrowError::Schema( + "Invalid json value type for field".to_string(), + )), + } +} + fn to_data_type(item: &Value, mut children: Vec) -> Result { let type_ = item .get("name") @@ -380,104 +439,60 @@ impl TryFrom<&Value> for Field { let children = children(map.get("children"))?; - let data_type = to_data_type( - map.get("type") - .ok_or_else(|| ArrowError::Schema("type missing".to_string()))?, - children, - )?; + let metadata = if let Some(metadata) = map.get("metadata") { + Some(read_metadata(metadata)?) 
+ } else { + None + }; - // Referenced example file: testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_custom_metadata.json.gz - let metadata = match map.get("metadata") { - Some(&Value::Array(ref values)) => { - let mut res: BTreeMap = BTreeMap::new(); - for value in values { - match value.as_object() { - Some(map) => { - if map.len() != 2 { - return Err(ArrowError::Schema( - "Field 'metadata' must have exact two entries for each key-value map".to_string(), - )); - } - if let (Some(k), Some(v)) = (map.get("key"), map.get("value")) { - if let (Some(k_str), Some(v_str)) = (k.as_str(), v.as_str()) - { - res.insert( - k_str.to_string().clone(), - v_str.to_string().clone(), - ); - } else { - return Err(ArrowError::Schema("Field 'metadata' must have map value of string type".to_string())); - } - } else { - return Err(ArrowError::Schema("Field 'metadata' lacks map keys named \"key\" or \"value\"".to_string())); - } - } - _ => { - return Err(ArrowError::Schema( - "Field 'metadata' contains non-object key-value pair" - .to_string(), - )); - } - } - } - Some(res) - } - // We also support map format, because Schema's metadata supports this. - // See https://github.com/apache/arrow/pull/5907 - Some(&Value::Object(ref values)) => { - let mut res: BTreeMap = BTreeMap::new(); - for (k, v) in values { - if let Some(str_value) = v.as_str() { - res.insert(k.clone(), str_value.to_string().clone()); - } else { - return Err(ArrowError::Schema(format!( - "Field 'metadata' contains non-string value for key {}", - k - ))); - } - } - Some(res) - } - Some(_) => { - return Err(ArrowError::Schema( - "Field `metadata` is not json array".to_string(), - )); - } - _ => None, + let extension = get_extension(&metadata); + + let type_ = map + .get("type") + .ok_or_else(|| ArrowError::Schema("type missing".to_string()))?; + + let data_type = to_data_type(type_, children)?; + + let data_type = if let Some((name, metadata)) = extension { + DataType::Extension(name, Box::new(data_type), metadata) + } else { + data_type }; - let mut dict_id = 0; - let mut dict_is_ordered = false; + let data_type = if let Some(dictionary) = map.get("dictionary") { + let index_type = match dictionary.get("indexType") { + Some(t) => to_data_type(t, vec![])?, + _ => { + return Err(ArrowError::Schema( + "Field missing 'indexType' attribute".to_string(), + )); + } + }; + DataType::Dictionary(Box::new(index_type), Box::new(data_type)) + } else { + data_type + }; - let data_type = match map.get("dictionary") { - Some(dictionary) => { - let index_type = match dictionary.get("indexType") { - Some(t) => to_data_type(t, vec![])?, - _ => { - return Err(ArrowError::Schema( - "Field missing 'indexType' attribute".to_string(), - )); - } - }; - dict_id = match dictionary.get("id") { - Some(Value::Number(n)) => n.as_i64().unwrap(), - _ => { - return Err(ArrowError::Schema( - "Field missing 'id' attribute".to_string(), - )); - } - }; - dict_is_ordered = match dictionary.get("isOrdered") { - Some(&Value::Bool(n)) => n, - _ => { - return Err(ArrowError::Schema( - "Field missing 'isOrdered' attribute".to_string(), - )); - } - }; - DataType::Dictionary(Box::new(index_type), Box::new(data_type)) - } - _ => data_type, + let (dict_id, dict_is_ordered) = if let Some(dictionary) = map.get("dictionary") { + let dict_id = match dictionary.get("id") { + Some(Value::Number(n)) => n.as_i64().unwrap(), + _ => { + return Err(ArrowError::Schema( + "Field missing 'id' attribute".to_string(), + )); + } + }; + let dict_is_ordered = match 
dictionary.get("isOrdered") { + Some(&Value::Bool(n)) => n, + _ => { + return Err(ArrowError::Schema( + "Field missing 'isOrdered' attribute".to_string(), + )); + } + }; + (dict_id, dict_is_ordered) + } else { + (0, false) }; let mut f = Field::new_dict(&name, data_type, nullable, dict_id, dict_is_ordered); f.set_metadata(metadata); diff --git a/src/scalar/mod.rs b/src/scalar/mod.rs index 39bb9a858ca..da3f2241af3 100644 --- a/src/scalar/mod.rs +++ b/src/scalar/mod.rs @@ -29,21 +29,6 @@ pub trait Scalar: std::fmt::Debug { fn data_type(&self) -> &DataType; } -macro_rules! dyn_new { - ($array:expr, $index:expr, $type:ty) => {{ - let array = $array - .as_any() - .downcast_ref::>() - .unwrap(); - let value = if array.is_valid($index) { - Some(array.value($index)) - } else { - None - }; - Box::new(PrimitiveScalar::new(array.data_type().clone(), value)) - }}; -} - macro_rules! dyn_new_utf8 { ($array:expr, $index:expr, $type:ty) => {{ let array = $array.as_any().downcast_ref::>().unwrap(); @@ -85,8 +70,8 @@ macro_rules! dyn_new_list { /// creates a new [`Scalar`] from an [`Array`]. pub fn new_scalar(array: &dyn Array, index: usize) -> Box { - use DataType::*; - match array.data_type() { + use PhysicalType::*; + match array.data_type().to_physical_type() { Null => Box::new(NullScalar::new()), Boolean => { let array = array.as_any().downcast_ref::().unwrap(); @@ -97,28 +82,25 @@ pub fn new_scalar(array: &dyn Array, index: usize) -> Box { }; Box::new(BooleanScalar::new(value)) } - Int8 => dyn_new!(array, index, i8), - Int16 => dyn_new!(array, index, i16), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { - dyn_new!(array, index, i32) - } - Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => dyn_new!(array, index, i64), - Interval(IntervalUnit::DayTime) => dyn_new!(array, index, days_ms), - UInt8 => dyn_new!(array, index, u8), - UInt16 => dyn_new!(array, index, u16), - UInt32 => dyn_new!(array, index, u32), - UInt64 => dyn_new!(array, index, u64), - Decimal(_, _) => dyn_new!(array, index, i128), - Float16 => unreachable!(), - Float32 => dyn_new!(array, index, f32), - Float64 => dyn_new!(array, index, f64), + Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { + let array = array + .as_any() + .downcast_ref::>() + .unwrap(); + let value = if array.is_valid(index) { + Some(array.value(index)) + } else { + None + }; + Box::new(PrimitiveScalar::new(array.data_type().clone(), value)) + }), Utf8 => dyn_new_utf8!(array, index, i32), LargeUtf8 => dyn_new_utf8!(array, index, i64), Binary => dyn_new_binary!(array, index, i32), LargeBinary => dyn_new_binary!(array, index, i64), - List(_) => dyn_new_list!(array, index, i32), - LargeList(_) => dyn_new_list!(array, index, i64), - Struct(_) => { + List => dyn_new_list!(array, index, i32), + LargeList => dyn_new_list!(array, index, i64), + Struct => { let array = array.as_any().downcast_ref::().unwrap(); if array.is_valid(index) { let values = array @@ -131,9 +113,9 @@ pub fn new_scalar(array: &dyn Array, index: usize) -> Box { Box::new(StructScalar::new(array.data_type().clone(), None)) } } - FixedSizeBinary(_) => todo!(), - FixedSizeList(_, _) => todo!(), - Union(_, _, _) => todo!(), - Dictionary(_, _) => todo!(), + FixedSizeBinary => todo!(), + FixedSizeList => todo!(), + Union => todo!(), + Dictionary(_) => todo!(), } } diff --git a/tests/it/array/fixed_size_binary/mutable.rs b/tests/it/array/fixed_size_binary/mutable.rs index d8c27ccee53..7373182084b 100644 --- a/tests/it/array/fixed_size_binary/mutable.rs +++ 
b/tests/it/array/fixed_size_binary/mutable.rs @@ -5,7 +5,11 @@ use arrow2::datatypes::DataType; #[test] fn basic() { - let a = MutableFixedSizeBinaryArray::from_data(2, MutableBuffer::from([1, 2, 3, 4]), None); + let a = MutableFixedSizeBinaryArray::from_data( + DataType::FixedSizeBinary(2), + MutableBuffer::from([1, 2, 3, 4]), + None, + ); assert_eq!(a.len(), 2); assert_eq!(a.data_type(), &DataType::FixedSizeBinary(2)); assert_eq!(a.values(), &MutableBuffer::from([1, 2, 3, 4])); @@ -17,18 +21,26 @@ fn basic() { #[allow(clippy::eq_op)] #[test] fn equal() { - let a = MutableFixedSizeBinaryArray::from_data(2, MutableBuffer::from([1, 2, 3, 4]), None); + let a = MutableFixedSizeBinaryArray::from_data( + DataType::FixedSizeBinary(2), + MutableBuffer::from([1, 2, 3, 4]), + None, + ); assert_eq!(a, a); - let b = MutableFixedSizeBinaryArray::from_data(2, MutableBuffer::from([1, 2]), None); + let b = MutableFixedSizeBinaryArray::from_data( + DataType::FixedSizeBinary(2), + MutableBuffer::from([1, 2]), + None, + ); assert_eq!(b, b); assert!(a != b); let a = MutableFixedSizeBinaryArray::from_data( - 2, + DataType::FixedSizeBinary(2), MutableBuffer::from([1, 2, 3, 4]), Some(MutableBitmap::from([true, false])), ); let b = MutableFixedSizeBinaryArray::from_data( - 2, + DataType::FixedSizeBinary(2), MutableBuffer::from([1, 2, 3, 4]), Some(MutableBitmap::from([false, true])), ); diff --git a/tests/it/io/ipc/common.rs b/tests/it/io/ipc/common.rs index 6a46c03393e..8eff523789e 100644 --- a/tests/it/io/ipc/common.rs +++ b/tests/it/io/ipc/common.rs @@ -12,7 +12,7 @@ use arrow2::{ use flate2::read::GzDecoder; /// Read gzipped JSON file -pub fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec) { +pub fn read_gzip_json(version: &str, file_name: &str) -> Result<(Schema, Vec)> { let testdata = crate::test_util::arrow_test_data(); let file = File::open(format!( "{}/arrow-ipc-stream/integration/{}/{}.json.gz", @@ -23,10 +23,11 @@ pub fn read_gzip_json(version: &str, file_name: &str) -> (Schema, Vec (Schema, Vec>>() - .unwrap(); + .collect::>>()?; - (schema, batches) + Ok((schema, batches)) } pub fn read_arrow_stream(version: &str, file_name: &str) -> (Schema, Vec) { diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs index 08e45b68c27..cf1920ab252 100644 --- a/tests/it/io/ipc/read/file.rs +++ b/tests/it/io/ipc/read/file.rs @@ -16,7 +16,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let reader = FileReader::new(&mut file, metadata, None); // read expected JSON output - let (schema, batches) = read_gzip_json(version, file_name); + let (schema, batches) = read_gzip_json(version, file_name)?; assert_eq!(&schema, reader.schema().as_ref()); @@ -117,6 +117,11 @@ fn read_generated_100_union() -> Result<()> { test_file("1.0.0-bigendian", "generated_union") } +#[test] +fn read_generated_100_extension() -> Result<()> { + test_file("1.0.0-littleendian", "generated_extension") +} + #[test] fn read_generated_017_union() -> Result<()> { test_file("0.17.1", "generated_union") diff --git a/tests/it/io/ipc/read/stream.rs b/tests/it/io/ipc/read/stream.rs index d6ad005e78d..7a69e191785 100644 --- a/tests/it/io/ipc/read/stream.rs +++ b/tests/it/io/ipc/read/stream.rs @@ -16,7 +16,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let reader = StreamReader::new(file, metadata); // read expected JSON output - let (schema, batches) = read_gzip_json(version, file_name); + let (schema, batches) = read_gzip_json(version, file_name)?; assert_eq!(&schema, 
reader.schema().as_ref());
diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs
index 7a463938044..cc7d292b223 100644
--- a/tests/it/io/ipc/write/file.rs
+++ b/tests/it/io/ipc/write/file.rs
@@ -36,7 +36,7 @@ fn round_trip(batch: RecordBatch) -> Result<()> {
}

fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let (schema, batches) = read_gzip_json(version, file_name);
+    let (schema, batches) = read_gzip_json(version, file_name)?;

    let mut result = Vec::<u8>::new();

@@ -56,7 +56,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
    let reader = FileReader::new(&mut reader, metadata, None);

    // read expected JSON output
-    let (expected_schema, expected_batches) = read_gzip_json(version, file_name);
+    let (expected_schema, expected_batches) = read_gzip_json(version, file_name)?;

    assert_eq!(schema.as_ref(), &expected_schema);
diff --git a/tests/it/io/ipc/write/stream.rs b/tests/it/io/ipc/write/stream.rs
index e9019d01f9b..16a41530713 100644
--- a/tests/it/io/ipc/write/stream.rs
+++ b/tests/it/io/ipc/write/stream.rs
@@ -30,7 +30,7 @@ fn test_file(version: &str, file_name: &str) {
    let schema = reader.schema().clone();

    // read expected JSON output
-    let (expected_schema, expected_batches) = read_gzip_json(version, file_name);
+    let (expected_schema, expected_batches) = read_gzip_json(version, file_name).unwrap();

    assert_eq!(schema.as_ref(), &expected_schema);
diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs
index 2810aaf4673..31964d6d4ac 100644
--- a/tests/it/io/parquet/mod.rs
+++ b/tests/it/io/parquet/mod.rs
@@ -440,7 +440,7 @@ fn integration_read(data: &[u8]) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
}

fn test_file(version: &str, file_name: &str) -> Result<()> {
-    let (schema, batches) = read_gzip_json(version, file_name);
+    let (schema, batches) = read_gzip_json(version, file_name)?;

    let data = integration_write(&schema, &batches)?;