From 7cc874fa5220f5baded7f39d5da726e52f68d8e5 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Fri, 27 May 2022 19:54:24 +0200 Subject: [PATCH] Fix error in reading nested parquet structs (#1015) --- src/io/parquet/read/deserialize/binary/mod.rs | 2 +- .../parquet/read/deserialize/binary/nested.rs | 4 +- .../parquet/read/deserialize/boolean/mod.rs | 2 +- .../read/deserialize/boolean/nested.rs | 4 +- src/io/parquet/read/deserialize/mod.rs | 144 ++++++++++-------- .../parquet/read/deserialize/nested_utils.rs | 92 +++++------ .../parquet/read/deserialize/primitive/mod.rs | 2 +- .../read/deserialize/primitive/nested.rs | 10 +- src/io/parquet/read/deserialize/struct_.rs | 38 +++-- 9 files changed, 162 insertions(+), 136 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 7c43154073c..103c6c5fcab 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -23,7 +23,7 @@ pub use dictionary::DictIter; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, O, A, I>( iter: I, - init: InitNested, + init: Vec, data_type: DataType, chunk_size: usize, ) -> NestedArrayIter<'a> diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index f4492d412ee..95461125553 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -142,7 +142,7 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { pub struct ArrayIterator, I: DataPages> { iter: I, data_type: DataType, - init: InitNested, + init: Vec, items: VecDeque<(Binary, MutableBitmap)>, nested: VecDeque, chunk_size: usize, @@ -150,7 +150,7 @@ pub struct ArrayIterator, I: DataPages> { } impl, I: DataPages> ArrayIterator { - pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize) -> Self { + pub fn new(iter: I, init: Vec, data_type: DataType, chunk_size: usize) -> Self { Self { iter, data_type, diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs index 61aeeb97757..ab663682eba 100644 --- a/src/io/parquet/read/deserialize/boolean/mod.rs +++ b/src/io/parquet/read/deserialize/boolean/mod.rs @@ -16,7 +16,7 @@ pub use self::basic::Iter; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I: 'a>( iter: I, - init: InitNested, + init: Vec, chunk_size: usize, ) -> NestedArrayIter<'a> where diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 5f30c698a80..b2e2477fb48 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -113,7 +113,7 @@ impl<'a> Decoder<'a> for BooleanDecoder { #[derive(Debug)] pub struct ArrayIterator { iter: I, - init: InitNested, + init: Vec, // invariant: items.len() == nested.len() items: VecDeque<(MutableBitmap, MutableBitmap)>, nested: VecDeque, @@ -121,7 +121,7 @@ pub struct ArrayIterator { } impl ArrayIterator { - pub fn new(iter: I, init: InitNested, chunk_size: usize) -> Self { + pub fn new(iter: I, init: Vec, chunk_size: usize) -> Self { Self { iter, init, diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 7cde9a4f552..d252d6ac781 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -13,7 +13,7 @@ mod utils; use crate::{ array::{Array, BinaryArray, FixedSizeListArray, ListArray, Utf8Array}, datatypes::{DataType, Field}, - error::{Error, Result}, + error::Result, }; use self::nested_utils::{InitNested, NestedArrayIter, NestedState}; @@ -41,10 +41,10 @@ fn create_list( data_type: DataType, nested: &mut NestedState, values: Arc, -) -> Result> { - Ok(match data_type { +) -> Arc { + let (mut offsets, validity) = nested.nested.pop().unwrap().inner(); + match data_type.to_logical_type() { DataType::List(_) => { - let (mut offsets, validity) = nested.nested.pop().unwrap().inner(); offsets.push(values.len() as i64); let offsets = offsets.iter().map(|x| *x as i32).collect::>(); @@ -56,7 +56,6 @@ fn create_list( )) } DataType::LargeList(_) => { - let (mut offsets, validity) = nested.nested.pop().unwrap().inner(); offsets.push(values.len() as i64); Arc::new(ListArray::::new( @@ -66,22 +65,28 @@ fn create_list( validity.and_then(|x| x.into()), )) } - DataType::FixedSizeList(_, _) => { - let (_, validity) = nested.nested.pop().unwrap().inner(); + DataType::FixedSizeList(_, _) => Arc::new(FixedSizeListArray::new( + data_type, + values, + validity.and_then(|x| x.into()), + )), + _ => unreachable!(), + } +} - Arc::new(FixedSizeListArray::new( - data_type, - values, - validity.and_then(|x| x.into()), - )) - } - _ => { - return Err(Error::NotYetImplemented(format!( - "Read nested datatype {:?}", - data_type - ))) - } - }) +fn is_primitive(data_type: &DataType) -> bool { + matches!( + data_type.to_physical_type(), + crate::datatypes::PhysicalType::Primitive(_) + | crate::datatypes::PhysicalType::Null + | crate::datatypes::PhysicalType::Boolean + | crate::datatypes::PhysicalType::Utf8 + | crate::datatypes::PhysicalType::LargeUtf8 + | crate::datatypes::PhysicalType::Binary + | crate::datatypes::PhysicalType::LargeBinary + | crate::datatypes::PhysicalType::FixedSizeBinary + | crate::datatypes::PhysicalType::Dictionary(_) + ) } fn columns_to_iter_recursive<'a, I: 'a>( @@ -95,7 +100,7 @@ where I: DataPages, { use DataType::*; - if init.len() == 1 && init[0].is_primitive() { + if init.is_empty() && is_primitive(&field.data_type) { return Ok(Box::new( page_iter_to_arrays( columns.pop().unwrap(), @@ -109,148 +114,164 @@ where Ok(match field.data_type().to_logical_type() { Boolean => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); - boolean::iter_to_arrays_nested(columns.pop().unwrap(), init.pop().unwrap(), chunk_size) + boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size) } Int8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as i8, ) } Int16 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as i16, ) } Int32 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x, ) } Int64 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i64| x, ) } UInt8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as u8, ) } UInt16 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as u16, ) } UInt32 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i32| x as u32, ) } UInt64 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: i64| x as u64, ) } Float32 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: f32| x, ) } Float64 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); primitive::iter_to_arrays_nested( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, |x: f64| x, ) } Utf8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } LargeUtf8 => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } Binary => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } LargeBinary => { + init.push(InitNested::Primitive(field.is_nullable)); types.pop(); binary::iter_to_arrays_nested::, _>( columns.pop().unwrap(), - init.pop().unwrap(), + init, field.data_type().clone(), chunk_size, ) } List(inner) | LargeList(inner) | FixedSizeList(inner, _) => { + init.push(InitNested::List(field.is_nullable)); let iter = columns_to_iter_recursive( - vec![columns.pop().unwrap()], + columns, types, inner.as_ref().clone(), init, @@ -258,7 +279,7 @@ where )?; let iter = iter.map(move |x| { let (mut nested, array) = x?; - let array = create_list(field.data_type().clone(), &mut nested, array)?; + let array = create_list(field.data_type().clone(), &mut nested, array); Ok((nested, array)) }); Box::new(iter) as _ @@ -268,13 +289,12 @@ where .iter() .rev() .map(|f| { - columns_to_iter_recursive( - vec![columns.pop().unwrap()], - vec![types.pop().unwrap()], - f.clone(), - vec![init.pop().unwrap()], - chunk_size, - ) + let mut init = init.clone(); + init.push(InitNested::Struct(field.is_nullable)); + let n = n_columns(f); + let columns = columns.drain(columns.len() - n..).collect(); + let types = types.drain(types.len() - n..).collect(); + columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) }) .collect::>>()?; let columns = columns.into_iter().rev().collect(); @@ -284,38 +304,29 @@ where }) } -fn field_to_init(field: &Field) -> Vec { +fn n_columns(field: &Field) -> usize { use crate::datatypes::PhysicalType::*; match field.data_type.to_physical_type() { Null | Boolean | Primitive(_) | Binary | FixedSizeBinary | LargeBinary | Utf8 - | Dictionary(_) | LargeUtf8 => vec![InitNested::Primitive(field.is_nullable)], + | Dictionary(_) | LargeUtf8 => 1, List | FixedSizeList | LargeList => { let a = field.data_type().to_logical_type(); - let inner = if let DataType::List(inner) = a { - field_to_init(inner) + if let DataType::List(inner) = a { + n_columns(inner) } else if let DataType::LargeList(inner) = a { - field_to_init(inner) + n_columns(inner) } else if let DataType::FixedSizeList(inner, _) = a { - field_to_init(inner) + n_columns(inner) } else { unreachable!() - }; - inner - .into_iter() - .map(|x| InitNested::List(Box::new(x), field.is_nullable)) - .collect() + } } Struct => { - let inner = if let DataType::Struct(fields) = field.data_type.to_logical_type() { - fields.iter().rev().map(field_to_init).collect::>() + if let DataType::Struct(fields) = field.data_type.to_logical_type() { + fields.iter().map(n_columns).sum() } else { unreachable!() - }; - inner - .into_iter() - .flatten() - .map(|x| InitNested::Struct(Box::new(x), field.is_nullable)) - .collect() + } } _ => todo!(), } @@ -333,9 +344,8 @@ pub fn column_iter_to_arrays<'a, I: 'a>( where I: DataPages, { - let init = field_to_init(&field); - Ok(Box::new( - columns_to_iter_recursive(columns, types, field, init, chunk_size)?.map(|x| x.map(|x| x.1)), + columns_to_iter_recursive(columns, types, field, vec![], chunk_size)? + .map(|x| x.map(|x| x.1)), )) } diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index b74f7c24734..5fbfd51cf38 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -18,6 +18,10 @@ pub trait Nested: std::fmt::Debug + Send + Sync { fn is_nullable(&self) -> bool; + fn is_repeated(&self) -> bool { + false + } + /// number of rows fn len(&self) -> usize; @@ -79,6 +83,10 @@ impl Nested for NestedOptional { true } + fn is_repeated(&self) -> bool { + true + } + fn push(&mut self, value: i64, is_valid: bool) { self.offsets.push(value); self.validity.push(is_valid); @@ -116,6 +124,10 @@ impl Nested for NestedValid { false } + fn is_repeated(&self) -> bool { + true + } + fn push(&mut self, value: i64, _is_valid: bool) { self.offsets.push(value); } @@ -184,11 +196,11 @@ impl NestedStruct { impl Nested for NestedStruct { fn inner(&mut self) -> (Vec, Option) { - (Default::default(), None) + (Default::default(), Some(std::mem::take(&mut self.validity))) } fn is_nullable(&self) -> bool { - false + true } fn push(&mut self, _value: i64, is_valid: bool) { @@ -221,46 +233,36 @@ where } } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Copy, PartialEq)] pub enum InitNested { Primitive(bool), - List(Box, bool), - Struct(Box, bool), -} - -impl InitNested { - pub fn is_primitive(&self) -> bool { - matches!(self, Self::Primitive(_)) - } + List(bool), + Struct(bool), } -fn init_nested_recursive(init: &InitNested, capacity: usize, container: &mut Vec>) { - match init { - InitNested::Primitive(is_nullable) => { - container.push(Box::new(NestedPrimitive::new(*is_nullable)) as Box) - } - InitNested::List(inner, is_nullable) => { - container.push(if *is_nullable { - Box::new(NestedOptional::with_capacity(capacity)) as Box - } else { - Box::new(NestedValid::with_capacity(capacity)) as Box - }); - init_nested_recursive(inner, capacity, container) - } - InitNested::Struct(inner, is_nullable) => { - if *is_nullable { - container.push(Box::new(NestedStruct::with_capacity(capacity)) as Box) - } else { - container.push(Box::new(NestedStructValid::new()) as Box) +fn init_nested(init: &[InitNested], capacity: usize) -> NestedState { + let container = init + .iter() + .map(|init| match init { + InitNested::Primitive(is_nullable) => { + Box::new(NestedPrimitive::new(*is_nullable)) as Box } - init_nested_recursive(inner, capacity, container) - } - } -} - -fn init_nested(init: &InitNested, capacity: usize) -> NestedState { - let mut container = vec![]; - init_nested_recursive(init, capacity, &mut container); + InitNested::List(is_nullable) => { + if *is_nullable { + Box::new(NestedOptional::with_capacity(capacity)) as Box + } else { + Box::new(NestedValid::with_capacity(capacity)) as Box + } + } + InitNested::Struct(is_nullable) => { + if *is_nullable { + Box::new(NestedStruct::with_capacity(capacity)) as Box + } else { + Box::new(NestedStructValid::new()) as Box + } + } + }) + .collect(); NestedState::new(container) } @@ -359,7 +361,7 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( /// has less items than `chunk_size` pub fn extend_offsets1<'a>( page: &mut NestedPage<'a>, - init: &InitNested, + init: &[InitNested], items: &mut VecDeque, chunk_size: usize, ) { @@ -399,7 +401,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi let mut cum_sum = vec![0u32; nested.len() + 1]; for (i, nest) in nested.iter().enumerate() { - let delta = if nest.is_nullable() { 2 } else { 1 }; + let delta = nest.is_nullable() as u32 + nest.is_repeated() as u32; cum_sum[i + 1] = cum_sum[i] + delta; } @@ -432,7 +434,7 @@ fn extend_offsets2<'a>(page: &mut NestedPage<'a>, nested: &mut NestedState, addi #[derive(Debug)] pub struct Optional<'a> { iter: HybridRleDecoder<'a>, - max: u32, + max_def: u32, } impl<'a> Iterator for Optional<'a> { @@ -440,10 +442,10 @@ impl<'a> Iterator for Optional<'a> { #[inline] fn next(&mut self) -> Option { - self.iter.next().and_then(|x| { - if x == self.max { + self.iter.next().and_then(|def| { + if def == self.max_def { Some(true) - } else if x == self.max - 1 { + } else if def == self.max_def - 1 { Some(false) } else { self.next() @@ -460,7 +462,7 @@ impl<'a> Optional<'a> { Self { iter: HybridRleDecoder::new(def_levels, get_bit_width(max_def), page.num_values()), - max: max_def as u32, + max_def: max_def as u32, } } @@ -475,7 +477,7 @@ pub(super) fn next<'a, I, D>( iter: &'a mut I, items: &mut VecDeque, nested_items: &mut VecDeque, - init: &InitNested, + init: &[InitNested], chunk_size: usize, decoder: &D, ) -> MaybeNext> diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index b2eba1534ff..400b87f1040 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -16,7 +16,7 @@ use nested::ArrayIterator; /// Converts [`DataPages`] to an [`Iterator`] of [`Array`] pub fn iter_to_arrays_nested<'a, I, T, P, F>( iter: I, - init: InitNested, + init: Vec, data_type: DataType, chunk_size: usize, op: F, diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 0aff18e2578..4b6e01ab40f 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -174,7 +174,7 @@ where F: Copy + Fn(P) -> T, { iter: I, - init: InitNested, + init: Vec, data_type: DataType, // invariant: items.len() == nested.len() items: VecDeque<(Vec, MutableBitmap)>, @@ -191,7 +191,13 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - pub fn new(iter: I, init: InitNested, data_type: DataType, chunk_size: usize, op: F) -> Self { + pub fn new( + iter: I, + init: Vec, + data_type: DataType, + chunk_size: usize, + op: F, + ) -> Self { Self { iter, init, diff --git a/src/io/parquet/read/deserialize/struct_.rs b/src/io/parquet/read/deserialize/struct_.rs index c5b54adf78c..aa6f1192a8f 100644 --- a/src/io/parquet/read/deserialize/struct_.rs +++ b/src/io/parquet/read/deserialize/struct_.rs @@ -31,21 +31,29 @@ impl<'a> Iterator for StructIterator<'a> { if values.iter().any(|x| x.is_none()) { return None; } - let values = values - .into_iter() - .map(|x| x.unwrap().map(|x| x.1)) - .collect::, Error>>(); - - match values { - Ok(values) => Some(Ok(( - NestedState::new(vec![]), // todo - Arc::new(StructArray::from_data( - DataType::Struct(self.fields.clone()), - values, - None, - )), - ))), - Err(e) => Some(Err(e)), + + // todo: unzip of Result not yet supportted in stable Rust + let mut nested = vec![]; + let mut new_values = vec![]; + for x in values { + match x.unwrap() { + Ok((nest, values)) => { + new_values.push(values); + nested.push(nest); + } + Err(e) => return Some(Err(e)), + } } + let mut nested = nested.pop().unwrap(); + let (_, validity) = nested.nested.pop().unwrap().inner(); + + Some(Ok(( + nested, + Arc::new(StructArray::from_data( + DataType::Struct(self.fields.clone()), + new_values, + validity.and_then(|x| x.into()), + )), + ))) } }