diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 997b3e830a3..3971ff3e548 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -166,6 +166,11 @@ def case_nested() -> Tuple[dict, pa.Schema, str]: pa.list_(pa.field("item", pa.int64(), False)), False, ), + pa.field( + "list_int64_optional_required", + pa.list_(pa.field("item", pa.int64(), True)), + False, + ), pa.field("list_int16", pa.list_(pa.int16())), pa.field("list_bool", pa.list_(pa.bool_())), pa.field("list_utf8", pa.list_(pa.utf8())), @@ -182,6 +187,7 @@ def case_nested() -> Tuple[dict, pa.Schema, str]: "list_int64": items_nullable, "list_int64_required": items_required, "list_int64_required_required": all_required, + "list_int64_optional_required": all_required, "list_int16": i16, "list_bool": boolean, "list_utf8": string, diff --git a/src/io/parquet/write/binary/basic.rs b/src/io/parquet/write/binary/basic.rs index 277a5c192a4..7edaed8c07f 100644 --- a/src/io/parquet/write/binary/basic.rs +++ b/src/io/parquet/write/binary/basic.rs @@ -1,6 +1,5 @@ use parquet2::{ encoding::{delta_bitpacked, Encoding}, - metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, BinaryStatistics, ParquetStatistics, Statistics}, @@ -43,11 +42,11 @@ pub(crate) fn encode_plain( pub fn array_to_page( array: &BinaryArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, encoding: Encoding, ) -> Result { let validity = array.validity(); - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; utils::write_def_levels( @@ -79,7 +78,7 @@ pub fn array_to_page( } let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(build_statistics(array, type_.clone())) } else { None }; @@ -92,7 +91,7 @@ pub fn array_to_page( 0, definition_levels_byte_length, statistics, - descriptor, + type_, options, encoding, ) diff --git a/src/io/parquet/write/binary/nested.rs b/src/io/parquet/write/binary/nested.rs index a6e65e2f7e4..04a387a895c 100644 --- a/src/io/parquet/write/binary/nested.rs +++ b/src/io/parquet/write/binary/nested.rs @@ -1,4 +1,4 @@ -use parquet2::metadata::Descriptor; +use parquet2::schema::types::PrimitiveType; use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{levels, utils, WriteOptions}; @@ -12,14 +12,14 @@ use crate::{ pub fn array_to_page( array: &BinaryArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, nested: levels::NestedInfo, ) -> Result where OO: Offset, O: Offset, { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); @@ -27,13 +27,13 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(build_statistics(array, type_.clone())) } else { None }; @@ -46,7 +46,7 @@ where repetition_levels_byte_length, definition_levels_byte_length, statistics, - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/boolean/basic.rs b/src/io/parquet/write/boolean/basic.rs index 643a25cd5b2..418d67576c5 100644 --- a/src/io/parquet/write/boolean/basic.rs +++ b/src/io/parquet/write/boolean/basic.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{hybrid_rle::bitpacked_encode, Encoding}, - metadata::Descriptor, page::DataPage, + schema::types::PrimitiveType, statistics::{serialize_statistics, BooleanStatistics, ParquetStatistics, Statistics}, }; @@ -41,9 +41,9 @@ pub(super) fn encode_plain( pub fn array_to_page( array: &BooleanArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, ) -> Result { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); @@ -74,7 +74,7 @@ pub fn array_to_page( 0, definition_levels_byte_length, statistics, - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/boolean/nested.rs b/src/io/parquet/write/boolean/nested.rs index 40645eea3d2..0ebe62cd11c 100644 --- a/src/io/parquet/write/boolean/nested.rs +++ b/src/io/parquet/write/boolean/nested.rs @@ -1,4 +1,5 @@ -use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage}; +use parquet2::schema::types::PrimitiveType; +use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{levels, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; @@ -11,13 +12,13 @@ use crate::{ pub fn array_to_page( array: &BooleanArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, nested: levels::NestedInfo, ) -> Result where O: Offset, { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); @@ -25,7 +26,7 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer)?; @@ -44,7 +45,7 @@ where repetition_levels_byte_length, definition_levels_byte_length, statistics, - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 29c062d4a49..1fddafad29c 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -1,7 +1,7 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, - metadata::Descriptor, page::{EncodedDictPage, EncodedPage}, + schema::types::PrimitiveType, statistics::{serialize_statistics, ParquetStatistics}, write::DynIter, }; @@ -27,11 +27,11 @@ use crate::{ fn encode_keys( array: &PrimitiveArray, validity: Option<&Bitmap>, - descriptor: Descriptor, + type_: PrimitiveType, statistics: ParquetStatistics, options: WriteOptions, ) -> Result { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; @@ -107,7 +107,7 @@ fn encode_keys( 0, definition_levels_byte_length, Some(statistics), - descriptor, + type_, options, Encoding::RleDictionary, ) @@ -115,13 +115,12 @@ fn encode_keys( } macro_rules! dyn_prim { - ($from:ty, $to:ty, $array:expr, $options:expr, $descriptor:expr) => {{ + ($from:ty, $to:ty, $array:expr, $options:expr, $type_:expr) => {{ let values = $array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; primitive_encode_plain::<$from, $to>(values, false, &mut buffer); - let stats = - primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()); + let stats = primitive_build_statistics::<$from, $to>(values, $type_.clone()); let stats = serialize_statistics(&stats); (EncodedDictPage::new(buffer, values.len()), stats) }}; @@ -129,7 +128,7 @@ macro_rules! dyn_prim { pub fn array_to_pages( array: &DictionaryArray, - descriptor: Descriptor, + type_: PrimitiveType, options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -137,28 +136,28 @@ pub fn array_to_pages( Encoding::PlainDictionary | Encoding::RleDictionary => { // write DictPage let (dict_page, statistics) = match array.values().data_type().to_logical_type() { - DataType::Int8 => dyn_prim!(i8, i32, array, options, descriptor), - DataType::Int16 => dyn_prim!(i16, i32, array, options, descriptor), + DataType::Int8 => dyn_prim!(i8, i32, array, options, type_), + DataType::Int16 => dyn_prim!(i16, i32, array, options, type_), DataType::Int32 | DataType::Date32 | DataType::Time32(_) => { - dyn_prim!(i32, i32, array, options, descriptor) + dyn_prim!(i32, i32, array, options, type_) } DataType::Int64 | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => dyn_prim!(i64, i64, array, options, descriptor), - DataType::UInt8 => dyn_prim!(u8, i32, array, options, descriptor), - DataType::UInt16 => dyn_prim!(u16, i32, array, options, descriptor), - DataType::UInt32 => dyn_prim!(u32, i32, array, options, descriptor), - DataType::UInt64 => dyn_prim!(i64, i64, array, options, descriptor), - DataType::Float32 => dyn_prim!(f32, f32, array, options, descriptor), - DataType::Float64 => dyn_prim!(f64, f64, array, options, descriptor), + | DataType::Duration(_) => dyn_prim!(i64, i64, array, options, type_), + DataType::UInt8 => dyn_prim!(u8, i32, array, options, type_), + DataType::UInt16 => dyn_prim!(u16, i32, array, options, type_), + DataType::UInt32 => dyn_prim!(u32, i32, array, options, type_), + DataType::UInt64 => dyn_prim!(i64, i64, array, options, type_), + DataType::Float32 => dyn_prim!(f32, f32, array, options, type_), + DataType::Float64 => dyn_prim!(f64, f64, array, options, type_), DataType::Utf8 => { let array = array.values().as_any().downcast_ref().unwrap(); let mut buffer = vec![]; utf8_encode_plain::(array, false, &mut buffer); - let stats = utf8_build_statistics(array, descriptor.primitive_type.clone()); + let stats = utf8_build_statistics(array, type_.clone()); (EncodedDictPage::new(buffer, array.len()), stats) } DataType::LargeUtf8 => { @@ -166,7 +165,7 @@ pub fn array_to_pages( let mut buffer = vec![]; utf8_encode_plain::(array, false, &mut buffer); - let stats = utf8_build_statistics(array, descriptor.primitive_type.clone()); + let stats = utf8_build_statistics(array, type_.clone()); (EncodedDictPage::new(buffer, array.len()), stats) } DataType::Binary => { @@ -174,7 +173,7 @@ pub fn array_to_pages( let mut buffer = vec![]; binary_encode_plain::(array, false, &mut buffer); - let stats = binary_build_statistics(array, descriptor.primitive_type.clone()); + let stats = binary_build_statistics(array, type_.clone()); (EncodedDictPage::new(buffer, array.len()), stats) } DataType::LargeBinary => { @@ -182,15 +181,14 @@ pub fn array_to_pages( let mut buffer = vec![]; binary_encode_plain::(array, false, &mut buffer); - let stats = binary_build_statistics(array, descriptor.primitive_type.clone()); + let stats = binary_build_statistics(array, type_.clone()); (EncodedDictPage::new(buffer, array.len()), stats) } DataType::FixedSizeBinary(_) => { let mut buffer = vec![]; let array = array.values().as_any().downcast_ref().unwrap(); fixed_binary_encode_plain(array, false, &mut buffer); - let stats = - fixed_binary_build_statistics(array, descriptor.primitive_type.clone()); + let stats = fixed_binary_build_statistics(array, type_.clone()); let stats = serialize_statistics(&stats); (EncodedDictPage::new(buffer, array.len()), stats) } @@ -207,7 +205,7 @@ pub fn array_to_pages( let data_page = encode_keys( array.keys(), array.values().validity(), - descriptor, + type_, statistics, options, )?; diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index a2494f82e38..db871bb817e 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -1,6 +1,5 @@ use parquet2::{ encoding::Encoding, - metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, FixedLenStatistics}, @@ -29,10 +28,10 @@ pub(crate) fn encode_plain(array: &FixedSizeBinaryArray, is_optional: bool, buff pub fn array_to_page( array: &FixedSizeBinaryArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, statistics: Option, ) -> Result { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); let mut buffer = vec![]; @@ -56,7 +55,7 @@ pub fn array_to_page( 0, definition_levels_byte_length, statistics.map(|x| serialize_statistics(&x)), - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/levels.rs b/src/io/parquet/write/levels.rs index 68d3c0b343c..26e6a25cd7f 100644 --- a/src/io/parquet/write/levels.rs +++ b/src/io/parquet/write/levels.rs @@ -1,11 +1,7 @@ use parquet2::encoding::hybrid_rle::encode_u32; use parquet2::write::Version; -use crate::{ - array::Offset, - bitmap::{utils::BitmapIter, Bitmap}, - error::Result, -}; +use crate::{array::Offset, bitmap::Bitmap, error::Result}; pub fn num_values(offsets: &[O]) -> usize { offsets @@ -72,88 +68,53 @@ impl Iterator for RepLevelsIter<'_, O> { } } -enum OffsetsIter<'a, O> { - Optional(std::iter::Zip, BitmapIter<'a>>), - Required(std::slice::Windows<'a, O>), -} - /// Iterator adapter of parquet / dremel definition levels -pub struct DefLevelsIter<'a, O: Offset> { - iter: OffsetsIter<'a, O>, - primitive_validity: Option>, +pub struct DefLevelsIter<'a, O: Offset, II: Iterator, I: Iterator> { + iter: std::iter::Zip, II>, + primitive_validity: I, remaining: usize, - is_valid: bool, - length: usize, + is_valid: u32, total_size: usize, } -impl<'a, O: Offset> DefLevelsIter<'a, O> { - pub fn new( - offsets: &'a [O], - validity: Option<&'a Bitmap>, - primitive_validity: Option<&'a Bitmap>, - ) -> Self { +impl<'a, O: Offset, II: Iterator, I: Iterator> DefLevelsIter<'a, O, II, I> { + pub fn new(offsets: &'a [O], validity: II, primitive_validity: I) -> Self { let total_size = num_values(offsets); - let primitive_validity = primitive_validity.map(|x| x.iter()); - - let iter = validity - .map(|x| OffsetsIter::Optional(offsets.windows(2).zip(x.iter()))) - .unwrap_or_else(|| OffsetsIter::Required(offsets.windows(2))); + let iter = offsets.windows(2).zip(validity); Self { iter, primitive_validity, remaining: 0, - length: 0, - is_valid: false, + is_valid: 0, total_size, } } } -impl Iterator for DefLevelsIter<'_, O> { +impl, I: Iterator> Iterator + for DefLevelsIter<'_, O, II, I> +{ type Item = u32; fn next(&mut self) -> Option { - if self.remaining == self.length { - match &mut self.iter { - OffsetsIter::Optional(iter) => { - let (w, is_valid) = iter.next()?; - let start = w[0].to_usize(); - let end = w[1].to_usize(); - self.length = end - start; - self.remaining = 0; - self.is_valid = is_valid; - if self.length == 0 { - self.total_size -= 1; - return Some(self.is_valid as u32); - } - } - OffsetsIter::Required(iter) => { - let w = iter.next()?; - let start = w[0].to_usize(); - let end = w[1].to_usize(); - self.length = end - start; - self.remaining = 0; - self.is_valid = true; - if self.length == 0 { - self.total_size -= 1; - return Some(0); - } - } + if self.remaining == 0 { + let (w, is_valid) = self.iter.next()?; + let start = w[0].to_usize(); + let end = w[1].to_usize(); + self.remaining = end - start; + self.is_valid = is_valid + 1; + if self.remaining == 0 { + self.total_size -= 1; + return Some(self.is_valid - 1); } } - self.remaining += 1; + self.remaining -= 1; self.total_size -= 1; - let (base_def, p_is_valid) = self - .primitive_validity - .as_mut() - .map(|x| (1, x.next().unwrap() as u32)) - .unwrap_or((0, 0)); - let def_ = (base_def + 1) * self.is_valid as u32 + p_is_valid; - Some(def_) + let p_is_valid = self.primitive_validity.next().unwrap_or_default(); + Some(self.is_valid + p_is_valid) } fn size_hint(&self) -> (usize, Option) { @@ -163,7 +124,7 @@ impl Iterator for DefLevelsIter<'_, O> { #[derive(Debug)] pub struct NestedInfo<'a, O: Offset> { - _is_optional: bool, + is_optional: bool, offsets: &'a [O], validity: Option<&'a Bitmap>, } @@ -171,7 +132,7 @@ pub struct NestedInfo<'a, O: Offset> { impl<'a, O: Offset> NestedInfo<'a, O> { pub fn new(offsets: &'a [O], validity: Option<&'a Bitmap>, is_optional: bool) -> Self { Self { - _is_optional: is_optional, + is_optional, offsets, validity, } @@ -182,7 +143,7 @@ impl<'a, O: Offset> NestedInfo<'a, O> { } } -fn write_levels_v1) -> Result<()>>( +fn write_levels_v1) -> Result<()>>( buffer: &mut Vec, encode: F, ) -> Result<()> { @@ -226,30 +187,107 @@ pub fn write_rep_levels( Ok(()) } +fn write_def_levels1>( + buffer: &mut Vec, + levels: I, + num_bits: u8, + version: Version, +) -> Result<()> { + match version { + Version::V1 => write_levels_v1(buffer, move |buffer: &mut Vec| { + encode_u32(buffer, levels, num_bits)?; + Ok(()) + }), + Version::V2 => Ok(encode_u32(buffer, levels, num_bits)?), + } +} + /// writes the rep levels to a `Vec`. pub fn write_def_levels( buffer: &mut Vec, nested: &NestedInfo, validity: Option<&Bitmap>, + primitive_is_optional: bool, version: Version, ) -> Result<()> { - let num_bits = 1 + validity.is_some() as u8; - - match version { - Version::V1 => { - write_levels_v1(buffer, |buffer: &mut Vec| { - let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity); - encode_u32(buffer, levels, num_bits)?; - Ok(()) - })?; + let mut num_bits = 1 + nested.is_optional as u8 + primitive_is_optional as u8; + if num_bits == 3 { + // brute-force log2 - this needs to be generalized for e.g. list of list + num_bits = 2 + }; + + // this match ensures that irrespectively of the arrays' validities, we write def levels + // that are consistent with the declared parquet schema. + // see comments on some of the variants + match ( + nested.is_optional, + nested.validity.as_ref(), + primitive_is_optional, + validity.as_ref(), + ) { + // if the validity is optional and there is no validity in the array, we + // need to write 1 to mark the fields as valid + (true, None, true, None) => { + let nested_validity = std::iter::repeat(1); + let validity = std::iter::repeat(1); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) } - Version::V2 => { - let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity); - encode_u32(buffer, levels, num_bits)?; + (true, Some(nested_validity), true, None) => { + let levels = DefLevelsIter::new( + nested.offsets, + nested_validity.iter().map(|x| x as u32), + std::iter::repeat(1), + ); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, None, true, Some(validity)) => { + let levels = DefLevelsIter::new( + nested.offsets, + std::iter::repeat(1), + validity.iter().map(|x| x as u32), + ); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, Some(nested_validity), true, Some(validity)) => { + let levels = DefLevelsIter::new( + nested.offsets, + nested_validity.iter().map(|x| x as u32), + validity.iter().map(|x| x as u32), + ); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, None, false, _) => { + let nested_validity = std::iter::repeat(1); + let validity = std::iter::repeat(0); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (true, Some(nested_validity), false, _) => { + let nested_validity = nested_validity.iter().map(|x| x as u32); + let validity = std::iter::repeat(0); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (false, _, true, None) => { + let nested_validity = std::iter::repeat(0); + let validity = std::iter::repeat(1); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (false, _, true, Some(validity)) => { + let nested_validity = std::iter::repeat(0); + let validity = validity.iter().map(|x| x as u32); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) + } + (false, _, false, _) => { + let nested_validity = std::iter::repeat(0); + let validity = std::iter::repeat(0); + let levels = DefLevelsIter::new(nested.offsets, nested_validity, validity); + write_def_levels1(buffer, levels, num_bits, version) } } - - Ok(()) } #[cfg(test)] @@ -269,20 +307,21 @@ mod tests { fn test_def_levels() { // [[0, 1], None, [2, None, 3], [4, 5, 6], [], [7, 8, 9], None, [10]] let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let validity = Some(Bitmap::from([ - true, false, true, true, true, true, false, true, - ])); - let primitive_validity = Some(Bitmap::from([ + let validity = [true, false, true, true, true, true, false, true] + .into_iter() + .map(|x| x as u32); + let primitive_validity = [ true, true, //[0, 1] true, false, true, //[2, None, 3] true, true, true, //[4, 5, 6] true, true, true, //[7, 8, 9] true, //[10] - ])); + ] + .into_iter() + .map(|x| x as u32); let expected = vec![3u32, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3]; - let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref()) - .collect::>(); + let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::>(); assert_eq!(result, expected) } @@ -290,12 +329,11 @@ mod tests { fn test_def_levels1() { // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref(); - let validity = None; - let primitive_validity = None; + let validity = std::iter::repeat(0); + let primitive_validity = std::iter::repeat(0); let expected = vec![1u32, 1, 0, 1, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1]; - let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref()) - .collect::>(); + let result = DefLevelsIter::new(offsets, validity, primitive_validity).collect::>(); assert_eq!(result, expected) } } diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 7e6fcc16ba0..94fa584d5b7 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -22,6 +22,7 @@ use crate::types::days_ms; use crate::types::NativeType; use parquet2::page::DataPage; +use parquet2::schema::types::PrimitiveType as ParquetPrimitiveType; pub use parquet2::{ compression::CompressionOptions, encoding::Encoding, @@ -90,7 +91,7 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { /// Returns an iterator of [`EncodedPage`]. pub fn array_to_pages( array: &dyn Array, - descriptor: Descriptor, + type_: ParquetType, options: WriteOptions, encoding: Encoding, ) -> Result>> { @@ -105,8 +106,8 @@ pub fn array_to_pages( let right = array.slice(split_at, array.len() - split_at); Ok(DynIter::new( - array_to_pages(&*left, descriptor.clone(), options, encoding)? - .chain(array_to_pages(&*right, descriptor, options, encoding)?), + array_to_pages(&*left, type_.clone(), options, encoding)? + .chain(array_to_pages(&*right, type_, options, encoding)?), )) } else { match array.data_type() { @@ -114,22 +115,33 @@ pub fn array_to_pages( match_integer_type!(key_type, |$T| { dictionary::array_to_pages::<$T>( array.as_any().downcast_ref().unwrap(), - descriptor, + get_primitive(type_)?, options, encoding, ) }) } - _ => array_to_page(array, descriptor, options, encoding) + _ => array_to_page(array, type_, options, encoding) .map(|page| DynIter::new(std::iter::once(Ok(page)))), } } } +fn get_primitive(type_: ParquetType) -> Result { + if let ParquetType::PrimitiveType(t) = type_ { + Ok(t) + } else { + Err(ArrowError::InvalidArgumentError(format!( + "The {:?} is not a primitive type but it is trying to describe a primitive array", + type_ + ))) + } +} + /// Converts an [`Array`] to a [`CompressedPage`] based on options, descriptor and `encoding`. pub fn array_to_page( array: &dyn Array, - descriptor: Descriptor, + type_: ParquetType, options: WriteOptions, encoding: Encoding, ) -> Result { @@ -143,44 +155,45 @@ pub fn array_to_page( match data_type.to_logical_type() { DataType::Boolean => { - boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, descriptor) + let type_ = get_primitive(type_)?; + boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, type_) } // casts below MUST match the casts done at the metadata (field -> parquet type). DataType::UInt8 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::UInt16 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::UInt32 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::UInt64 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::Int8 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::Int16 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::Int32 | DataType::Date32 | DataType::Time32(_) => { primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ) } DataType::Int64 @@ -190,47 +203,48 @@ pub fn array_to_page( | DataType::Duration(_) => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::Float32 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::Float64 => primitive::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, ), DataType::Utf8 => utf8::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, encoding, ), DataType::LargeUtf8 => utf8::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, encoding, ), DataType::Binary => binary::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, encoding, ), DataType::LargeBinary => binary::array_to_page::( array.as_any().downcast_ref().unwrap(), options, - descriptor, + get_primitive(type_)?, encoding, ), DataType::Null => { let array = Int32Array::new_null(DataType::Int32, array.len()); - primitive::array_to_page::(&array, options, descriptor) + primitive::array_to_page::(&array, options, get_primitive(type_)?) } DataType::Interval(IntervalUnit::YearMonth) => { + let type_ = get_primitive(type_)?; let array = array .as_any() .downcast_ref::>() @@ -247,16 +261,14 @@ pub fn array_to_page( array.validity().cloned(), ); let statistics = if options.write_statistics { - Some(fixed_len_bytes::build_statistics( - &array, - descriptor.primitive_type.clone(), - )) + Some(fixed_len_bytes::build_statistics(&array, type_.clone())) } else { None }; - fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) + fixed_len_bytes::array_to_page(&array, options, type_, statistics) } DataType::Interval(IntervalUnit::DayTime) => { + let type_ = get_primitive(type_)?; let array = array .as_any() .downcast_ref::>() @@ -273,29 +285,25 @@ pub fn array_to_page( array.validity().cloned(), ); let statistics = if options.write_statistics { - Some(fixed_len_bytes::build_statistics( - &array, - descriptor.primitive_type.clone(), - )) + Some(fixed_len_bytes::build_statistics(&array, type_.clone())) } else { None }; - fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) + fixed_len_bytes::array_to_page(&array, options, type_, statistics) } DataType::FixedSizeBinary(_) => { + let type_ = get_primitive(type_)?; let array = array.as_any().downcast_ref().unwrap(); let statistics = if options.write_statistics { - Some(fixed_len_bytes::build_statistics( - array, - descriptor.primitive_type.clone(), - )) + Some(fixed_len_bytes::build_statistics(array, type_.clone())) } else { None }; - fixed_len_bytes::array_to_page(array, options, descriptor, statistics) + fixed_len_bytes::array_to_page(array, options, type_, statistics) } DataType::Decimal(precision, _) => { + let type_ = get_primitive(type_)?; let precision = *precision; let array = array .as_any() @@ -311,7 +319,7 @@ pub fn array_to_page( let array = PrimitiveArray::::new(DataType::Int32, values, array.validity().cloned()); - primitive::array_to_page::(&array, options, descriptor) + primitive::array_to_page::(&array, options, type_) } else if precision <= 18 { let values = array .values() @@ -322,16 +330,13 @@ pub fn array_to_page( let array = PrimitiveArray::::new(DataType::Int64, values, array.validity().cloned()); - primitive::array_to_page::(&array, options, descriptor) + primitive::array_to_page::(&array, options, type_) } else { let size = decimal_length_from_precision(precision); let statistics = if options.write_statistics { - let stats = fixed_len_bytes::build_statistics_decimal( - array, - descriptor.primitive_type.clone(), - size, - ); + let stats = + fixed_len_bytes::build_statistics_decimal(array, type_.clone(), size); Some(stats) } else { None @@ -347,11 +352,11 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) + fixed_len_bytes::array_to_page(&array, options, type_, statistics) } } DataType::FixedSizeList(_, _) | DataType::List(_) | DataType::LargeList(_) => { - nested_array_to_page(array, descriptor, options) + nested_array_to_page(array, type_, options) } other => Err(ArrowError::NotYetImplemented(format!( "Writing parquet V1 pages for data type {:?}", @@ -378,50 +383,69 @@ fn list_array_to_page( offsets: &[O], validity: Option<&Bitmap>, values: &dyn Array, - descriptor: Descriptor, + type_: ParquetType, options: WriteOptions, ) -> Result { use DataType::*; - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + + let is_optional = is_nullable(type_.get_field_info()); + + let type_ = if let ParquetType::GroupType { mut fields, .. } = type_ { + let inner = fields.pop().unwrap(); + if let ParquetType::GroupType { mut fields, .. } = inner { + get_primitive(fields.pop().unwrap())? + } else { + return Err(ArrowError::InvalidArgumentError(format!( + "The {:?} is not a valid inner type of a list but it is trying to describe a list array", + inner + ))); + } + } else { + return Err(ArrowError::InvalidArgumentError(format!( + "The {:?} is not a group type but it is trying to describe a list array", + type_ + ))); + }; + let nested = NestedInfo::new(offsets, validity, is_optional); match values.data_type() { Boolean => { let values = values.as_any().downcast_ref().unwrap(); - boolean::nested_array_to_page::(values, options, descriptor, nested) + boolean::nested_array_to_page::(values, options, type_, nested) } - UInt8 => dyn_nested_prim!(u8, i32, O, values, nested, descriptor, options), - UInt16 => dyn_nested_prim!(u16, i32, O, values, nested, descriptor, options), - UInt32 => dyn_nested_prim!(u32, i32, O, values, nested, descriptor, options), - UInt64 => dyn_nested_prim!(u64, i64, O, values, nested, descriptor, options), + UInt8 => dyn_nested_prim!(u8, i32, O, values, nested, type_, options), + UInt16 => dyn_nested_prim!(u16, i32, O, values, nested, type_, options), + UInt32 => dyn_nested_prim!(u32, i32, O, values, nested, type_, options), + UInt64 => dyn_nested_prim!(u64, i64, O, values, nested, type_, options), - Int8 => dyn_nested_prim!(i8, i32, O, values, nested, descriptor, options), - Int16 => dyn_nested_prim!(i16, i32, O, values, nested, descriptor, options), + Int8 => dyn_nested_prim!(i8, i32, O, values, nested, type_, options), + Int16 => dyn_nested_prim!(i16, i32, O, values, nested, type_, options), Int32 | Date32 | Time32(_) => { - dyn_nested_prim!(i32, i32, O, values, nested, descriptor, options) + dyn_nested_prim!(i32, i32, O, values, nested, type_, options) } Int64 | Date64 | Time64(_) | Timestamp(_, _) | Duration(_) => { - dyn_nested_prim!(i64, i64, O, values, nested, descriptor, options) + dyn_nested_prim!(i64, i64, O, values, nested, type_, options) } - Float32 => dyn_nested_prim!(f32, f32, O, values, nested, descriptor, options), - Float64 => dyn_nested_prim!(f64, f64, O, values, nested, descriptor, options), + Float32 => dyn_nested_prim!(f32, f32, O, values, nested, type_, options), + Float64 => dyn_nested_prim!(f64, f64, O, values, nested, type_, options), Utf8 => { let values = values.as_any().downcast_ref().unwrap(); - utf8::nested_array_to_page::(values, options, descriptor, nested) + utf8::nested_array_to_page::(values, options, type_, nested) } LargeUtf8 => { let values = values.as_any().downcast_ref().unwrap(); - utf8::nested_array_to_page::(values, options, descriptor, nested) + utf8::nested_array_to_page::(values, options, type_, nested) } Binary => { let values = values.as_any().downcast_ref().unwrap(); - binary::nested_array_to_page::(values, options, descriptor, nested) + binary::nested_array_to_page::(values, options, type_, nested) } LargeBinary => { let values = values.as_any().downcast_ref().unwrap(); - binary::nested_array_to_page::(values, options, descriptor, nested) + binary::nested_array_to_page::(values, options, type_, nested) } _ => todo!(), } @@ -429,7 +453,7 @@ fn list_array_to_page( fn nested_array_to_page( array: &dyn Array, - descriptor: Descriptor, + type_: ParquetType, options: WriteOptions, ) -> Result { match array.data_type() { @@ -439,7 +463,7 @@ fn nested_array_to_page( array.offsets(), array.validity(), array.values().as_ref(), - descriptor, + type_, options, ) } @@ -449,7 +473,7 @@ fn nested_array_to_page( array.offsets(), array.validity(), array.values().as_ref(), - descriptor, + type_, options, ) } @@ -462,7 +486,7 @@ fn nested_array_to_page( &offsets, array.validity(), array.values().as_ref(), - descriptor, + type_, options, ) } diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 70c54f2ad00..36f1f20e1c5 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -1,6 +1,5 @@ use parquet2::{ encoding::Encoding, - metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, PrimitiveStatistics}, @@ -42,14 +41,14 @@ where pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, ) -> Result where T: ArrowNativeType, R: NativeType, T: num_traits::AsPrimitive, { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); @@ -69,7 +68,7 @@ where let statistics = if options.write_statistics { Some(serialize_statistics(&build_statistics( array, - descriptor.primitive_type.clone(), + type_.clone(), ))) } else { None @@ -83,7 +82,7 @@ where 0, definition_levels_byte_length, statistics, - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 6acd467c080..19871fa3f41 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,5 +1,6 @@ +use parquet2::schema::types::PrimitiveType; use parquet2::statistics::serialize_statistics; -use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType}; +use parquet2::{encoding::Encoding, page::DataPage, types::NativeType}; use super::super::levels; use super::super::utils; @@ -15,7 +16,7 @@ use crate::{ pub fn array_to_page( array: &PrimitiveArray, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, nested: levels::NestedInfo, ) -> Result where @@ -24,7 +25,7 @@ where T: num_traits::AsPrimitive, O: Offset, { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); @@ -32,7 +33,7 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer); @@ -40,7 +41,7 @@ where let statistics = if options.write_statistics { Some(serialize_statistics(&build_statistics( array, - descriptor.primitive_type.clone(), + type_.clone(), ))) } else { None @@ -54,7 +55,7 @@ where repetition_levels_byte_length, definition_levels_byte_length, statistics, - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/row_group.rs b/src/io/parquet/write/row_group.rs index 5c419640395..06c57562319 100644 --- a/src/io/parquet/write/row_group.rs +++ b/src/io/parquet/write/row_group.rs @@ -1,5 +1,6 @@ +use parquet2::schema::types::ParquetType; +use parquet2::write::Compressor; use parquet2::FallibleStreamingIterator; -use parquet2::{metadata::ColumnDescriptor, write::Compressor}; use crate::{ array::Array, @@ -18,25 +19,23 @@ use super::{ pub fn row_group_iter + 'static + Send + Sync>( chunk: Chunk, encodings: Vec, - columns: Vec, + fields: Vec, options: WriteOptions, ) -> RowGroupIter<'static, ArrowError> { DynIter::new( chunk .into_arrays() .into_iter() - .zip(columns.into_iter()) + .zip(fields.into_iter()) .zip(encodings.into_iter()) - .map(move |((array, descriptor), encoding)| { - array_to_pages(array.as_ref(), descriptor.descriptor, options, encoding).map( - move |pages| { - let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); - let compressed_pages = - Compressor::new(encoded_pages, options.compression, vec![]) - .map_err(ArrowError::from); - DynStreamingIterator::new(compressed_pages) - }, - ) + .map(move |((array, type_), encoding)| { + array_to_pages(array.as_ref(), type_, options, encoding).map(move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = + Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }) }), ) } @@ -91,7 +90,7 @@ impl + 'static + Send + Sync, I: Iterator( pub fn array_to_page( array: &Utf8Array, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, encoding: Encoding, ) -> Result { let validity = array.validity(); - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let mut buffer = vec![]; utils::write_def_levels( @@ -78,7 +77,7 @@ pub fn array_to_page( } let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(build_statistics(array, type_.clone())) } else { None }; @@ -91,7 +90,7 @@ pub fn array_to_page( 0, definition_levels_byte_length, statistics, - descriptor, + type_, options, encoding, ) diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs index 32b83cc0d7e..7637978d272 100644 --- a/src/io/parquet/write/utf8/nested.rs +++ b/src/io/parquet/write/utf8/nested.rs @@ -1,4 +1,5 @@ -use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage}; +use parquet2::schema::types::PrimitiveType; +use parquet2::{encoding::Encoding, page::DataPage}; use super::super::{levels, utils, WriteOptions}; use super::basic::{build_statistics, encode_plain}; @@ -11,14 +12,14 @@ use crate::{ pub fn array_to_page( array: &Utf8Array, options: WriteOptions, - descriptor: Descriptor, + type_: PrimitiveType, nested: levels::NestedInfo, ) -> Result where OO: Offset, O: Offset, { - let is_optional = is_nullable(&descriptor.primitive_type.field_info); + let is_optional = is_nullable(&type_.field_info); let validity = array.validity(); @@ -26,13 +27,13 @@ where levels::write_rep_levels(&mut buffer, &nested, options.version)?; let repetition_levels_byte_length = buffer.len(); - levels::write_def_levels(&mut buffer, &nested, validity, options.version)?; + levels::write_def_levels(&mut buffer, &nested, validity, is_optional, options.version)?; let definition_levels_byte_length = buffer.len() - repetition_levels_byte_length; encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(build_statistics(array, type_.clone())) } else { None }; @@ -45,7 +46,7 @@ where repetition_levels_byte_length, definition_levels_byte_length, statistics, - descriptor, + type_, options, Encoding::Plain, ) diff --git a/src/io/parquet/write/utils.rs b/src/io/parquet/write/utils.rs index 011f3db8c27..0e6ab465f56 100644 --- a/src/io/parquet/write/utils.rs +++ b/src/io/parquet/write/utils.rs @@ -5,6 +5,7 @@ use parquet2::{ encoding::{hybrid_rle::encode_bool, Encoding}, metadata::Descriptor, page::{DataPage, DataPageHeader, DataPageHeaderV1, DataPageHeaderV2}, + schema::types::PrimitiveType, statistics::ParquetStatistics, }; @@ -66,7 +67,7 @@ pub fn build_plain_page( repetition_levels_byte_length: usize, definition_levels_byte_length: usize, statistics: Option, - descriptor: Descriptor, + type_: PrimitiveType, options: WriteOptions, encoding: Encoding, ) -> Result { @@ -93,7 +94,11 @@ pub fn build_plain_page( header, buffer, None, - descriptor, + Descriptor { + primitive_type: type_, + max_def_level: 0, + max_rep_level: 0, + }, Some(num_rows), )) } diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7804804e90b..225f19df9c8 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -88,7 +88,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { Some(10), ])) as Arc } - "list_int64_required" | "list_int64_required_required" => { + "list_int64_required" | "list_int64_optional_required" | "list_int64_required_required" => { // [[0, 1], None, [2, 0, 3], [4, 5, 6], [], [7, 8, 9], None, [10]] Arc::new(PrimitiveArray::::from(&[ Some(0), @@ -178,7 +178,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { | "list_nested_inner_required_required_i64" => { Arc::new(NullArray::from_data(DataType::Null, 1)) } - _ => unreachable!(), + other => unreachable!("{}", other), }; match column { @@ -189,6 +189,13 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { data_type, offsets, values, None, )) } + "list_int64_optional_required" => { + // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] + let data_type = DataType::List(Box::new(Field::new("item", DataType::Int64, true))); + Box::new(ListArray::::from_data( + data_type, offsets, values, None, + )) + } "list_nested_i64" => { // [[0, 1]], None, [[2, None], [3]], [[4, 5], [6]], [], [[7], None, [9]], [[], [None], None], [[10]] let data = [ @@ -253,7 +260,7 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { "list_bool" => Field::new("item", DataType::Boolean, true), "list_utf8" => Field::new("item", DataType::Utf8, true), "list_large_binary" => Field::new("item", DataType::LargeBinary, true), - _ => unreachable!(), + other => unreachable!("{}", other), }; let validity = Some(Bitmap::from([ @@ -594,7 +601,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, - "list_int64_required_required" => Statistics { + "list_int64_required_required" | "list_int64_optional_required" => Statistics { distinct_count: Count::Single(UInt64Array::from([None])), null_count: Count::Single([Some(0)].into()), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), @@ -636,7 +643,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { true, ), }, - _ => todo!(), + other => todo!("{}", other), } } @@ -918,3 +925,62 @@ fn arrow_type() -> Result<()> { assert_eq!(new_batches, vec![batch]); Ok(()) } + +fn data() -> Vec>>> { + // [[0, 1], [], [2, 0, 3], [4, 5, 6], [], [7, 8, 9], [], [10]] + vec![ + Some(vec![Some(0), Some(1)]), + Some(vec![]), + Some(vec![Some(2), Some(0), Some(3)]), + Some(vec![Some(4), Some(5), Some(6)]), + Some(vec![]), + Some(vec![Some(7), Some(8), Some(9)]), + Some(vec![]), + Some(vec![Some(10)]), + ] +} + +fn list_array_generic(inner_is_nullable: bool, is_nullable: bool) -> Result<()> { + let mut array = MutableListArray::::new_with_field( + MutablePrimitiveArray::::new(), + "item", + inner_is_nullable, + ); + array.try_extend(data()).unwrap(); + let array: ListArray = array.into(); + + let schema = Schema::from(vec![Field::new( + "a1", + array.data_type().clone(), + is_nullable, + )]); + let batch = Chunk::try_new(vec![Arc::new(array) as Arc])?; + + let r = integration_write(&schema, &[batch.clone()])?; + + let (new_schema, new_batches) = integration_read(&r)?; + + assert_eq!(new_schema, schema); + assert_eq!(new_batches, vec![batch]); + Ok(()) +} + +#[test] +fn list_array_required_required() -> Result<()> { + list_array_generic(false, false) +} + +#[test] +fn list_array_optional_optional() -> Result<()> { + list_array_generic(true, true) +} + +#[test] +fn list_array_required_optional() -> Result<()> { + list_array_generic(false, true) +} + +#[test] +fn list_array_optional_required() -> Result<()> { + list_array_generic(true, false) +} diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index f8283a099e1..0e4d7726389 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -227,7 +227,7 @@ fn v1_nested_int64_required_required() -> Result<()> { } #[test] -fn v2_nested_i16() -> Result<()> { +fn v2_list_int64_required_required() -> Result<()> { test_pyarrow_integration( "list_int64_required_required", 2, @@ -238,6 +238,19 @@ fn v2_nested_i16() -> Result<()> { ) } +#[test] +#[ignore] // see https://issues.apache.org/jira/browse/ARROW-15073 +fn v2_list_int64_optional_required() -> Result<()> { + test_pyarrow_integration( + "list_int64_optional_required", + 2, + "nested", + false, + false, + None, + ) +} + #[test] fn v1_nested_i16() -> Result<()> { test_pyarrow_integration("list_int16", 1, "nested", false, false, None) diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index 9b0d38fa8db..8dac67ae03e 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -33,32 +33,24 @@ fn pages( version: Version::V1, }; - let pages1 = vec![ - array_to_page( - &array11, - parquet_schema.columns()[0].descriptor.clone(), - options, - Encoding::Plain, - )?, - array_to_page( - &array12, - parquet_schema.columns()[0].descriptor.clone(), - options, - Encoding::Plain, - )?, - array_to_page( - &array13, - parquet_schema.columns()[0].descriptor.clone(), - options, - Encoding::Plain, - )?, - ]; + let pages1 = [array11, array12, array13] + .into_iter() + .map(|array| { + array_to_page( + &array, + parquet_schema.fields()[0].clone(), + options, + Encoding::Plain, + ) + }) + .collect::>>()?; + let pages2 = arrays .iter() .flat_map(|array| { array_to_pages( *array, - parquet_schema.columns()[1].descriptor.clone(), + parquet_schema.fields()[1].clone(), options, encoding, )