diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs
index 05e3d5ebb13..d252d6ac781 100644
--- a/src/io/parquet/read/deserialize/mod.rs
+++ b/src/io/parquet/read/deserialize/mod.rs
@@ -271,7 +271,7 @@ where
         List(inner) | LargeList(inner) | FixedSizeList(inner, _) => {
             init.push(InitNested::List(field.is_nullable));
             let iter = columns_to_iter_recursive(
-                vec![columns.pop().unwrap()],
+                columns,
                 types,
                 inner.as_ref().clone(),
                 init,
@@ -291,13 +291,10 @@ where
                 .map(|f| {
                     let mut init = init.clone();
                     init.push(InitNested::Struct(field.is_nullable));
-                    columns_to_iter_recursive(
-                        vec![columns.pop().unwrap()],
-                        vec![types.pop().unwrap()],
-                        f.clone(),
-                        init,
-                        chunk_size,
-                    )
+                    let n = n_columns(f);
+                    let columns = columns.drain(columns.len() - n..).collect();
+                    let types = types.drain(types.len() - n..).collect();
+                    columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size)
                 })
                 .collect::<Result<Vec<_>>>()?;
             let columns = columns.into_iter().rev().collect();
@@ -307,6 +304,34 @@ where
     })
 }
 
+fn n_columns(field: &Field) -> usize {
+    use crate::datatypes::PhysicalType::*;
+    match field.data_type.to_physical_type() {
+        Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8
+        | Dictionary(_) | LargeUtf8 => 1,
+        List | FixedSizeList | LargeList => {
+            let a = field.data_type().to_logical_type();
+            if let DataType::List(inner) = a {
+                n_columns(inner)
+            } else if let DataType::LargeList(inner) = a {
+                n_columns(inner)
+            } else if let DataType::FixedSizeList(inner, _) = a {
+                n_columns(inner)
+            } else {
+                unreachable!()
+            }
+        }
+        Struct => {
+            if let DataType::Struct(fields) = field.data_type.to_logical_type() {
+                fields.iter().map(n_columns).sum()
+            } else {
+                unreachable!()
+            }
+        }
+        _ => todo!(),
+    }
+}
+
 /// An iterator adapter that maps multiple iterators of [`DataPages`] into an iterator of [`Array`]s.
 ///
 /// The arrays are guaranteed to be at most of size `chunk_size` and data type `field.data_type`.
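For intuition on why the struct branch now drains a range instead of popping a single column: an Arrow field maps to one parquet leaf column per primitive leaf it contains, which is exactly what `n_columns` computes. Below is a minimal standalone sketch of the same counting idea written against arrow2's public `DataType` (assuming the `arrow2` crate as a dependency); the `leaf_columns` helper and the sample schema are illustrative only, not part of the crate.

```rust
use arrow2::datatypes::{DataType, Field};

/// Counts parquet leaf columns reachable from a logical type,
/// mirroring the recursion of the crate-private `n_columns` above.
fn leaf_columns(data_type: &DataType) -> usize {
    match data_type.to_logical_type() {
        DataType::List(inner)
        | DataType::LargeList(inner)
        | DataType::FixedSizeList(inner, _) => leaf_columns(inner.data_type()),
        DataType::Struct(fields) => fields.iter().map(|f| leaf_columns(f.data_type())).sum(),
        // every non-nested logical type is backed by exactly one leaf column
        _ => 1,
    }
}

fn main() {
    // struct { a: i32, b: list<utf8> } spans two leaf columns (`a` and the list's values)
    let strings = Field::new("element", DataType::Utf8, true);
    let field = Field::new(
        "s",
        DataType::Struct(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::List(Box::new(strings)), true),
        ]),
        true,
    );
    assert_eq!(leaf_columns(field.data_type()), 2);
}
```

With such a count, a struct child that is itself nested consumes the correct number of trailing entries from `columns` and `types` before the recursive call, instead of exactly one.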
diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs
index 39233efb92e..5fbfd51cf38 100644
--- a/src/io/parquet/read/deserialize/nested_utils.rs
+++ b/src/io/parquet/read/deserialize/nested_utils.rs
@@ -18,6 +18,10 @@ pub trait Nested: std::fmt::Debug + Send + Sync {
 
     fn is_nullable(&self) -> bool;
 
+    fn is_repeated(&self) -> bool {
+        false
+    }
+
     /// number of rows
     fn len(&self) -> usize;
 
@@ -79,6 +83,10 @@ impl Nested for NestedOptional {
         true
     }
+    fn is_repeated(&self) -> bool {
+        true
+    }
+
     fn push(&mut self, value: i64, is_valid: bool) {
         self.offsets.push(value);
         self.validity.push(is_valid);
     }
@@ -116,6 +124,10 @@ impl Nested for NestedValid {
         false
     }
 
+    fn is_repeated(&self) -> bool {
+        true
+    }
+
     fn push(&mut self, value: i64, _is_valid: bool) {
         self.offsets.push(value);
     }
@@ -184,11 +196,11 @@ impl NestedStruct {
 
 impl Nested for NestedStruct {
     fn inner(&mut self) -> (Vec<i64>, Option<MutableBitmap>) {
-        (Default::default(), None)
+        (Default::default(), Some(std::mem::take(&mut self.validity)))
     }
 
     fn is_nullable(&self) -> bool {
-        false
+        true
     }
 
     fn push(&mut self, _value: i64, is_valid: bool) {
@@ -389,7 +401,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi
 
     let mut cum_sum = vec![0u32; nested.len() + 1];
     for (i, nest) in nested.iter().enumerate() {
-        let delta = if nest.is_nullable() { 2 } else { 1 };
+        let delta = nest.is_nullable() as u32 + nest.is_repeated() as u32;
         cum_sum[i + 1] = cum_sum[i] + delta;
     }
 
@@ -422,7 +434,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi
 
 #[derive(Debug)]
 pub struct Optional<'a> {
     iter: HybridRleDecoder<'a>,
-    max: u32,
+    max_def: u32,
 }
 
@@ -430,10 +442,10 @@ impl<'a> Iterator for Optional<'a> {
     #[inline]
     fn next(&mut self) -> Option<Self::Item> {
-        self.iter.next().and_then(|x| {
-            if x == self.max {
+        self.iter.next().and_then(|def| {
+            if def == self.max_def {
                 Some(true)
-            } else if x == self.max - 1 {
+            } else if def == self.max_def - 1 {
                 Some(false)
             } else {
                 self.next()
             }
@@ -450,7 +462,7 @@ impl<'a> Optional<'a> {
 
         Self {
             iter: HybridRleDecoder::new(def_levels, get_bit_width(max_def), page.num_values()),
-            max: max_def as u32,
+            max_def: max_def as u32,
         }
     }
 
diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs
index 8db433a5737..4b6e01ab40f 100644
--- a/src/io/parquet/read/deserialize/primitive/nested.rs
+++ b/src/io/parquet/read/deserialize/primitive/nested.rs
@@ -191,7 +191,13 @@ where
     P: ParquetNativeType,
     F: Copy + Fn(P) -> T,
 {
-    pub fn new(iter: I, init: Vec<InitNested>, data_type: DataType, chunk_size: usize, op: F) -> Self {
+    pub fn new(
+        iter: I,
+        init: Vec<InitNested>,
+        data_type: DataType,
+        chunk_size: usize,
+        op: F,
+    ) -> Self {
         Self {
             iter,
             init,
diff --git a/src/io/parquet/read/deserialize/struct_.rs b/src/io/parquet/read/deserialize/struct_.rs
index c2daac53013..8ddfca7ddc4 100644
--- a/src/io/parquet/read/deserialize/struct_.rs
+++ b/src/io/parquet/read/deserialize/struct_.rs
@@ -31,21 +31,29 @@ impl<'a> Iterator for StructIterator<'a> {
         if values.iter().any(|x| x.is_none()) {
             return None;
         }
-        let values = values
-            .into_iter()
-            .map(|x| x.unwrap().map(|x| x.1))
-            .collect::<Result<Vec<_>, ArrowError>>();
-
-        match values {
-            Ok(values) => Some(Ok((
-                NestedState::new(vec![]), // todo
-                Arc::new(StructArray::from_data(
-                    DataType::Struct(self.fields.clone()),
-                    values,
-                    None,
-                )),
-            ))),
-            Err(e) => Some(Err(e)),
+
+        // todo: unzip of Result not yet supported in stable Rust
+        let mut nested = vec![];
+        let mut new_values = vec![];
+        for x in values {
+            match x.unwrap() {
+                Ok((nest, values)) => {
+                    new_values.push(values);
+                    nested.push(nest);
+                }
+                Err(e) => return Some(Err(e)),
+            }
         }
+        let mut nested = nested.pop().unwrap();
+        let (_, validity) = nested.nested.pop().unwrap().inner();
+
+        Some(Ok((
+            nested,
+            Arc::new(StructArray::from_data(
+                DataType::Struct(self.fields.clone()),
+                new_values,
+                validity.and_then(|x| x.into()),
+            )),
+        )))
     }
 }
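The behavioral core of the `nested_utils` change is that a nesting level's contribution to the definition level now depends on nullability and repetition separately: a required struct adds 0, an optional struct adds 1, a required list adds 1, and an optional list adds 2, replacing the old flat 2-if-nullable rule. A small self-contained sketch of that accumulation, mirroring the new `delta` computation in `extend_offsets2` (the `Level` type and `cumulative_def_levels` name are illustrative, not types from the crate):

```rust
/// Illustrative stand-in for one nesting level: its nullability and
/// whether it is repeated (i.e. a list level).
struct Level {
    is_nullable: bool,
    is_repeated: bool,
}

/// Cumulative definition-level contribution per nesting depth,
/// mirroring the new `delta` computation in `extend_offsets2`.
fn cumulative_def_levels(levels: &[Level]) -> Vec<u32> {
    let mut cum_sum = vec![0u32; levels.len() + 1];
    for (i, level) in levels.iter().enumerate() {
        let delta = level.is_nullable as u32 + level.is_repeated as u32;
        cum_sum[i + 1] = cum_sum[i] + delta;
    }
    cum_sum
}

fn main() {
    // an optional struct containing an optional list:
    // the struct level contributes 1, the list level contributes 2
    let levels = [
        Level { is_nullable: true, is_repeated: false }, // struct
        Level { is_nullable: true, is_repeated: true },  // list
    ];
    // The old `if nullable { 2 } else { 1 }` rule would have yielded [0, 2, 4].
    assert_eq!(cumulative_def_levels(&levels), vec![0, 1, 3]);
}
```

Under the old rule a struct level carried the same weight as a list level, shifting every definition-level comparison below it by one, which is why struct validity could not be recovered and `StructIterator` previously attached `None` to the resulting `StructArray`.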