diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index 14107ed9ac2..bd905948714 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -45,7 +45,10 @@ fn encode_dictionary( let dict_id = field.dictionary_id .ok_or_else(|| ArrowError::InvalidArgumentError("Dictionaries must have an associated id".to_string()))?; - let values = array.as_any().downcast_ref::>().unwrap().values(); + let emit = dictionary_tracker.insert(dict_id, array)?; + + let array = array.as_any().downcast_ref::>().unwrap(); + let values = array.values(); encode_dictionary(field, values, options, @@ -53,12 +56,10 @@ fn encode_dictionary( encoded_dictionaries )?; - let emit = dictionary_tracker.insert(dict_id, array)?; - if emit { - encoded_dictionaries.push(dictionary_batch_to_bytes( + encoded_dictionaries.push(dictionary_batch_to_bytes::<$T>( dict_id, - array.as_ref(), + array, options, is_native_little_endian(), )); @@ -257,9 +258,9 @@ fn columns_to_bytes(columns: &Chunk>, options: &WriteOptions) -> /// Write dictionary values into two sets of bytes, one for the header (ipc::Schema::Message) and the /// other for the data -fn dictionary_batch_to_bytes( +fn dictionary_batch_to_bytes( dict_id: i64, - array: &dyn Array, + array: &DictionaryArray, options: &WriteOptions, is_little_endian: bool, ) -> EncodedData { diff --git a/src/io/ipc/write/mod.rs b/src/io/ipc/write/mod.rs index f4afb0e451f..47167ef491a 100644 --- a/src/io/ipc/write/mod.rs +++ b/src/io/ipc/write/mod.rs @@ -7,7 +7,8 @@ mod writer; pub use common::{Compression, Record, WriteOptions}; pub use schema::schema_to_bytes; -pub use serialize::{write, write_dictionary}; +pub use serialize::write; +pub(self) use serialize::write_dictionary; pub use stream::StreamWriter; pub use writer::FileWriter; diff --git a/src/io/ipc/write/serialize.rs b/src/io/ipc/write/serialize.rs index d4aed19663c..40eb13a00bd 100644 --- a/src/io/ipc/write/serialize.rs +++ b/src/io/ipc/write/serialize.rs @@ -2,18 +2,14 @@ use arrow_format::ipc; use crate::{ - array::*, - bitmap::Bitmap, - datatypes::{DataType, PhysicalType}, - trusted_len::TrustedLen, - types::NativeType, + array::*, bitmap::Bitmap, datatypes::PhysicalType, trusted_len::TrustedLen, types::NativeType, }; use super::super::compression; use super::super::endianess::is_native_little_endian; use super::common::{pad_to_8, Compression}; -fn _write_primitive( +fn write_primitive( array: &PrimitiveArray, buffers: &mut Vec, arrow_data: &mut Vec, @@ -40,35 +36,14 @@ fn _write_primitive( ) } -fn write_primitive( - array: &dyn Array, - buffers: &mut Vec, - arrow_data: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, -) { - let array = array.as_any().downcast_ref::>().unwrap(); - _write_primitive( - array, - buffers, - arrow_data, - offset, - is_little_endian, - compression, - ); -} - fn write_boolean( - array: &dyn Array, + array: &BooleanArray, buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, _: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::().unwrap(); - write_bitmap( array.validity(), array.len(), @@ -139,14 +114,13 @@ fn write_generic_binary( } fn write_binary( - array: &dyn Array, + array: &BinaryArray, buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::>().unwrap(); write_generic_binary( array.validity(), array.offsets(), @@ -160,14 +134,13 @@ fn write_binary( } fn write_utf8( - array: &dyn Array, + array: &Utf8Array, buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::>().unwrap(); write_generic_binary( array.validity(), array.offsets(), @@ -181,17 +154,13 @@ fn write_utf8( } fn write_fixed_size_binary( - array: &dyn Array, + array: &FixedSizeBinaryArray, buffers: &mut Vec, arrow_data: &mut Vec, offset: &mut i64, _is_little_endian: bool, compression: Option, ) { - let array = array - .as_any() - .downcast_ref::() - .unwrap(); write_bitmap( array.validity(), array.len(), @@ -204,7 +173,7 @@ fn write_fixed_size_binary( } fn write_list( - array: &dyn Array, + array: &ListArray, buffers: &mut Vec, arrow_data: &mut Vec, nodes: &mut Vec, @@ -212,7 +181,6 @@ fn write_list( is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::>().unwrap(); let offsets = array.offsets(); let validity = array.validity(); @@ -262,7 +230,7 @@ fn write_list( } pub fn write_struct( - array: &dyn Array, + array: &StructArray, buffers: &mut Vec, arrow_data: &mut Vec, nodes: &mut Vec, @@ -270,7 +238,6 @@ pub fn write_struct( is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::().unwrap(); write_bitmap( array.validity(), array.len(), @@ -293,7 +260,7 @@ pub fn write_struct( } pub fn write_union( - array: &dyn Array, + array: &UnionArray, buffers: &mut Vec, arrow_data: &mut Vec, nodes: &mut Vec, @@ -301,8 +268,6 @@ pub fn write_union( is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::().unwrap(); - write_buffer( array.types(), buffers, @@ -336,7 +301,7 @@ pub fn write_union( } fn write_map( - array: &dyn Array, + array: &MapArray, buffers: &mut Vec, arrow_data: &mut Vec, nodes: &mut Vec, @@ -344,7 +309,6 @@ fn write_map( is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::().unwrap(); let offsets = array.offsets(); let validity = array.validity(); @@ -394,7 +358,7 @@ fn write_map( } fn write_fixed_size_list( - array: &dyn Array, + array: &FixedSizeListArray, buffers: &mut Vec, arrow_data: &mut Vec, nodes: &mut Vec, @@ -402,7 +366,6 @@ fn write_fixed_size_list( is_little_endian: bool, compression: Option, ) { - let array = array.as_any().downcast_ref::().unwrap(); write_bitmap( array.validity(), array.len(), @@ -424,8 +387,8 @@ fn write_fixed_size_list( // use `write_keys` to either write keys or values #[allow(clippy::too_many_arguments)] -pub fn _write_dictionary( - array: &dyn Array, +pub(super) fn write_dictionary( + array: &DictionaryArray, buffers: &mut Vec, arrow_data: &mut Vec, nodes: &mut Vec, @@ -434,9 +397,8 @@ pub fn _write_dictionary( compression: Option, write_keys: bool, ) -> usize { - let array = array.as_any().downcast_ref::>().unwrap(); if write_keys { - _write_primitive( + write_primitive( array.keys(), buffers, arrow_data, @@ -459,37 +421,6 @@ pub fn _write_dictionary( } } -/// Writes a dictionary array -#[allow(clippy::too_many_arguments)] -pub fn write_dictionary( - array: &dyn Array, - buffers: &mut Vec, - arrow_data: &mut Vec, - nodes: &mut Vec, - offset: &mut i64, - is_little_endian: bool, - compression: Option, - write_keys: bool, -) -> usize { - match array.data_type() { - DataType::Dictionary(key_type, _, _) => { - match_integer_type!(key_type, |$T| { - _write_dictionary::<$T>( - array, - buffers, - arrow_data, - nodes, - offset, - is_little_endian, - compression, - write_keys, - ) - }) - } - _ => unreachable!(), - } -} - /// Writes an [`Array`] to `arrow_data` pub fn write( array: &dyn Array, @@ -508,7 +439,7 @@ pub fn write( match array.data_type().to_physical_type() { Null => (), Boolean => write_boolean( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, offset, @@ -516,10 +447,11 @@ pub fn write( compression, ), Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { + let array = array.as_any().downcast_ref().unwrap(); write_primitive::<$T>(array, buffers, arrow_data, offset, is_little_endian, compression) }), Binary => write_binary::( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, offset, @@ -527,7 +459,7 @@ pub fn write( compression, ), LargeBinary => write_binary::( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, offset, @@ -535,7 +467,7 @@ pub fn write( compression, ), FixedSizeBinary => write_fixed_size_binary( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, offset, @@ -543,7 +475,7 @@ pub fn write( compression, ), Utf8 => write_utf8::( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, offset, @@ -551,7 +483,7 @@ pub fn write( compression, ), LargeUtf8 => write_utf8::( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, offset, @@ -559,7 +491,7 @@ pub fn write( compression, ), List => write_list::( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes, @@ -568,7 +500,7 @@ pub fn write( compression, ), LargeList => write_list::( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes, @@ -577,7 +509,7 @@ pub fn write( compression, ), FixedSizeList => write_fixed_size_list( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes, @@ -586,7 +518,7 @@ pub fn write( compression, ), Struct => write_struct( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes, @@ -594,9 +526,9 @@ pub fn write( is_little_endian, compression, ), - Dictionary(_) => { - write_dictionary( - array, + Dictionary(key_type) => match_integer_type!(key_type, |$T| { + write_dictionary::<$T>( + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes, @@ -605,10 +537,10 @@ pub fn write( compression, true, ); - } + }), Union => { write_union( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes, @@ -619,7 +551,7 @@ pub fn write( } Map => { write_map( - array, + array.as_any().downcast_ref().unwrap(), buffers, arrow_data, nodes,