From 3df39d76e06e634b4b7dedd9e470a8e79d34d1d4 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 24 Apr 2022 21:59:48 +0000 Subject: [PATCH 1/3] Fixed stats --- parquet_integration/write_parquet.py | 19 +- src/io/parquet/read/statistics/binary.rs | 134 +---- src/io/parquet/read/statistics/boolean.rs | 60 +-- src/io/parquet/read/statistics/dictionary.rs | 60 +++ src/io/parquet/read/statistics/fixlen.rs | 161 ++---- src/io/parquet/read/statistics/list.rs | 79 +++ src/io/parquet/read/statistics/mod.rs | 375 ++++++++++---- src/io/parquet/read/statistics/primitive.rs | 121 +---- src/io/parquet/read/statistics/utf8.rs | 30 ++ tests/it/io/parquet/mod.rs | 496 ++++++++++--------- tests/it/io/parquet/read.rs | 11 +- tests/it/io/parquet/write.rs | 4 +- 12 files changed, 858 insertions(+), 692 deletions(-) create mode 100644 src/io/parquet/read/statistics/dictionary.rs create mode 100644 src/io/parquet/read/statistics/list.rs create mode 100644 src/io/parquet/read/statistics/utf8.rs diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 92c94b30186..997b3e830a3 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -9,7 +9,8 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: - int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9] + int64 = [-256, -1, None, 3, None, 5, 6, 7, None, 9] + uint32 = [0, 1, None, 3, None, 5, 6, 7, None, 9] float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0] string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"] boolean = [True, None, False, False, None, True, None, None, True, True] @@ -24,7 +25,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: pa.field("float64", pa.float64()), pa.field("string", pa.utf8()), pa.field("bool", pa.bool_()), - pa.field("date", pa.timestamp("ms")), + pa.field("timestamp_ms", pa.timestamp("ms")), pa.field("uint32", pa.uint32()), pa.field("string_large", pa.utf8()), # decimal testing @@ 
-44,8 +45,8 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: "float64": float64, "string": string, "bool": boolean, - "date": int64, - "uint32": int64, + "timestamp_ms": uint32, + "uint32": uint32, "string_large": string_large, "decimal_9": decimal, "decimal_18": decimal, @@ -61,7 +62,7 @@ def case_basic_nullable() -> Tuple[dict, pa.Schema, str]: def case_basic_required() -> Tuple[dict, pa.Schema, str]: - int64 = [-256, -1, 0, 1, 2, 3, 4, 5, 6, 7] + int64 = [-256, -1, 2, 3, 4, 5, 6, 7, 8, 9] uint32 = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] float64 = [0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0] string = ["Hello", "bbb", "aa", "", "bbb", "abc", "bbb", "bbb", "def", "aaa"] @@ -74,10 +75,8 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: pa.field("string", pa.utf8(), nullable=False), pa.field("bool", pa.bool_(), nullable=False), pa.field( - "date", - pa.timestamp( - "ms", - ), + "timestamp_ms", + pa.timestamp("ms"), nullable=False, ), pa.field("uint32", pa.uint32(), nullable=False), @@ -93,7 +92,7 @@ def case_basic_required() -> Tuple[dict, pa.Schema, str]: "float64": float64, "string": string, "bool": boolean, - "date": int64, + "timestamp_ms": uint32, "uint32": uint32, "decimal_9": decimal, "decimal_18": decimal, diff --git a/src/io/parquet/read/statistics/binary.rs b/src/io/parquet/read/statistics/binary.rs index 12421994925..1786477e6bb 100644 --- a/src/io/parquet/read/statistics/binary.rs +++ b/src/io/parquet/read/statistics/binary.rs @@ -1,113 +1,23 @@ -use std::any::Any; -use std::convert::TryFrom; - -use crate::datatypes::DataType; -use parquet2::statistics::BinaryStatistics as ParquetByteArrayStatistics; - -use super::Statistics; -use crate::error::{ArrowError, Result}; - -/// Represents a `Binary` or `LargeBinary` -#[derive(Debug, Clone, PartialEq)] -pub struct BinaryStatistics { - /// number of nulls - pub null_count: Option, - /// number of dictinct values - pub distinct_count: Option, - /// Minimum - pub min_value: Option>, - /// 
Maximum - pub max_value: Option>, -} - -impl Statistics for BinaryStatistics { - fn data_type(&self) -> &DataType { - &DataType::Binary - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn null_count(&self) -> Option { - self.null_count - } -} - -impl From<&ParquetByteArrayStatistics> for BinaryStatistics { - fn from(stats: &ParquetByteArrayStatistics) -> Self { - Self { - null_count: stats.null_count, - distinct_count: stats.distinct_count, - min_value: stats.min_value.clone(), - max_value: stats.max_value.clone(), - } - } -} - -/// Statistics of a string parquet column -#[derive(Debug, Clone, PartialEq)] -pub struct Utf8Statistics { - /// number of nulls - pub null_count: Option, - /// number of dictinct values - pub distinct_count: Option, - /// Minimum - pub min_value: Option, - /// Maximum - pub max_value: Option, -} - -impl Statistics for Utf8Statistics { - fn data_type(&self) -> &DataType { - &DataType::Utf8 - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn null_count(&self) -> Option { - self.null_count - } -} - -impl TryFrom<&ParquetByteArrayStatistics> for Utf8Statistics { - type Error = ArrowError; - - fn try_from(stats: &ParquetByteArrayStatistics) -> Result { - Ok(Self { - null_count: stats.null_count, - distinct_count: stats.distinct_count, - min_value: stats - .min_value - .as_ref() - .map(|x| simdutf8::basic::from_utf8(x).map(|x| x.to_string())) - .transpose()?, - max_value: stats - .max_value - .as_ref() - .map(|x| simdutf8::basic::from_utf8(x).map(|x| x.to_string())) - .transpose()?, - }) - } -} - -pub(super) fn statistics_from_byte_array( - stats: &ParquetByteArrayStatistics, - data_type: DataType, -) -> Result> { - use DataType::*; - Ok(match data_type { - Utf8 => Box::new(Utf8Statistics::try_from(stats)?), - LargeUtf8 => Box::new(Utf8Statistics::try_from(stats)?), - Binary => Box::new(BinaryStatistics::from(stats)), - LargeBinary => Box::new(BinaryStatistics::from(stats)), - other => { - return 
Err(ArrowError::NotYetImplemented(format!( - "Can't read {:?} from parquet", - other - ))) - } - }) +use crate::array::{MutableArray, MutableBinaryArray, Offset}; +use parquet2::statistics::{BinaryStatistics, Statistics as ParquetStatistics}; + +use crate::error::Result; + +pub(super) fn push( + from: Option<&dyn ParquetStatistics>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + min.push(from.and_then(|s| s.min_value.as_ref())); + max.push(from.and_then(|s| s.max_value.as_ref())); + Ok(()) } diff --git a/src/io/parquet/read/statistics/boolean.rs b/src/io/parquet/read/statistics/boolean.rs index 30c462e1b9f..c94b23b4f52 100644 --- a/src/io/parquet/read/statistics/boolean.rs +++ b/src/io/parquet/read/statistics/boolean.rs @@ -1,43 +1,23 @@ -use crate::datatypes::DataType; -use parquet2::statistics::BooleanStatistics as ParquetBooleanStatistics; -use std::any::Any; +use crate::array::{MutableArray, MutableBooleanArray}; +use parquet2::statistics::{BooleanStatistics, Statistics as ParquetStatistics}; -use super::Statistics; +use crate::error::Result; -/// Statistics of a boolean parquet column -#[derive(Debug, Clone, PartialEq)] -pub struct BooleanStatistics { - /// number of nulls - pub null_count: Option, - /// number of dictinct values - pub distinct_count: Option, - /// Minimum - pub min_value: Option, - /// Maximum - pub max_value: Option, -} - -impl Statistics for BooleanStatistics { - fn data_type(&self) -> &DataType { - &DataType::Boolean - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn null_count(&self) -> Option { - self.null_count - } -} - -impl From<&ParquetBooleanStatistics> for BooleanStatistics { - fn from(stats: &ParquetBooleanStatistics) -> Self { - Self { - null_count: stats.null_count, - distinct_count: 
stats.distinct_count, - min_value: stats.min_value, - max_value: stats.max_value, - } - } +pub(super) fn push( + from: Option<&dyn ParquetStatistics>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + min.push(from.and_then(|s| s.min_value)); + max.push(from.and_then(|s| s.max_value)); + Ok(()) } diff --git a/src/io/parquet/read/statistics/dictionary.rs b/src/io/parquet/read/statistics/dictionary.rs new file mode 100644 index 00000000000..29a60fd1862 --- /dev/null +++ b/src/io/parquet/read/statistics/dictionary.rs @@ -0,0 +1,60 @@ +use crate::array::*; +use crate::datatypes::DataType; +use crate::error::Result; + +use super::make_mutable; + +#[derive(Debug)] +pub struct DynMutableDictionary { + data_type: DataType, + pub inner: Box, +} + +impl DynMutableDictionary { + pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result { + let inner = if let DataType::Dictionary(_, inner, _) = &data_type { + inner.as_ref() + } else { + unreachable!() + }; + let inner = make_mutable(inner, capacity)?; + + Ok(Self { data_type, inner }) + } +} + +impl MutableArray for DynMutableDictionary { + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> { + self.inner.validity() + } + + fn as_box(&mut self) -> Box { + let inner = self.inner.as_arc(); + let keys = PrimitiveArray::::from_iter((0..inner.len() as i32).map(Some)); + Box::new(DictionaryArray::::from_data(keys, inner)) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + fn push_null(&mut self) { + todo!() + } + + fn shrink_to_fit(&mut self) { + todo!() + } +} diff --git 
a/src/io/parquet/read/statistics/fixlen.rs b/src/io/parquet/read/statistics/fixlen.rs index 2cbe8e97751..b97f6458643 100644 --- a/src/io/parquet/read/statistics/fixlen.rs +++ b/src/io/parquet/read/statistics/fixlen.rs @@ -1,116 +1,55 @@ -use std::any::Any; -use std::convert::{TryFrom, TryInto}; - -use super::primitive::PrimitiveStatistics; -use crate::datatypes::DataType; -use crate::error::{ArrowError, Result}; -use parquet2::{ - schema::types::PhysicalType, - statistics::{ - FixedLenStatistics as ParquetFixedLenStatistics, Statistics as ParquetStatistics, - }, -}; - -use super::Statistics; - -/// Arrow-deserialized parquet Statistics of a fixed-len binary -#[derive(Debug, Clone, PartialEq)] -pub struct FixedLenStatistics { - /// number of nulls - pub null_count: Option, - /// number of dictinct values - pub distinct_count: Option, - /// Minimum - pub min_value: Option>, - /// Maximum - pub max_value: Option>, - /// data type - pub data_type: DataType, -} - -impl Statistics for FixedLenStatistics { - fn data_type(&self) -> &DataType { - &self.data_type - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn null_count(&self) -> Option { - self.null_count - } +use parquet2::statistics::{FixedLenStatistics, Statistics as ParquetStatistics}; + +use crate::array::*; +use crate::error::Result; + +fn convert(value: &[u8], n: usize) -> i128 { + // Copy the fixed-size byte value to the start of a 16 byte stack + // allocated buffer, then use an arithmetic right shift to fill in + // MSBs, which accounts for leading 1's in negative (two's complement) + // values. 
+ let mut bytes = [0u8; 16]; + bytes[..n].copy_from_slice(value); + i128::from_be_bytes(bytes) >> (8 * (16 - n)) } -impl From<&ParquetFixedLenStatistics> for FixedLenStatistics { - fn from(stats: &ParquetFixedLenStatistics) -> Self { - let byte_lens = match stats.physical_type() { - PhysicalType::FixedLenByteArray(size) => *size, - _ => unreachable!(), - }; - Self { - null_count: stats.null_count, - distinct_count: stats.distinct_count, - min_value: stats.min_value.clone(), - max_value: stats.max_value.clone(), - data_type: DataType::FixedSizeBinary(byte_lens as usize), - } - } -} - -impl TryFrom<(&ParquetFixedLenStatistics, DataType)> for PrimitiveStatistics { - type Error = ArrowError; - fn try_from((stats, data_type): (&ParquetFixedLenStatistics, DataType)) -> Result { - let byte_lens = match stats.physical_type() { - PhysicalType::FixedLenByteArray(size) => *size, - _ => unreachable!(), - }; - if byte_lens > 16 { - Err(ArrowError::ExternalFormat(format!( - "Can't deserialize i128 from Fixed Len Byte array with length {:?}", - byte_lens - ))) - } else { - let paddings = (0..(16 - byte_lens)).map(|_| 0u8).collect::>(); - let max_value = stats.max_value.as_ref().and_then(|value| { - [paddings.as_slice(), value] - .concat() - .try_into() - .map(i128::from_be_bytes) - .ok() - }); - - let min_value = stats.min_value.as_ref().and_then(|value| { - [paddings.as_slice(), value] - .concat() - .try_into() - .map(i128::from_be_bytes) - .ok() - }); - Ok(Self { - data_type, - null_count: stats.null_count, - distinct_count: stats.distinct_count, - max_value, - min_value, - }) - } - } +pub(super) fn push_i128( + from: Option<&dyn ParquetStatistics>, + n: usize, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + + min.push(from.and_then(|s| 
s.min_value.as_deref().map(|x| convert(x, n)))); + max.push(from.and_then(|s| s.max_value.as_deref().map(|x| convert(x, n)))); + + Ok(()) } -pub(super) fn statistics_from_fix_len( - stats: &ParquetFixedLenStatistics, - data_type: DataType, -) -> Result> { - use DataType::*; - Ok(match data_type { - Decimal(_, _) => Box::new(PrimitiveStatistics::::try_from((stats, data_type))?), - FixedSizeBinary(_) => Box::new(FixedLenStatistics::from(stats)), - other => { - return Err(ArrowError::NotYetImplemented(format!( - "Can't read {:?} from parquet", - other - ))) - } - }) +pub(super) fn push( + from: Option<&dyn ParquetStatistics>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + min.push(from.and_then(|s| s.min_value.as_ref())); + max.push(from.and_then(|s| s.max_value.as_ref())); + Ok(()) } diff --git a/src/io/parquet/read/statistics/list.rs b/src/io/parquet/read/statistics/list.rs new file mode 100644 index 00000000000..5ce562c5bec --- /dev/null +++ b/src/io/parquet/read/statistics/list.rs @@ -0,0 +1,79 @@ +use crate::array::*; +use crate::datatypes::DataType; +use crate::error::Result; + +use super::make_mutable; + +#[derive(Debug)] +pub struct DynMutableListArray { + data_type: DataType, + pub inner: Box, +} + +impl DynMutableListArray { + pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result { + let inner = match data_type.to_logical_type() { + DataType::List(inner) | DataType::LargeList(inner) => inner.data_type(), + _ => unreachable!(), + }; + let inner = make_mutable(inner, capacity)?; + + Ok(Self { data_type, inner }) + } +} + +impl MutableArray for DynMutableListArray { + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn validity(&self) -> 
Option<&crate::bitmap::MutableBitmap> { + self.inner.validity() + } + + fn as_box(&mut self) -> Box { + let inner = self.inner.as_arc(); + + match self.data_type.to_logical_type() { + DataType::List(_) => { + let offsets = vec![0, inner.len() as i32].into(); + Box::new(ListArray::::new( + self.data_type.clone(), + offsets, + inner, + None, + )) + } + DataType::LargeList(_) => { + let offsets = vec![0, inner.len() as i64].into(); + Box::new(ListArray::::new( + self.data_type.clone(), + offsets, + inner, + None, + )) + } + _ => unreachable!(), + } + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + fn push_null(&mut self) { + todo!() + } + + fn shrink_to_fit(&mut self) { + todo!() + } +} diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index d42a2ad16e6..4e6e41fbdf4 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -1,108 +1,290 @@ //! APIs exposing `parquet2`'s statistics as arrow's statistics. 
-use std::any::Any; +use std::collections::VecDeque; +use std::sync::Arc; -use parquet2::metadata::ColumnChunkMetaData; -use parquet2::schema::types::PhysicalType; -use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; -use parquet2::statistics::Statistics as ParquetStatistics; +use parquet2::metadata::RowGroupMetaData; +use parquet2::schema::types::{ + PhysicalType as ParquetPhysicalType, PrimitiveType as ParquetPrimitiveType, +}; +use parquet2::statistics::{ + BinaryStatistics, BooleanStatistics, FixedLenStatistics, PrimitiveStatistics, + Statistics as ParquetStatistics, +}; -use crate::datatypes::DataType; -use crate::datatypes::Field; +use crate::array::*; +use crate::datatypes::IntervalUnit; +use crate::datatypes::{DataType, Field, PhysicalType}; use crate::error::ArrowError; use crate::error::Result; -mod primitive; -pub use primitive::*; mod binary; -pub use binary::*; mod boolean; -pub use boolean::*; +mod dictionary; mod fixlen; -pub use fixlen::*; +mod list; +mod primitive; +mod utf8; -use super::get_field_columns; +use self::list::DynMutableListArray; -/// Trait representing a deserialized parquet statistics into arrow. -pub trait Statistics: std::fmt::Debug { - /// returns the [`DataType`] of the statistics. - fn data_type(&self) -> &DataType; +use super::get_field_columns; - /// Returns `dyn Any` can used to downcast to a physical type. 
- fn as_any(&self) -> &dyn Any; +/// Arrow-deserialized parquet Statistics of a file +#[derive(Debug, PartialEq)] +pub struct Statistics { + /// number of nulls + pub null_count: UInt64Array, + /// number of dictinct values + pub distinct_count: UInt64Array, + /// Minimum + pub min_value: Box, + /// Maximum + pub max_value: Box, +} - /// Return the null count statistic - fn null_count(&self) -> Option; +/// Arrow-deserialized parquet Statistics of a file +#[derive(Debug)] +struct MutableStatistics { + /// number of nulls + pub null_count: UInt64Vec, + /// number of dictinct values + pub distinct_count: UInt64Vec, + /// Minimum + pub min_value: Box, + /// Maximum + pub max_value: Box, } -impl PartialEq for &dyn Statistics { - fn eq(&self, other: &Self) -> bool { - self.data_type() == other.data_type() +impl From for Statistics { + fn from(mut s: MutableStatistics) -> Self { + Self { + null_count: s.null_count.into(), + distinct_count: s.distinct_count.into(), + min_value: s.min_value.as_box(), + max_value: s.max_value.as_box(), + } } } -impl PartialEq for Box { - fn eq(&self, other: &Self) -> bool { - self.data_type() == other.data_type() +fn make_mutable(data_type: &DataType, capacity: usize) -> Result> { + Ok(match data_type.to_physical_type() { + PhysicalType::Boolean => { + Box::new(MutableBooleanArray::with_capacity(capacity)) as Box + } + PhysicalType::Primitive(primitive) => with_match_primitive_type!(primitive, |$T| { + Box::new(MutablePrimitiveArray::<$T>::with_capacity(capacity).to(data_type.clone())) + as Box + }), + PhysicalType::Binary => { + Box::new(MutableBinaryArray::::with_capacity(capacity)) as Box + } + PhysicalType::LargeBinary => { + Box::new(MutableBinaryArray::::with_capacity(capacity)) as Box + } + PhysicalType::Utf8 => { + Box::new(MutableUtf8Array::::with_capacity(capacity)) as Box + } + PhysicalType::LargeUtf8 => { + Box::new(MutableUtf8Array::::with_capacity(capacity)) as Box + } + PhysicalType::FixedSizeBinary => 
Box::new(MutableFixedSizeBinaryArray::from_data( + data_type.clone(), + vec![], + None, + )) as _, + PhysicalType::LargeList | PhysicalType::List => Box::new( + DynMutableListArray::try_with_capacity(data_type.clone(), capacity)?, + ) as Box, + PhysicalType::Dictionary(_) => Box::new( + dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?, + ), + other => { + return Err(ArrowError::NotYetImplemented(format!( + "Deserializing parquet stats from {:?} is still not implemented", + other + ))) + } + }) +} + +impl MutableStatistics { + fn try_new(field: &Field) -> Result { + let min_value = make_mutable(&field.data_type, 0)?; + let max_value = make_mutable(&field.data_type, 0)?; + + Ok(Self { + null_count: UInt64Vec::new(), + distinct_count: UInt64Vec::new(), + min_value, + max_value, + }) } } -/// Deserializes [`ParquetStatistics`] into [`Statistics`] based on `data_type`. -/// This takes into account the Arrow schema declared in Parquet's schema -fn _deserialize_statistics( - stats: &dyn ParquetStatistics, - data_type: DataType, -) -> Result> { - match stats.physical_type() { - PhysicalType::Int32 => { - let stats = stats.as_any().downcast_ref().unwrap(); - primitive::statistics_from_i32(stats, data_type) - } - PhysicalType::Int64 => { - let stats = stats.as_any().downcast_ref().unwrap(); - primitive::statistics_from_i64(stats, data_type) - } - PhysicalType::ByteArray => { - let stats = stats.as_any().downcast_ref().unwrap(); - binary::statistics_from_byte_array(stats, data_type) +fn push_others( + from: Option<&dyn ParquetStatistics>, + distinct_count: &mut UInt64Vec, + null_count: &mut UInt64Vec, +) { + let from = if let Some(from) = from { + from + } else { + distinct_count.push(None); + null_count.push(None); + return; + }; + let (distinct, null_count1) = match from.physical_type() { + ParquetPhysicalType::Boolean => { + let from = from.as_any().downcast_ref::().unwrap(); + (from.distinct_count, from.null_count) } - 
PhysicalType::Boolean => { - let stats = stats.as_any().downcast_ref().unwrap(); - Ok(Box::new(BooleanStatistics::from(stats))) + ParquetPhysicalType::Int32 => { + let from = from + .as_any() + .downcast_ref::>() + .unwrap(); + (from.distinct_count, from.null_count) } - PhysicalType::Float => { - let stats = stats + ParquetPhysicalType::Int64 => { + let from = from .as_any() - .downcast_ref::>() + .downcast_ref::>() .unwrap(); - Ok(Box::new(PrimitiveStatistics::::from(( - stats, data_type, - )))) + (from.distinct_count, from.null_count) } - PhysicalType::Double => { - let stats = stats + ParquetPhysicalType::Int96 => { + let from = from .as_any() - .downcast_ref::>() + .downcast_ref::>() .unwrap(); - Ok(Box::new(PrimitiveStatistics::::from(( - stats, data_type, - )))) + (from.distinct_count, from.null_count) } - PhysicalType::FixedLenByteArray(_) => { - let stats = stats.as_any().downcast_ref().unwrap(); - fixlen::statistics_from_fix_len(stats, data_type) + ParquetPhysicalType::Float => { + let from = from + .as_any() + .downcast_ref::>() + .unwrap(); + (from.distinct_count, from.null_count) } - _ => Err(ArrowError::NotYetImplemented( - "Reading Fixed-len array statistics is not yet supported".to_string(), - )), - } + ParquetPhysicalType::Double => { + let from = from + .as_any() + .downcast_ref::>() + .unwrap(); + (from.distinct_count, from.null_count) + } + ParquetPhysicalType::ByteArray => { + let from = from.as_any().downcast_ref::().unwrap(); + (from.distinct_count, from.null_count) + } + ParquetPhysicalType::FixedLenByteArray(_) => { + let from = from.as_any().downcast_ref::().unwrap(); + (from.distinct_count, from.null_count) + } + }; + + distinct_count.push(distinct.map(|x| x as u64)); + null_count.push(null_count1.map(|x| x as u64)); } -fn get_fields(field: &Field) -> Vec<&Field> { - match field.data_type.to_logical_type() { - DataType::List(inner) => get_fields(inner), - DataType::LargeList(inner) => get_fields(inner), - DataType::Struct(fields) => 
fields.iter().flat_map(get_fields).collect(), - _ => vec![field], +fn push( + mut stats: VecDeque<(Option>, ParquetPrimitiveType)>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, + distinct_count: &mut UInt64Vec, + null_count: &mut UInt64Vec, +) -> Result<()> { + match min.data_type().to_logical_type() { + List(_) | LargeList(_) => { + let min = min + .as_mut_any() + .downcast_mut::() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::() + .unwrap(); + return push( + stats, + min.inner.as_mut(), + max.inner.as_mut(), + distinct_count, + null_count, + ); + } + Dictionary(_, _, _) => { + let min = min + .as_mut_any() + .downcast_mut::() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::() + .unwrap(); + return push( + stats, + min.inner.as_mut(), + max.inner.as_mut(), + distinct_count, + null_count, + ); + } + _ => {} + } + + let (from, type_) = stats.pop_front().unwrap(); + let from = from.as_deref(); + + push_others(from, distinct_count, null_count); + + let physical_type = &type_.physical_type; + + use DataType::*; + match min.data_type().to_logical_type() { + Boolean => boolean::push(from, min, max), + Int8 => primitive::push(from, min, max, |x: i32| Ok(x as i8)), + Int16 => primitive::push(from, min, max, |x: i32| Ok(x as i16)), + Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => { + primitive::push(from, min, max, |x: i32| Ok(x as i32)) + } + UInt8 => primitive::push(from, min, max, |x: i32| Ok(x as u8)), + UInt16 => primitive::push(from, min, max, |x: i32| Ok(x as u16)), + UInt32 => primitive::push(from, min, max, |x: i32| Ok(x as u32)), + Int32 => primitive::push(from, min, max, |x: i32| Ok(x as i32)), + Int64 | Date64 | Time64(_) | Duration(_) => { + primitive::push(from, min, max, |x: i64| Ok(x as i64)) + } + UInt64 => primitive::push(from, min, max, |x: i64| Ok(x as u64)), + Timestamp(time_unit, _) => { + let time_unit = *time_unit; + primitive::push(from, min, max, |x: i64| { + Ok(primitive::timestamp( + 
type_.logical_type.as_ref(), + time_unit, + x, + )) + }) + } + Float32 => primitive::push(from, min, max, |x: f32| Ok(x as f32)), + Float64 => primitive::push(from, min, max, |x: f64| Ok(x as f64)), + Decimal(_, _) => match physical_type { + ParquetPhysicalType::Int32 => primitive::push(from, min, max, |x: i32| Ok(x as i128)), + ParquetPhysicalType::Int64 => primitive::push(from, min, max, |x: i64| Ok(x as i128)), + ParquetPhysicalType::FixedLenByteArray(n) if *n > 16 => { + return Err(ArrowError::NotYetImplemented(format!( + "Can't decode Decimal128 type from Fixed Size Byte Array of len {:?}", + n + ))) + } + ParquetPhysicalType::FixedLenByteArray(n) => fixlen::push_i128(from, *n, min, max), + _ => unreachable!(), + }, + Binary => binary::push::(from, min, max), + LargeBinary => binary::push::(from, min, max), + Utf8 => utf8::push::(from, min, max), + LargeUtf8 => utf8::push::(from, min, max), + FixedSizeBinary(_) => fixlen::push(from, min, max), + other => todo!("{:?}", other), } } @@ -110,22 +292,33 @@ fn get_fields(field: &Field) -> Vec<&Field> { /// /// For non-nested types, it returns a single column. /// For nested types, it returns one column per parquet primitive column. 
-pub fn deserialize_statistics( - field: &Field, - columns: &[ColumnChunkMetaData], -) -> Result>>> { - let columns = get_field_columns(columns, field.name.as_ref()); - - let fields = get_fields(field); - - columns - .into_iter() - .zip(fields.into_iter()) - .map(|(column, field)| { - column - .statistics() - .map(|x| _deserialize_statistics(x?.as_ref(), field.data_type.clone())) - .transpose() - }) - .collect() +pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Result { + if groups.is_empty() { + todo!("Return an empty statistics") + } + + let mut statistics = MutableStatistics::try_new(field)?; + + // transpose + groups.iter().try_for_each(|group| { + let columns = get_field_columns(group.columns(), field.name.as_ref()); + let stats = columns + .into_iter() + .map(|column| { + Ok(( + column.statistics().transpose()?, + column.descriptor().descriptor.primitive_type.clone(), + )) + }) + .collect::, ParquetPrimitiveType)>>>()?; + push( + stats, + statistics.min_value.as_mut(), + statistics.max_value.as_mut(), + &mut statistics.distinct_count, + &mut statistics.null_count, + ) + })?; + + Ok(statistics.into()) } diff --git a/src/io/parquet/read/statistics/primitive.rs b/src/io/parquet/read/statistics/primitive.rs index 9ef9ec7d44e..849363028ad 100644 --- a/src/io/parquet/read/statistics/primitive.rs +++ b/src/io/parquet/read/statistics/primitive.rs @@ -1,81 +1,14 @@ -use std::any::Any; - -use parquet2::schema::types::{PrimitiveLogicalType, PrimitiveType, TimeUnit as ParquetTimeUnit}; -use parquet2::statistics::PrimitiveStatistics as ParquetPrimitiveStatistics; +use parquet2::schema::types::{PrimitiveLogicalType, TimeUnit as ParquetTimeUnit}; +use parquet2::statistics::{PrimitiveStatistics, Statistics as ParquetStatistics}; use parquet2::types::NativeType as ParquetNativeType; +use crate::array::*; use crate::datatypes::TimeUnit; use crate::error::Result; -use crate::{datatypes::DataType, types::NativeType}; - -use super::Statistics; - -/// 
Arrow-deserialized parquet Statistics of a primitive type -#[derive(Debug, Clone, PartialEq)] -pub struct PrimitiveStatistics { - /// the data type - pub data_type: DataType, - /// number of nulls - pub null_count: Option, - /// number of dictinct values - pub distinct_count: Option, - /// Minimum - pub min_value: Option, - /// Maximum - pub max_value: Option, -} - -impl Statistics for PrimitiveStatistics { - fn data_type(&self) -> &DataType { - &self.data_type - } - - fn as_any(&self) -> &dyn Any { - self - } - - fn null_count(&self) -> Option { - self.null_count - } -} - -impl From<(&ParquetPrimitiveStatistics, DataType)> for PrimitiveStatistics -where - T: NativeType, - R: ParquetNativeType, - R: num_traits::AsPrimitive, -{ - fn from((stats, data_type): (&ParquetPrimitiveStatistics, DataType)) -> Self { - Self { - data_type, - null_count: stats.null_count, - distinct_count: stats.distinct_count, - min_value: stats.min_value.map(|x| x.as_()), - max_value: stats.max_value.map(|x| x.as_()), - } - } -} - -pub(super) fn statistics_from_i32( - stats: &ParquetPrimitiveStatistics, - data_type: DataType, -) -> Result> { - use DataType::*; - Ok(match data_type { - UInt8 => { - Box::new(PrimitiveStatistics::::from((stats, data_type))) as Box - } - UInt16 => Box::new(PrimitiveStatistics::::from((stats, data_type))), - UInt32 => Box::new(PrimitiveStatistics::::from((stats, data_type))), - Int8 => Box::new(PrimitiveStatistics::::from((stats, data_type))), - Int16 => Box::new(PrimitiveStatistics::::from((stats, data_type))), - Decimal(_, _) => Box::new(PrimitiveStatistics::::from((stats, data_type))), - _ => Box::new(PrimitiveStatistics::::from((stats, data_type))), - }) -} +use crate::types::NativeType; -fn timestamp(type_: &PrimitiveType, time_unit: TimeUnit, x: i64) -> i64 { - let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. 
}) = &type_.logical_type { +pub fn timestamp(logical_type: Option<&PrimitiveLogicalType>, time_unit: TimeUnit, x: i64) -> i64 { + let unit = if let Some(PrimitiveLogicalType::Timestamp { unit, .. }) = logical_type { unit } else { return x; @@ -100,27 +33,23 @@ fn timestamp(type_: &PrimitiveType, time_unit: TimeUnit, x: i64) -> i64 { } } -pub(super) fn statistics_from_i64( - stats: &ParquetPrimitiveStatistics, - data_type: DataType, -) -> Result> { - use DataType::*; - Ok(match data_type { - UInt64 => { - Box::new(PrimitiveStatistics::::from((stats, data_type))) as Box - } - Timestamp(time_unit, None) => Box::new(PrimitiveStatistics:: { - data_type, - null_count: stats.null_count, - distinct_count: stats.distinct_count, - min_value: stats - .min_value - .map(|x| timestamp(&stats.primitive_type, time_unit, x)), - max_value: stats - .max_value - .map(|x| timestamp(&stats.primitive_type, time_unit, x)), - }), - Decimal(_, _) => Box::new(PrimitiveStatistics::::from((stats, data_type))), - _ => Box::new(PrimitiveStatistics::::from((stats, data_type))), - }) +pub(super) fn push Result + Copy>( + from: Option<&dyn ParquetStatistics>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, + map: F, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::>().unwrap()); + min.push(from.and_then(|s| s.min_value.map(map)).transpose()?); + max.push(from.and_then(|s| s.max_value.map(map)).transpose()?); + + Ok(()) } diff --git a/src/io/parquet/read/statistics/utf8.rs b/src/io/parquet/read/statistics/utf8.rs new file mode 100644 index 00000000000..7a447e2334f --- /dev/null +++ b/src/io/parquet/read/statistics/utf8.rs @@ -0,0 +1,30 @@ +use crate::array::{MutableArray, MutableUtf8Array, Offset}; +use parquet2::statistics::{BinaryStatistics, Statistics as ParquetStatistics}; + +use crate::error::Result; + +pub(super) fn push( + from: 
Option<&dyn ParquetStatistics>, + min: &mut dyn MutableArray, + max: &mut dyn MutableArray, +) -> Result<()> { + let min = min + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::>() + .unwrap(); + let from = from.map(|s| s.as_any().downcast_ref::().unwrap()); + + min.push( + from.and_then(|s| s.min_value.as_deref().map(simdutf8::basic::from_utf8)) + .transpose()?, + ); + max.push( + from.and_then(|s| s.max_value.as_deref().map(simdutf8::basic::from_utf8)) + .transpose()?, + ); + Ok(()) +} diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 7c8daf23641..623f8023e82 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -13,13 +13,9 @@ mod read_indexes; mod write; mod write_async; -type ArrayStats = (Arc, Option>); +type ArrayStats = (Arc, Statistics); -pub fn read_column( - mut reader: R, - row_group: usize, - column: &str, -) -> Result { +pub fn read_column(mut reader: R, column: &str) -> Result { let metadata = read_metadata(&mut reader)?; let schema = infer_schema(&metadata)?; @@ -41,11 +37,11 @@ pub fn read_column( let field = &schema.fields[column]; - let mut statistics = deserialize_statistics(field, metadata.row_groups[row_group].columns())?; + let statistics = deserialize_statistics(field, &metadata.row_groups)?; Ok(( reader.next().unwrap()?.into_arrays().pop().unwrap(), - statistics.pop().unwrap(), + statistics, )) } @@ -275,6 +271,18 @@ pub fn pyarrow_nested_nullable(column: &str) -> Box { pub fn pyarrow_nullable(column: &str) -> Box { let i64_values = &[ + Some(-256), + Some(-1), + None, + Some(3), + None, + Some(5), + Some(6), + Some(7), + None, + Some(9), + ]; + let u32_values = &[ Some(0), Some(1), None, @@ -325,17 +333,11 @@ pub fn pyarrow_nullable(column: &str) -> Box { Some(true), Some(true), ])), - "date" => Box::new( - PrimitiveArray::::from(i64_values) + "timestamp_ms" => Box::new( + PrimitiveArray::::from_iter(u32_values.iter().map(|x| x.map(|x| x as 
i64))) .to(DataType::Timestamp(TimeUnit::Millisecond, None)), ), - "uint32" => { - let values = i64_values - .iter() - .map(|x| x.map(|x| x as u32)) - .collect::>(); - Box::new(PrimitiveArray::::from(values)) - } + "uint32" => Box::new(PrimitiveArray::::from(u32_values)), "int32_dict" => { let keys = PrimitiveArray::::from([Some(0), Some(1), None, Some(1)]); let values = Arc::new(PrimitiveArray::::from_slice([10, 200])); @@ -376,101 +378,115 @@ pub fn pyarrow_nullable(column: &str) -> Box { } } -pub fn pyarrow_nullable_statistics(column: &str) -> Option> { - Some(match column { - "int64" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Int64, - distinct_count: None, - null_count: Some(3), - min_value: Some(0), - max_value: Some(9), - }), - "float64" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Float64, - distinct_count: None, - null_count: Some(3), - min_value: Some(0.0), - max_value: Some(9.0), - }), - "string" => Box::new(Utf8Statistics { - null_count: Some(4), - distinct_count: None, - min_value: Some("".to_string()), - max_value: Some("def".to_string()), - }), - "bool" => Box::new(BooleanStatistics { - null_count: Some(4), - distinct_count: None, - - min_value: Some(false), - max_value: Some(true), - }), - "date" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Timestamp(TimeUnit::Millisecond, None), - distinct_count: None, - null_count: Some(3), - min_value: Some(0), - max_value: Some(9), - }), - "uint32" => Box::new(PrimitiveStatistics:: { - data_type: DataType::UInt32, - null_count: Some(3), - distinct_count: None, - - min_value: Some(0), - max_value: Some(9), - }), - "int32_dict" => Box::new(PrimitiveStatistics { - data_type: DataType::Dictionary(IntegerType::Int32, Box::new(DataType::Int32), false), - null_count: Some(0), - distinct_count: None, - min_value: Some(10), - max_value: Some(200), - }), - "decimal_9" => Box::new(PrimitiveStatistics:: { - distinct_count: None, - null_count: Some(3), - min_value: Some(0i128), - 
max_value: Some(9i128), - data_type: DataType::Decimal(9, 0), - }), - "decimal_18" => Box::new(PrimitiveStatistics:: { - distinct_count: None, - null_count: Some(3), - min_value: Some(0i128), - max_value: Some(9i128), - data_type: DataType::Decimal(18, 0), - }), - "decimal_26" => Box::new(PrimitiveStatistics:: { - distinct_count: None, - null_count: Some(3), - min_value: Some(0i128), - max_value: Some(9i128), - data_type: DataType::Decimal(26, 0), - }), - "timestamp_us" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Timestamp(TimeUnit::Microsecond, None), - distinct_count: None, - null_count: Some(3), - min_value: Some(0), - max_value: Some(9), - }), - "timestamp_s" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Timestamp(TimeUnit::Second, None), - distinct_count: None, - null_count: Some(3), - min_value: Some(0), - max_value: Some(9), - }), - "timestamp_s_utc" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Timestamp(TimeUnit::Second, Some("UTC".to_string())), - distinct_count: None, - null_count: Some(3), - min_value: Some(0), - max_value: Some(9), - }), +pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { + match column { + "int64" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(Int64Array::from_slice([-256])), + max_value: Box::new(Int64Array::from_slice([9])), + }, + "float64" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(Float64Array::from_slice([0.0])), + max_value: Box::new(Float64Array::from_slice([9.0])), + }, + "string" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(4)]), + min_value: Box::new(Utf8Array::::from_slice([""])), + max_value: Box::new(Utf8Array::::from_slice(["def"])), + }, + "bool" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(4)]), + 
min_value: Box::new(BooleanArray::from_slice([false])), + max_value: Box::new(BooleanArray::from_slice([true])), + }, + "timestamp_ms" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new( + Int64Array::from_slice([0]).to(DataType::Timestamp(TimeUnit::Millisecond, None)), + ), + max_value: Box::new( + Int64Array::from_slice([9]).to(DataType::Timestamp(TimeUnit::Millisecond, None)), + ), + }, + "uint32" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(UInt32Array::from_slice([0])), + max_value: Box::new(UInt32Array::from_slice([9])), + }, + "int32_dict" => { + let new_dict = |array: Arc| -> Box { + Box::new(DictionaryArray::::from_data( + vec![Some(0)].into(), + array, + )) + }; + + Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(0)]), + min_value: new_dict(Arc::new(Int32Array::from_slice([10]))), + max_value: new_dict(Arc::new(Int32Array::from_slice([200]))), + } + } + "decimal_9" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(9, 0))), + max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(9, 0))), + }, + "decimal_18" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(18, 0))), + max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(18, 0))), + }, + "decimal_26" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(26, 0))), + max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(26, 0))), + }, + "timestamp_us" => Statistics { + 
distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new( + Int64Array::from_slice([-256]).to(DataType::Timestamp(TimeUnit::Microsecond, None)), + ), + max_value: Box::new( + Int64Array::from_slice([9]).to(DataType::Timestamp(TimeUnit::Microsecond, None)), + ), + }, + "timestamp_s" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new( + Int64Array::from_slice([-256]).to(DataType::Timestamp(TimeUnit::Second, None)), + ), + max_value: Box::new( + Int64Array::from_slice([9]).to(DataType::Timestamp(TimeUnit::Second, None)), + ), + }, + "timestamp_s_utc" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(3)]), + min_value: Box::new(Int64Array::from_slice([-256]).to(DataType::Timestamp( + TimeUnit::Second, + Some("UTC".to_string()), + ))), + max_value: Box::new(Int64Array::from_slice([9]).to(DataType::Timestamp( + TimeUnit::Second, + Some("UTC".to_string()), + ))), + }, _ => unreachable!(), - }) + } } // these values match the values in `integration` @@ -478,14 +494,14 @@ pub fn pyarrow_required(column: &str) -> Box { let i64_values = &[ Some(-256), Some(-1), - Some(0), - Some(1), Some(2), Some(3), Some(4), Some(5), Some(6), Some(7), + Some(8), + Some(9), ]; match column { @@ -521,107 +537,138 @@ pub fn pyarrow_required(column: &str) -> Box { } } -pub fn pyarrow_required_statistics(column: &str) -> Option> { - Some(match column { - "int64" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Int64, - null_count: Some(0), - distinct_count: None, - min_value: Some(0), - max_value: Some(9), - }), - "bool" => Box::new(BooleanStatistics { - null_count: Some(0), - distinct_count: None, - min_value: Some(false), - max_value: Some(true), - }), - "string" => Box::new(Utf8Statistics { - null_count: Some(0), - distinct_count: None, - min_value: Some("".to_string()), - max_value: Some("def".to_string()), - 
}), - "decimal_9" => Box::new(PrimitiveStatistics:: { - distinct_count: None, - null_count: Some(0), - min_value: Some(0i128), - max_value: Some(9i128), - data_type: DataType::Decimal(9, 0), - }), - "decimal_18" => Box::new(PrimitiveStatistics:: { - distinct_count: None, - null_count: Some(0), - min_value: Some(0i128), - max_value: Some(9i128), - data_type: DataType::Decimal(18, 0), - }), - "decimal_26" => Box::new(PrimitiveStatistics:: { - distinct_count: None, - null_count: Some(0), - min_value: Some(0i128), - max_value: Some(9i128), - data_type: DataType::Decimal(26, 0), - }), - _ => unreachable!(), - }) +pub fn pyarrow_required_statistics(column: &str) -> Statistics { + let mut s = pyarrow_nullable_statistics(column); + s.null_count = UInt64Array::from([Some(0)]); + s } -pub fn pyarrow_nested_nullable_statistics(column: &str) -> Option> { - Some(match column { - "list_int16" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Int16, - distinct_count: None, - null_count: Some(1), - min_value: Some(0), - max_value: Some(10), - }), - "list_bool" => Box::new(BooleanStatistics { - distinct_count: None, - null_count: Some(1), - min_value: Some(false), - max_value: Some(true), - }), - "list_utf8" => Box::new(Utf8Statistics { - distinct_count: None, - null_count: Some(1), - min_value: Some("".to_string()), - max_value: Some("def".to_string()), - }), - "list_large_binary" => Box::new(BinaryStatistics { - distinct_count: None, - null_count: Some(1), - min_value: Some(b"".to_vec()), - max_value: Some(b"def".to_vec()), - }), - _ => Box::new(PrimitiveStatistics:: { - data_type: DataType::Int64, - distinct_count: None, - null_count: Some(3), - min_value: Some(0), - max_value: Some(9), - }), - }) +pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { + let new_list = |array: Arc, nullable: bool| { + Box::new(ListArray::::new( + DataType::List(Box::new(Field::new( + "item", + array.data_type().clone(), + nullable, + ))), + vec![0, array.len() as 
i32].into(), + array, + None, + )) as Box + }; + + match column { + "list_int16" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(4)]), // this should be 1, see ARROW-16299 + min_value: new_list(Arc::new(Int16Array::from_slice([0])), true), + max_value: new_list(Arc::new(Int16Array::from_slice([10])), true), + }, + "list_bool" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(4)]), + min_value: new_list(Arc::new(BooleanArray::from_slice([false])), true), + max_value: new_list(Arc::new(BooleanArray::from_slice([true])), true), + }, + "list_utf8" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(4)].into(), + min_value: new_list(Arc::new(Utf8Array::::from_slice([""])), true), + max_value: new_list(Arc::new(Utf8Array::::from_slice(["ccc"])), true), + }, + "list_large_binary" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(4)].into(), // this should be 1, see ARROW-16299 + min_value: new_list(Arc::new(BinaryArray::::from_slice([b""])), true), + max_value: new_list(Arc::new(BinaryArray::::from_slice([b"ccc"])), true), + }, + "list_int64" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(4)].into(), // this should be 1, see ARROW-16299 + min_value: new_list(Arc::new(Int64Array::from_slice([0])), true), + max_value: new_list(Arc::new(Int64Array::from_slice([10])), true), + }, + "list_int64_required" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(3)].into(), // this should be 1, see ARROW-16299 + min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), + max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), + }, + "list_int64_required_required" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(3)].into(), // this should be 0, see ARROW-16299 + min_value: 
new_list(Arc::new(Int64Array::from_slice([0])), false), + max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), + }, + "list_nested_i64" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(7)].into(), // this should be 2, see ARROW-16299 + min_value: new_list( + new_list(Arc::new(Int64Array::from_slice([0])), true).into(), + true, + ), + max_value: new_list( + new_list(Arc::new(Int64Array::from_slice([10])), true).into(), + true, + ), + }, + "list_nested_inner_required_required_i64" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(3)].into(), // this should be 0, see ARROW-16299 + min_value: new_list( + new_list(Arc::new(Int64Array::from_slice([0])), true).into(), + true, + ), + max_value: new_list( + new_list(Arc::new(Int64Array::from_slice([10])), true).into(), + true, + ), + }, + "list_nested_inner_required_i64" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: [Some(4)].into(), // this should be 0, see ARROW-16299 + min_value: new_list( + new_list(Arc::new(Int64Array::from_slice([0])), true).into(), + true, + ), + max_value: new_list( + new_list(Arc::new(Int64Array::from_slice([10])), true).into(), + true, + ), + }, + _ => todo!(), + } } -pub fn pyarrow_nested_edge_statistics(column: &str) -> Option> { - Some(match column { - "simple" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Int64, - distinct_count: None, - null_count: Some(0), - min_value: Some(0), - max_value: Some(1), - }), - "null" => Box::new(PrimitiveStatistics:: { - data_type: DataType::Int64, - distinct_count: None, - null_count: Some(0), - min_value: None, - max_value: None, - }), +pub fn pyarrow_nested_edge_statistics(column: &str) -> Statistics { + let new_list = |array: Arc| { + Box::new(ListArray::::new( + DataType::List(Box::new(Field::new( + "item", + array.data_type().clone(), + true, + ))), + vec![0, array.len() as i32].into(), + array, + None, + )) + }; + + match column { + 
"simple" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(0)]), + min_value: new_list(Arc::new(Int64Array::from([Some(0)]))), + max_value: new_list(Arc::new(Int64Array::from([Some(1)]))), + }, + "null" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(1)]), + min_value: new_list(Arc::new(Int64Array::from([None]))), + max_value: new_list(Arc::new(Int64Array::from([None]))), + }, _ => unreachable!(), - }) + } } pub fn pyarrow_struct(column: &str) -> Box { @@ -682,20 +729,20 @@ pub fn pyarrow_struct(column: &str) -> Box { } } -pub fn pyarrow_struct_statistics(column: &str) -> Option> { +pub fn pyarrow_struct_statistics(column: &str) -> Statistics { match column { - "struct" => Some(Box::new(BooleanStatistics { - distinct_count: None, - null_count: Some(4), - min_value: Some(false), - max_value: Some(true), - })), - "struct_struct" => Some(Box::new(BooleanStatistics { - distinct_count: None, - null_count: Some(1), - min_value: Some(false), - max_value: Some(true), - })), + "struct" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(4)]), + min_value: Box::new(BooleanArray::from_slice([false])), + max_value: Box::new(BooleanArray::from_slice([true])), + }, + "struct_struct" => Statistics { + distinct_count: UInt64Array::from([None]), + null_count: UInt64Array::from([Some(1)]), + min_value: Box::new(BooleanArray::from_slice([false])), + max_value: Box::new(BooleanArray::from_slice([true])), + }, _ => todo!(), } } @@ -742,6 +789,11 @@ fn integration_read(data: &[u8]) -> Result { let reader = Cursor::new(data); let reader = FileReader::try_new(reader, None, None, None, None)?; let schema = reader.schema().clone(); + + for field in &schema.fields { + let mut _statistics = deserialize_statistics(field, &reader.metadata().row_groups)?; + } + let batches = reader.collect::>>()?; Ok((schema, batches)) diff --git 
a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 3a946f71a0d..c92d92fb925 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -30,7 +30,7 @@ fn test_pyarrow_integration( ); let mut file = File::open(path).unwrap(); - let (array, statistics) = read_column(&mut file, 0, column)?; + let (array, statistics) = read_column(&mut file, column)?; let expected = match (type_, required) { ("basic", true) => pyarrow_required(column), @@ -104,8 +104,8 @@ fn v1_boolean_required() -> Result<()> { } #[test] -fn v1_timestamp_nullable() -> Result<()> { - test_pyarrow_integration("date", 1, "basic", false, false, None) +fn v1_timestamp_ms_nullable() -> Result<()> { + test_pyarrow_integration("timestamp_ms", 1, "basic", false, false, None) } #[test] @@ -200,11 +200,6 @@ fn v2_nested_int64_nullable_required() -> Result<()> { test_pyarrow_integration("list_int64", 2, "nested", false, false, None) } -#[test] -fn v1_nested_int64_nullable_required() -> Result<()> { - test_pyarrow_integration("list_int64", 1, "nested", false, false, None) -} - #[test] fn v2_nested_int64_required_required() -> Result<()> { test_pyarrow_integration("list_int64_required", 2, "nested", false, false, None) diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index 06f3a706f02..591512a3253 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -55,9 +55,9 @@ fn round_trip( let data = writer.into_inner().into_inner(); - let (result, stats) = read_column(&mut Cursor::new(data), 0, "a1")?; + let (result, stats) = read_column(&mut Cursor::new(data), "a1")?; assert_eq!(array.as_ref(), result.as_ref()); - assert_eq!(statistics.as_ref(), stats.as_ref()); + assert_eq!(statistics, stats); Ok(()) } From 218a0315f1ac0f2180abe79ee4749411dd87a1ca Mon Sep 17 00:00:00 2001 From: "Jorge C. 
Leitao" Date: Mon, 25 Apr 2022 06:04:12 +0000 Subject: [PATCH 2/3] Fixed error in decimal stats --- src/io/parquet/write/dictionary.rs | 11 ++--- src/io/parquet/write/fixed_len_bytes.rs | 42 +++++++++++++------ src/io/parquet/write/mod.rs | 52 ++++++++++++++++++++---- src/io/parquet/write/primitive/basic.rs | 14 ++++--- src/io/parquet/write/primitive/nested.rs | 6 ++- tests/it/io/parquet/mod.rs | 20 ++++----- tests/it/io/parquet/read.rs | 18 +++++++- 7 files changed, 119 insertions(+), 44 deletions(-) diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index 7a36f2bbef1..29c062d4a49 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -2,7 +2,7 @@ use parquet2::{ encoding::{hybrid_rle::encode_u32, Encoding}, metadata::Descriptor, page::{EncodedDictPage, EncodedPage}, - statistics::ParquetStatistics, + statistics::{serialize_statistics, ParquetStatistics}, write::DynIter, }; @@ -120,10 +120,10 @@ macro_rules! dyn_prim { let mut buffer = vec![]; primitive_encode_plain::<$from, $to>(values, false, &mut buffer); - ( - EncodedDictPage::new(buffer, values.len()), - primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()), - ) + let stats = + primitive_build_statistics::<$from, $to>(values, $descriptor.primitive_type.clone()); + let stats = serialize_statistics(&stats); + (EncodedDictPage::new(buffer, values.len()), stats) }}; } @@ -191,6 +191,7 @@ pub fn array_to_pages( fixed_binary_encode_plain(array, false, &mut buffer); let stats = fixed_binary_build_statistics(array, descriptor.primitive_type.clone()); + let stats = serialize_statistics(&stats); (EncodedDictPage::new(buffer, array.len()), stats) } other => { diff --git a/src/io/parquet/write/fixed_len_bytes.rs b/src/io/parquet/write/fixed_len_bytes.rs index 59ae75134e8..a2494f82e38 100644 --- a/src/io/parquet/write/fixed_len_bytes.rs +++ b/src/io/parquet/write/fixed_len_bytes.rs @@ -3,12 +3,12 @@ use parquet2::{ 
metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, - statistics::{serialize_statistics, FixedLenStatistics, ParquetStatistics, Statistics}, + statistics::{serialize_statistics, FixedLenStatistics}, }; use super::{binary::ord_binary, utils, WriteOptions}; use crate::{ - array::{Array, FixedSizeBinaryArray}, + array::{Array, FixedSizeBinaryArray, PrimitiveArray}, error::Result, io::parquet::read::schema::is_nullable, }; @@ -30,6 +30,7 @@ pub fn array_to_page( array: &FixedSizeBinaryArray, options: WriteOptions, descriptor: Descriptor, + statistics: Option, ) -> Result { let is_optional = is_nullable(&descriptor.primitive_type.field_info); let validity = array.validity(); @@ -47,12 +48,6 @@ pub fn array_to_page( encode_plain(array, is_optional, &mut buffer); - let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) - } else { - None - }; - utils::build_plain_page( buffer, array.len(), @@ -60,7 +55,7 @@ pub fn array_to_page( array.null_count(), 0, definition_levels_byte_length, - statistics, + statistics.map(|x| serialize_statistics(&x)), descriptor, options, Encoding::Plain, @@ -70,8 +65,8 @@ pub fn array_to_page( pub(super) fn build_statistics( array: &FixedSizeBinaryArray, primitive_type: PrimitiveType, -) -> ParquetStatistics { - let statistics = &FixedLenStatistics { +) -> FixedLenStatistics { + FixedLenStatistics { primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, @@ -85,6 +80,27 @@ pub(super) fn build_statistics( .flatten() .min_by(|x, y| ord_binary(x, y)) .map(|x| x.to_vec()), - } as &dyn Statistics; - serialize_statistics(statistics) + } +} + +pub(super) fn build_statistics_decimal( + array: &PrimitiveArray, + primitive_type: PrimitiveType, + size: usize, +) -> FixedLenStatistics { + FixedLenStatistics { + primitive_type, + null_count: Some(array.null_count() as i64), + distinct_count: None, + max_value: array + .iter() + .flatten() + .max() + .map(|x| 
x.to_be_bytes()[16 - size..].to_vec()), + min_value: array + .iter() + .flatten() + .min() + .map(|x| x.to_be_bytes()[16 - size..].to_vec()), + } } diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 1e32f1745da..5081df4fb5d 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -243,7 +243,15 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor) + let statistics = if options.write_statistics { + Some(fixed_len_bytes::build_statistics( + &array, + descriptor.primitive_type.clone(), + )) + } else { + None + }; + fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) } DataType::Interval(IntervalUnit::DayTime) => { let array = array @@ -261,13 +269,29 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor) + let statistics = if options.write_statistics { + Some(fixed_len_bytes::build_statistics( + &array, + descriptor.primitive_type.clone(), + )) + } else { + None + }; + fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) + } + DataType::FixedSizeBinary(_) => { + let array = array.as_any().downcast_ref().unwrap(); + let statistics = if options.write_statistics { + Some(fixed_len_bytes::build_statistics( + array, + descriptor.primitive_type.clone(), + )) + } else { + None + }; + + fixed_len_bytes::array_to_page(array, options, descriptor, statistics) } - DataType::FixedSizeBinary(_) => fixed_len_bytes::array_to_page( - array.as_any().downcast_ref().unwrap(), - options, - descriptor, - ), DataType::Decimal(precision, _) => { let precision = *precision; let array = array @@ -298,6 +322,18 @@ pub fn array_to_page( primitive::array_to_page::(&array, options, descriptor) } else { let size = decimal_length_from_precision(precision); + + let statistics = if options.write_statistics { + let stats = fixed_len_bytes::build_statistics_decimal( 
+ array, + descriptor.primitive_type.clone(), + size, + ); + Some(stats) + } else { + None + }; + let mut values = Vec::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { let bytes = &x.to_be_bytes()[16 - size..]; @@ -308,7 +344,7 @@ pub fn array_to_page( values.into(), array.validity().cloned(), ); - fixed_len_bytes::array_to_page(&array, options, descriptor) + fixed_len_bytes::array_to_page(&array, options, descriptor, statistics) } } DataType::FixedSizeList(_, _) | DataType::List(_) | DataType::LargeList(_) => { diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 1c58804fa1a..70c54f2ad00 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -3,7 +3,7 @@ use parquet2::{ metadata::Descriptor, page::DataPage, schema::types::PrimitiveType, - statistics::{serialize_statistics, ParquetStatistics, PrimitiveStatistics, Statistics}, + statistics::{serialize_statistics, PrimitiveStatistics}, types::NativeType, }; @@ -67,7 +67,10 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(serialize_statistics(&build_statistics( + array, + descriptor.primitive_type.clone(), + ))) } else { None }; @@ -89,13 +92,13 @@ where pub fn build_statistics( array: &PrimitiveArray, primitive_type: PrimitiveType, -) -> ParquetStatistics +) -> PrimitiveStatistics where T: ArrowNativeType, R: NativeType, T: num_traits::AsPrimitive, { - let statistics = &PrimitiveStatistics:: { + PrimitiveStatistics:: { primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, @@ -115,6 +118,5 @@ where x }) .min_by(|x, y| x.ord(y)), - } as &dyn Statistics; - serialize_statistics(statistics) + } } diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index 86732fdae97..6acd467c080 100644 --- 
a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -1,3 +1,4 @@ +use parquet2::statistics::serialize_statistics; use parquet2::{encoding::Encoding, metadata::Descriptor, page::DataPage, types::NativeType}; use super::super::levels; @@ -37,7 +38,10 @@ where encode_plain(array, is_optional, &mut buffer); let statistics = if options.write_statistics { - Some(build_statistics(array, descriptor.primitive_type.clone())) + Some(serialize_statistics(&build_statistics( + array, + descriptor.primitive_type.clone(), + ))) } else { None }; diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 623f8023e82..995ad2bf40c 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -560,49 +560,49 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { match column { "list_int16" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), // this should be 1, see ARROW-16299 + null_count: UInt64Array::from([Some(1)]), min_value: new_list(Arc::new(Int16Array::from_slice([0])), true), max_value: new_list(Arc::new(Int16Array::from_slice([10])), true), }, "list_bool" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), + null_count: UInt64Array::from([Some(1)]), min_value: new_list(Arc::new(BooleanArray::from_slice([false])), true), max_value: new_list(Arc::new(BooleanArray::from_slice([true])), true), }, "list_utf8" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), + null_count: [Some(1)].into(), min_value: new_list(Arc::new(Utf8Array::::from_slice([""])), true), max_value: new_list(Arc::new(Utf8Array::::from_slice(["ccc"])), true), }, "list_large_binary" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), // this should be 1, see ARROW-16299 + null_count: [Some(1)].into(), min_value: 
new_list(Arc::new(BinaryArray::::from_slice([b""])), true), max_value: new_list(Arc::new(BinaryArray::::from_slice([b"ccc"])), true), }, "list_int64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), // this should be 1, see ARROW-16299 + null_count: [Some(1)].into(), min_value: new_list(Arc::new(Int64Array::from_slice([0])), true), max_value: new_list(Arc::new(Int64Array::from_slice([10])), true), }, "list_int64_required" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(3)].into(), // this should be 1, see ARROW-16299 + null_count: [Some(1)].into(), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, "list_int64_required_required" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(3)].into(), // this should be 0, see ARROW-16299 + null_count: [Some(0)].into(), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, "list_nested_i64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(7)].into(), // this should be 2, see ARROW-16299 + null_count: [Some(2)].into(), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -614,7 +614,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { }, "list_nested_inner_required_required_i64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(3)].into(), // this should be 0, see ARROW-16299 + null_count: [Some(0)].into(), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -626,7 +626,7 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { }, "list_nested_inner_required_i64" => Statistics { distinct_count: UInt64Array::from([None]), - null_count: [Some(4)].into(), // this should be 0, see 
ARROW-16299 + null_count: [Some(0)].into(), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index c92d92fb925..8e9d5b06cad 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -51,7 +51,23 @@ fn test_pyarrow_integration( }; assert_eq!(expected.as_ref(), array.as_ref()); - assert_eq!(expected_statistics, statistics); + if ![ + "list_int16", + "list_large_binary", + "list_int64", + "list_int64_required", + "list_int64_required_required", + "list_nested_i64", + "list_utf8", + "list_bool", + "list_nested_inner_required_required_i64", + "list_nested_inner_required_i64", + ] + .contains(&column) + { + // pyarrow outputs an incorrect null count for nested types - ARROW-16299 + assert_eq!(expected_statistics, statistics); + } Ok(()) } From c9fe387d30b76b33016500ed1fabfe0528e4c0a9 Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Mon, 25 Apr 2022 06:31:40 +0000 Subject: [PATCH 3/3] Fixed struct stats --- examples/parquet_read.rs | 10 +- src/io/parquet/read/statistics/dictionary.rs | 11 +- src/io/parquet/read/statistics/mod.rs | 153 ++++++++++++++++--- src/io/parquet/read/statistics/struct_.rs | 61 ++++++++ tests/it/io/parquet/mod.rs | 153 ++++++++++++------- 5 files changed, 300 insertions(+), 88 deletions(-) create mode 100644 src/io/parquet/read/statistics/struct_.rs diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 4cf14fe82ef..07bd9fcacdd 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -13,7 +13,15 @@ fn main() -> Result<()> { let reader = File::open(file_path)?; let reader = read::FileReader::try_new(reader, Some(&[8]), None, None, None)?; - println!("{:#?}", reader.metadata()); + println!("{:#?}", reader.schema()); + + // say we want to evaluate whether we can skip some row groups based on a field's value + let field = &reader.schema().fields[0]; + + // we can deserialize the
parquet statistics from this field + let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?; + + println!("{:#?}", statistics); let start = SystemTime::now(); for maybe_chunk in reader { diff --git a/src/io/parquet/read/statistics/dictionary.rs b/src/io/parquet/read/statistics/dictionary.rs index 29a60fd1862..03c7e7ce966 100644 --- a/src/io/parquet/read/statistics/dictionary.rs +++ b/src/io/parquet/read/statistics/dictionary.rs @@ -1,5 +1,5 @@ use crate::array::*; -use crate::datatypes::DataType; +use crate::datatypes::{DataType, PhysicalType}; use crate::error::Result; use super::make_mutable; @@ -38,8 +38,13 @@ impl MutableArray for DynMutableDictionary { fn as_box(&mut self) -> Box { let inner = self.inner.as_arc(); - let keys = PrimitiveArray::::from_iter((0..inner.len() as i32).map(Some)); - Box::new(DictionaryArray::::from_data(keys, inner)) + match self.data_type.to_physical_type() { + PhysicalType::Dictionary(key) => match_integer_type!(key, |$T| { + let keys = PrimitiveArray::<$T>::from_iter((0..inner.len() as $T).map(Some)); + Box::new(DictionaryArray::<$T>::from_data(keys, inner)) + }), + _ => todo!(), + } } fn as_any(&self) -> &dyn std::any::Any { diff --git a/src/io/parquet/read/statistics/mod.rs b/src/io/parquet/read/statistics/mod.rs index 4e6e41fbdf4..6df1b16ebed 100644 --- a/src/io/parquet/read/statistics/mod.rs +++ b/src/io/parquet/read/statistics/mod.rs @@ -23,19 +23,29 @@ mod dictionary; mod fixlen; mod list; mod primitive; +mod struct_; mod utf8; use self::list::DynMutableListArray; use super::get_field_columns; +/// Enum of a count statistics +#[derive(Debug, PartialEq)] +pub enum Count { + /// simple arrays (every type not a Struct) have a count of UInt64 + Single(UInt64Array), + /// struct arrays have a count as a struct of UInt64 + Struct(StructArray), +} + /// Arrow-deserialized parquet Statistics of a file #[derive(Debug, PartialEq)] pub struct Statistics { - /// number of nulls - pub null_count: 
UInt64Array, + /// number of nulls. + pub null_count: Count, /// number of dictinct values - pub distinct_count: UInt64Array, + pub distinct_count: Count, /// Minimum pub min_value: Box, /// Maximum @@ -46,9 +56,9 @@ pub struct Statistics { #[derive(Debug)] struct MutableStatistics { /// number of nulls - pub null_count: UInt64Vec, + pub null_count: Box, /// number of dictinct values - pub distinct_count: UInt64Vec, + pub distinct_count: Box, /// Minimum pub min_value: Box, /// Maximum @@ -57,9 +67,48 @@ struct MutableStatistics { impl From for Statistics { fn from(mut s: MutableStatistics) -> Self { + let null_count = if let PhysicalType::Struct = s.null_count.data_type().to_physical_type() { + let a = s + .null_count + .as_box() + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + Count::Struct(a) + } else { + let a = s + .null_count + .as_box() + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + Count::Single(a) + }; + let distinct_count = + if let PhysicalType::Struct = s.distinct_count.data_type().to_physical_type() { + let a = s + .distinct_count + .as_box() + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + Count::Struct(a) + } else { + let a = s + .distinct_count + .as_box() + .as_any() + .downcast_ref::() + .unwrap() + .clone(); + Count::Single(a) + }; Self { - null_count: s.null_count.into(), - distinct_count: s.distinct_count.into(), + null_count, + distinct_count, min_value: s.min_value.as_box(), max_value: s.max_value.as_box(), } @@ -98,6 +147,10 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> Result Box::new( dictionary::DynMutableDictionary::try_with_capacity(data_type.clone(), capacity)?, ), + PhysicalType::Struct => Box::new(struct_::DynMutableStructArray::try_with_capacity( + data_type.clone(), + capacity, + )?), other => { return Err(ArrowError::NotYetImplemented(format!( "Deserializing parquet stats from {:?} is still not implemented", @@ -107,14 +160,28 @@ fn make_mutable(data_type: &DataType, capacity: usize) -> 
Result DataType { + if let DataType::Struct(fields) = data_type.to_logical_type() { + DataType::Struct( + fields + .iter() + .map(|f| Field::new(&f.name, create_dt(&f.data_type), f.is_nullable)) + .collect(), + ) + } else { + DataType::UInt64 + } +} + impl MutableStatistics { fn try_new(field: &Field) -> Result { let min_value = make_mutable(&field.data_type, 0)?; let max_value = make_mutable(&field.data_type, 0)?; + let dt = create_dt(&field.data_type); Ok(Self { - null_count: UInt64Vec::new(), - distinct_count: UInt64Vec::new(), + null_count: make_mutable(&dt, 0)?, + distinct_count: make_mutable(&dt, 0)?, min_value, max_value, }) @@ -188,11 +255,11 @@ fn push_others( } fn push( - mut stats: VecDeque<(Option>, ParquetPrimitiveType)>, + stats: &mut VecDeque<(Option>, ParquetPrimitiveType)>, min: &mut dyn MutableArray, max: &mut dyn MutableArray, - distinct_count: &mut UInt64Vec, - null_count: &mut UInt64Vec, + distinct_count: &mut dyn MutableArray, + null_count: &mut dyn MutableArray, ) -> Result<()> { match min.data_type().to_logical_type() { List(_) | LargeList(_) => { @@ -229,12 +296,51 @@ fn push( null_count, ); } + Struct(_) => { + let min = min + .as_mut_any() + .downcast_mut::() + .unwrap(); + let max = max + .as_mut_any() + .downcast_mut::() + .unwrap(); + let distinct_count = distinct_count + .as_mut_any() + .downcast_mut::() + .unwrap(); + let null_count = null_count + .as_mut_any() + .downcast_mut::() + .unwrap(); + return min + .inner + .iter_mut() + .zip(max.inner.iter_mut()) + .zip(distinct_count.inner.iter_mut()) + .zip(null_count.inner.iter_mut()) + .try_for_each(|(((min, max), distinct_count), null_count)| { + push( + stats, + min.as_mut(), + max.as_mut(), + distinct_count.as_mut(), + null_count.as_mut(), + ) + }); + } _ => {} } let (from, type_) = stats.pop_front().unwrap(); let from = from.as_deref(); + let distinct_count = distinct_count + .as_mut_any() + .downcast_mut::() + .unwrap(); + let null_count = 
null_count.as_mut_any().downcast_mut::().unwrap(); + push_others(from, distinct_count, null_count); let physical_type = &type_.physical_type; @@ -288,21 +394,18 @@ fn push( } } -/// Deserializes [`ParquetStatistics`] into [`Statistics`] associated to `field` +/// Deserializes the statistics in the column chunks from all `row_groups` +/// into [`Statistics`] associated from `field`'s name. /// -/// For non-nested types, it returns a single column. -/// For nested types, it returns one column per parquet primitive column. -pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Result { - if groups.is_empty() { - todo!("Return an empty statistics") - } - +/// # Errors +/// This function errors if the deserialization of the statistics fails (e.g. invalid utf8) +pub fn deserialize(field: &Field, row_groups: &[RowGroupMetaData]) -> Result { let mut statistics = MutableStatistics::try_new(field)?; // transpose - groups.iter().try_for_each(|group| { + row_groups.iter().try_for_each(|group| { let columns = get_field_columns(group.columns(), field.name.as_ref()); - let stats = columns + let mut stats = columns .into_iter() .map(|column| { Ok(( @@ -312,11 +415,11 @@ pub fn deserialize_statistics(field: &Field, groups: &[RowGroupMetaData]) -> Res }) .collect::, ParquetPrimitiveType)>>>()?; push( - stats, + &mut stats, statistics.min_value.as_mut(), statistics.max_value.as_mut(), - &mut statistics.distinct_count, - &mut statistics.null_count, + statistics.distinct_count.as_mut(), + statistics.null_count.as_mut(), ) })?; diff --git a/src/io/parquet/read/statistics/struct_.rs b/src/io/parquet/read/statistics/struct_.rs new file mode 100644 index 00000000000..085737d6241 --- /dev/null +++ b/src/io/parquet/read/statistics/struct_.rs @@ -0,0 +1,61 @@ +use crate::array::{Array, StructArray}; +use crate::error::Result; +use crate::{array::MutableArray, datatypes::DataType}; + +use super::make_mutable; + +#[derive(Debug)] +pub struct DynMutableStructArray { + 
data_type: DataType, + pub inner: Vec>, +} + +impl DynMutableStructArray { + pub fn try_with_capacity(data_type: DataType, capacity: usize) -> Result { + let inners = match data_type.to_logical_type() { + DataType::Struct(inner) => inner, + _ => unreachable!(), + }; + let inner = inners + .iter() + .map(|f| make_mutable(f.data_type(), capacity)) + .collect::>>()?; + + Ok(Self { data_type, inner }) + } +} +impl MutableArray for DynMutableStructArray { + fn data_type(&self) -> &DataType { + &self.data_type + } + + fn len(&self) -> usize { + self.inner.len() + } + + fn validity(&self) -> Option<&crate::bitmap::MutableBitmap> { + None + } + + fn as_box(&mut self) -> Box { + let inner = self.inner.iter_mut().map(|x| x.as_arc()).collect(); + + Box::new(StructArray::new(self.data_type.clone(), inner, None)) + } + + fn as_any(&self) -> &dyn std::any::Any { + self + } + + fn as_mut_any(&mut self) -> &mut dyn std::any::Any { + self + } + + fn push_null(&mut self) { + todo!() + } + + fn shrink_to_fit(&mut self) { + todo!() + } +} diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 995ad2bf40c..4bb52895192 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -37,7 +37,7 @@ pub fn read_column(mut reader: R, column: &str) -> Result Box { pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { match column { "int64" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(Int64Array::from_slice([-256])), max_value: Box::new(Int64Array::from_slice([9])), }, "float64" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(Float64Array::from_slice([0.0])), 
max_value: Box::new(Float64Array::from_slice([9.0])), }, "string" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(4)])), min_value: Box::new(Utf8Array::::from_slice([""])), max_value: Box::new(Utf8Array::::from_slice(["def"])), }, "bool" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(4)])), min_value: Box::new(BooleanArray::from_slice([false])), max_value: Box::new(BooleanArray::from_slice([true])), }, "timestamp_ms" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new( Int64Array::from_slice([0]).to(DataType::Timestamp(TimeUnit::Millisecond, None)), ), @@ -415,8 +415,8 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { ), }, "uint32" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(UInt32Array::from_slice([0])), max_value: Box::new(UInt32Array::from_slice([9])), }, @@ -429,33 +429,33 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { }; Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(0)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(0)])), min_value: new_dict(Arc::new(Int32Array::from_slice([10]))), max_value: new_dict(Arc::new(Int32Array::from_slice([200]))), } } "decimal_9" => Statistics { - 
distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(9, 0))), max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(9, 0))), }, "decimal_18" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(18, 0))), max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(18, 0))), }, "decimal_26" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(Int128Array::from_slice([-256]).to(DataType::Decimal(26, 0))), max_value: Box::new(Int128Array::from_slice([9]).to(DataType::Decimal(26, 0))), }, "timestamp_us" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new( Int64Array::from_slice([-256]).to(DataType::Timestamp(TimeUnit::Microsecond, None)), ), @@ -464,8 +464,8 @@ pub fn pyarrow_nullable_statistics(column: &str) -> Statistics { ), }, "timestamp_s" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new( Int64Array::from_slice([-256]).to(DataType::Timestamp(TimeUnit::Second, None)), ), @@ -474,8 +474,8 @@ pub fn 
pyarrow_nullable_statistics(column: &str) -> Statistics { ), }, "timestamp_s_utc" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(3)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(3)])), min_value: Box::new(Int64Array::from_slice([-256]).to(DataType::Timestamp( TimeUnit::Second, Some("UTC".to_string()), @@ -539,7 +539,7 @@ pub fn pyarrow_required(column: &str) -> Box { pub fn pyarrow_required_statistics(column: &str) -> Statistics { let mut s = pyarrow_nullable_statistics(column); - s.null_count = UInt64Array::from([Some(0)]); + s.null_count = Count::Single(UInt64Array::from([Some(0)])); s } @@ -559,50 +559,50 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { match column { "list_int16" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(1)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(1)])), min_value: new_list(Arc::new(Int16Array::from_slice([0])), true), max_value: new_list(Arc::new(Int16Array::from_slice([10])), true), }, "list_bool" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(1)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(1)])), min_value: new_list(Arc::new(BooleanArray::from_slice([false])), true), max_value: new_list(Arc::new(BooleanArray::from_slice([true])), true), }, "list_utf8" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(1)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(1)].into()), min_value: new_list(Arc::new(Utf8Array::::from_slice([""])), true), max_value: new_list(Arc::new(Utf8Array::::from_slice(["ccc"])), true), }, "list_large_binary" => Statistics { - distinct_count: 
UInt64Array::from([None]), - null_count: [Some(1)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(1)].into()), min_value: new_list(Arc::new(BinaryArray::::from_slice([b""])), true), max_value: new_list(Arc::new(BinaryArray::::from_slice([b"ccc"])), true), }, "list_int64" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(1)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(1)].into()), min_value: new_list(Arc::new(Int64Array::from_slice([0])), true), max_value: new_list(Arc::new(Int64Array::from_slice([10])), true), }, "list_int64_required" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(1)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(1)].into()), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, "list_int64_required_required" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(0)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(0)].into()), min_value: new_list(Arc::new(Int64Array::from_slice([0])), false), max_value: new_list(Arc::new(Int64Array::from_slice([10])), false), }, "list_nested_i64" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(2)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(2)].into()), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -613,8 +613,8 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { ), }, "list_nested_inner_required_required_i64" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(0)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + 
null_count: Count::Single([Some(0)].into()), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -625,8 +625,8 @@ pub fn pyarrow_nested_nullable_statistics(column: &str) -> Statistics { ), }, "list_nested_inner_required_i64" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: [Some(0)].into(), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single([Some(0)].into()), min_value: new_list( new_list(Arc::new(Int64Array::from_slice([0])), true).into(), true, @@ -656,14 +656,14 @@ pub fn pyarrow_nested_edge_statistics(column: &str) -> Statistics { match column { "simple" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(0)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(0)])), min_value: new_list(Arc::new(Int64Array::from([Some(0)]))), max_value: new_list(Arc::new(Int64Array::from([Some(1)]))), }, "null" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(1)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(1)])), min_value: new_list(Arc::new(Int64Array::from([None]))), max_value: new_list(Arc::new(Int64Array::from([None]))), }, @@ -730,16 +730,51 @@ pub fn pyarrow_struct(column: &str) -> Box { } pub fn pyarrow_struct_statistics(column: &str) -> Statistics { + let new_struct = |arrays: Vec>, names: Vec| { + let fields = names + .into_iter() + .zip(arrays.iter()) + .map(|(n, a)| Field::new(n, a.data_type().clone(), true)) + .collect(); + StructArray::new(DataType::Struct(fields), arrays, None) + }; + + let names = vec!["f1".to_string(), "f2".to_string()]; + match column { "struct" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(4)]), - min_value: Box::new(BooleanArray::from_slice([false])), - max_value: 
Box::new(BooleanArray::from_slice([true])), + distinct_count: Count::Struct(new_struct( + vec![ + Arc::new(UInt64Array::from([None])), + Arc::new(UInt64Array::from([None])), + ], + names.clone(), + )), + null_count: Count::Struct(new_struct( + vec![ + Arc::new(UInt64Array::from([Some(4)])), + Arc::new(UInt64Array::from([Some(4)])), + ], + names.clone(), + )), + min_value: Box::new(new_struct( + vec![ + Arc::new(Utf8Array::::from_slice([""])), + Arc::new(BooleanArray::from_slice([false])), + ], + names.clone(), + )), + max_value: Box::new(new_struct( + vec![ + Arc::new(Utf8Array::::from_slice(["def"])), + Arc::new(BooleanArray::from_slice([true])), + ], + names, + )), }, "struct_struct" => Statistics { - distinct_count: UInt64Array::from([None]), - null_count: UInt64Array::from([Some(1)]), + distinct_count: Count::Single(UInt64Array::from([None])), + null_count: Count::Single(UInt64Array::from([Some(1)])), min_value: Box::new(BooleanArray::from_slice([false])), max_value: Box::new(BooleanArray::from_slice([true])), }, @@ -791,7 +826,7 @@ fn integration_read(data: &[u8]) -> Result { let schema = reader.schema().clone(); for field in &schema.fields { - let mut _statistics = deserialize_statistics(field, &reader.metadata().row_groups)?; + let mut _statistics = deserialize(field, &reader.metadata().row_groups)?; } let batches = reader.collect::>>()?;