diff --git a/arrow-parquet-integration-testing/main.py b/arrow-parquet-integration-testing/main.py index 42e19e81b8f..2ca76d99b1f 100644 --- a/arrow-parquet-integration-testing/main.py +++ b/arrow-parquet-integration-testing/main.py @@ -24,7 +24,7 @@ def _prepare( write, "--version", version, - "--encoding-utf8", + "--encoding-int", encoding_utf8, "--compression", compression, @@ -75,8 +75,8 @@ def variations(): # "generated_custom_metadata", ]: # pyarrow does not support decoding "delta"-encoded values. - # for encoding in ["plain", "delta"]: - for encoding in ["plain"]: + for encoding in ["plain", "delta"]: + #for encoding in ["plain"]: for compression in ["uncompressed", "zstd", "snappy"]: yield (version, file, compression, encoding) @@ -95,4 +95,4 @@ def variations(): if str(c1.type) in ["month_interval", "day_time_interval"]: # pyarrow does not support interval types from parquet continue - assert c1 == c2 + assert c1 == c2, (c1, c2) diff --git a/arrow-parquet-integration-testing/src/main.rs b/arrow-parquet-integration-testing/src/main.rs index d79da943187..831dd6b5f00 100644 --- a/arrow-parquet-integration-testing/src/main.rs +++ b/arrow-parquet-integration-testing/src/main.rs @@ -1,10 +1,9 @@ use std::fs::File; -use std::{io::Read}; +use std::io::Read; use arrow2::array::Array; use arrow2::io::ipc::IpcField; use arrow2::{ - AHashMap, chunk::Chunk, datatypes::{DataType, Schema}, error::Result, @@ -16,6 +15,7 @@ use arrow2::{ RowGroupIterator, Version as ParquetVersion, WriteOptions, }, }, + AHashMap, }; use clap::Parser; use flate2::read::GzDecoder; @@ -110,6 +110,8 @@ struct Args { projection: Option, #[clap(short, long, arg_enum, help = "encoding scheme for utf8", default_value_t = EncodingScheme::Plain)] encoding_utf8: EncodingScheme, + #[clap(short('i'), long, arg_enum, help = "encoding scheme for int", default_value_t = EncodingScheme::Plain)] + encoding_int: EncodingScheme, #[clap(short, long, arg_enum)] compression: Compression, } @@ -178,6 +180,13 @@ fn main() -> Result<()> { .map(|f| { transverse(&f.data_type, |dt| match dt { DataType::Dictionary(..) => Encoding::RleDictionary, + DataType::Int32 => { + if args.encoding_int == EncodingScheme::Delta { + Encoding::DeltaBinaryPacked + } else { + Encoding::Plain + } + } DataType::Utf8 | DataType::LargeUtf8 => { if args.encoding_utf8 == EncodingScheme::Delta { Encoding::DeltaLengthByteArray diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 648e78782a0..67f017872ca 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -19,7 +19,7 @@ use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, Optio use super::super::Pages; #[derive(Debug)] -struct FilteredRequiredValues<'a> { +pub(super) struct FilteredRequiredValues<'a> { values: SliceFilteredIter>, } @@ -89,7 +89,7 @@ where // The state of a `DataPage` of `Primitive` parquet primitive type #[derive(Debug)] -enum State<'a, T> +pub(super) enum State<'a, T> where T: NativeType, { @@ -118,7 +118,7 @@ where } #[derive(Debug)] -struct PrimitiveDecoder +pub(super) struct PrimitiveDecoder where T: NativeType, P: ParquetNativeType, @@ -126,7 +126,7 @@ where { phantom: std::marker::PhantomData, phantom_p: std::marker::PhantomData

, - op: F, + pub op: F, } impl PrimitiveDecoder @@ -136,7 +136,7 @@ where F: Fn(P) -> T, { #[inline] - fn new(op: F) -> Self { + pub(super) fn new(op: F) -> Self { Self { phantom: std::marker::PhantomData, phantom_p: std::marker::PhantomData, @@ -183,9 +183,9 @@ where Ok(State::Optional(validity, values)) } (Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::

(page)?)), - (Encoding::Plain, _, false, true) => Ok(State::FilteredRequired( - FilteredRequiredValues::try_new::

(page)?, - )), + (Encoding::Plain, _, false, true) => { + FilteredRequiredValues::try_new::

(page).map(State::FilteredRequired) + } (Encoding::Plain, _, true, true) => Ok(State::FilteredOptional( FilteredOptionalPageValidity::try_new(page)?, Values::try_new::

(page)?, diff --git a/src/io/parquet/read/deserialize/primitive/integer.rs b/src/io/parquet/read/deserialize/primitive/integer.rs new file mode 100644 index 00000000000..3e09e421505 --- /dev/null +++ b/src/io/parquet/read/deserialize/primitive/integer.rs @@ -0,0 +1,260 @@ +use std::collections::VecDeque; + +use num_traits::AsPrimitive; +use parquet2::{ + deserialize::SliceFilteredIter, + encoding::{delta_bitpacked::Decoder, Encoding}, + page::{split_buffer, DataPage, DictPage}, + schema::Repetition, + types::NativeType as ParquetNativeType, +}; + +use crate::{ + array::MutablePrimitiveArray, + bitmap::MutableBitmap, + datatypes::DataType, + error::Result, + io::parquet::read::deserialize::utils::{ + get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity, + }, + types::NativeType, +}; + +use super::super::utils; +use super::super::Pages; + +use super::basic::{finish, PrimitiveDecoder, State as PrimitiveState}; + +/// The state of a [`DataPage`] of an integer parquet type (i32 or i64) +#[derive(Debug)] +enum State<'a, T> +where + T: NativeType, +{ + Common(PrimitiveState<'a, T>), + DeltaBinaryPackedRequired(Decoder<'a>), + DeltaBinaryPackedOptional(OptionalPageValidity<'a>, Decoder<'a>), + FilteredDeltaBinaryPackedRequired(SliceFilteredIter>), + FilteredDeltaBinaryPackedOptional(FilteredOptionalPageValidity<'a>, Decoder<'a>), +} + +impl<'a, T> utils::PageState<'a> for State<'a, T> +where + T: NativeType, +{ + fn len(&self) -> usize { + match self { + State::Common(state) => state.len(), + State::DeltaBinaryPackedRequired(state) => state.size_hint().0, + State::DeltaBinaryPackedOptional(state, _) => state.len(), + State::FilteredDeltaBinaryPackedRequired(state) => state.size_hint().0, + State::FilteredDeltaBinaryPackedOptional(state, _) => state.len(), + } + } +} + +/// Decoder of integer parquet type +#[derive(Debug)] +struct IntDecoder(PrimitiveDecoder) +where + T: NativeType, + P: ParquetNativeType, + i64: num_traits::AsPrimitive

, + F: Fn(P) -> T; + +impl IntDecoder +where + T: NativeType, + P: ParquetNativeType, + i64: num_traits::AsPrimitive

, + F: Fn(P) -> T, +{ + #[inline] + fn new(op: F) -> Self { + Self(PrimitiveDecoder::new(op)) + } +} + +impl<'a, T, P, F> utils::Decoder<'a> for IntDecoder +where + T: NativeType, + P: ParquetNativeType, + i64: num_traits::AsPrimitive

, + F: Copy + Fn(P) -> T, +{ + type State = State<'a, T>; + type Dict = Vec; + type DecodedState = (Vec, MutableBitmap); + + fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result { + let is_optional = + page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); + + match (page.encoding(), dict, is_optional, is_filtered) { + (Encoding::DeltaBinaryPacked, _, false, false) => { + let (_, _, values) = split_buffer(page)?; + Ok(State::DeltaBinaryPackedRequired(Decoder::new(values))) + } + (Encoding::DeltaBinaryPacked, _, true, false) => { + let (_, _, values) = split_buffer(page)?; + Ok(State::DeltaBinaryPackedOptional( + OptionalPageValidity::try_new(page)?, + Decoder::new(values), + )) + } + (Encoding::DeltaBinaryPacked, _, false, true) => { + let (_, _, values) = split_buffer(page)?; + let values = Decoder::new(values); + + let rows = get_selected_rows(page); + let values = SliceFilteredIter::new(values, rows); + + Ok(State::FilteredDeltaBinaryPackedRequired(values)) + } + (Encoding::DeltaBinaryPacked, _, true, true) => { + let (_, _, values) = split_buffer(page)?; + let values = Decoder::new(values); + + Ok(State::FilteredDeltaBinaryPackedOptional( + FilteredOptionalPageValidity::try_new(page)?, + values, + )) + } + _ => self.0.build_state(page, dict).map(State::Common), + } + } + + fn with_capacity(&self, capacity: usize) -> Self::DecodedState { + self.0.with_capacity(capacity) + } + + fn extend_from_state( + &self, + state: &mut Self::State, + decoded: &mut Self::DecodedState, + remaining: usize, + ) { + let (values, validity) = decoded; + match state { + State::Common(state) => self.0.extend_from_state(state, decoded, remaining), + State::DeltaBinaryPackedRequired(state) => { + values.extend( + state + .by_ref() + .map(|x| x.as_()) + .map(self.0.op) + .take(remaining), + ); + } + State::DeltaBinaryPackedOptional(page_validity, page_values) => { + utils::extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + page_values.by_ref().map(|x| x.as_()).map(self.0.op), + ) + } + State::FilteredDeltaBinaryPackedRequired(page) => { + values.extend( + page.by_ref() + .map(|x| x.as_()) + .map(self.0.op) + .take(remaining), + ); + } + State::FilteredDeltaBinaryPackedOptional(page_validity, page_values) => { + utils::extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + page_values.by_ref().map(|x| x.as_()).map(self.0.op), + ); + } + } + } + + fn deserialize_dict(&self, page: &DictPage) -> Self::Dict { + self.0.deserialize_dict(page) + } +} + +/// An [`Iterator`] adapter over [`Pages`] assumed to be encoded as primitive arrays +/// encoded as parquet integer types +#[derive(Debug)] +pub struct IntegerIter +where + I: Pages, + T: NativeType, + P: ParquetNativeType, + F: Fn(P) -> T, +{ + iter: I, + data_type: DataType, + items: VecDeque<(Vec, MutableBitmap)>, + remaining: usize, + chunk_size: Option, + dict: Option>, + op: F, + phantom: std::marker::PhantomData

, +} + +impl IntegerIter +where + I: Pages, + T: NativeType, + + P: ParquetNativeType, + F: Copy + Fn(P) -> T, +{ + pub fn new( + iter: I, + data_type: DataType, + num_rows: usize, + chunk_size: Option, + op: F, + ) -> Self { + Self { + iter, + data_type, + items: VecDeque::new(), + dict: None, + remaining: num_rows, + chunk_size, + op, + phantom: Default::default(), + } + } +} + +impl Iterator for IntegerIter +where + I: Pages, + T: NativeType, + P: ParquetNativeType, + i64: num_traits::AsPrimitive

, + F: Copy + Fn(P) -> T, +{ + type Item = Result>; + + fn next(&mut self) -> Option { + let maybe_state = utils::next( + &mut self.iter, + &mut self.items, + &mut self.dict, + &mut self.remaining, + self.chunk_size, + &IntDecoder::new(self.op), + ); + match maybe_state { + utils::MaybeNext::Some(Ok((values, validity))) => { + Some(Ok(finish(&self.data_type, values, validity))) + } + utils::MaybeNext::Some(Err(e)) => Some(Err(e)), + utils::MaybeNext::None => None, + utils::MaybeNext::More => self.next(), + } + } +} diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index 70d743dcc27..27d9c27c318 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -1,7 +1,9 @@ mod basic; mod dictionary; +mod integer; mod nested; pub use basic::Iter; pub use dictionary::{DictIter, NestedDictIter}; +pub use integer::IntegerIter; pub use nested::NestedIter; diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index 96a27228e96..28dfaa4812e 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -71,14 +71,14 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( Ok(match data_type.to_logical_type() { Null => null::iter_to_arrays(pages, data_type, chunk_size, num_rows), Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size, num_rows)), - UInt8 => dyn_iter(iden(primitive::Iter::new( + UInt8 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i32| x as u8, ))), - UInt16 => dyn_iter(iden(primitive::Iter::new( + UInt16 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, @@ -86,7 +86,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( |x: i32| x as u16, ))), UInt32 => match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( + PhysicalType::Int32 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, @@ -94,7 +94,7 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( |x: i32| x as u32, ))), // some implementations of parquet write arrow's u32 into i64. - PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( + PhysicalType::Int64 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, @@ -108,21 +108,21 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( ))) } }, - Int8 => dyn_iter(iden(primitive::Iter::new( + Int8 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i32| x as i8, ))), - Int16 => dyn_iter(iden(primitive::Iter::new( + Int16 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i32| x as i16, ))), - Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::Iter::new( + Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, @@ -200,14 +200,14 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( } Decimal(_, _) => match physical_type { - PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( + PhysicalType::Int32 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i32| x as i128, ))), - PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( + PhysicalType::Int64 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, @@ -250,14 +250,14 @@ pub fn page_iter_to_arrays<'a, I: Pages + 'a>( }, // INT64 - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::Iter::new( + Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, chunk_size, |x: i64| x as i64, ))), - UInt64 => dyn_iter(iden(primitive::Iter::new( + UInt64 => dyn_iter(iden(primitive::IntegerIter::new( pages, data_type, num_rows, @@ -368,7 +368,7 @@ fn timestamp<'a, I: Pages + 'a>( )); } - let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, |x: i64| x); + let iter = primitive::IntegerIter::new(pages, data_type, num_rows, chunk_size, |x: i64| x); let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); match (factor, is_multiplier) { (1, _) => Ok(dyn_iter(iden(iter))), diff --git a/src/io/parquet/write/dictionary.rs b/src/io/parquet/write/dictionary.rs index f04acc2fc53..37d7f614aca 100644 --- a/src/io/parquet/write/dictionary.rs +++ b/src/io/parquet/write/dictionary.rs @@ -149,8 +149,7 @@ macro_rules! dyn_prim { ($from:ty, $to:ty, $array:expr, $options:expr, $type_:expr) => {{ let values = $array.values().as_any().downcast_ref().unwrap(); - let mut buffer = vec![]; - primitive_encode_plain::<$from, $to>(values, false, &mut buffer); + let buffer = primitive_encode_plain::<$from, $to>(values, false, vec![]); let stats = primitive_build_statistics::<$from, $to>(values, $type_.clone()); let stats = serialize_statistics(&stats); (DictPage::new(buffer, values.len(), false), stats) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index 3b8f48e9938..0abc6330a85 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -79,8 +79,14 @@ pub fn to_parquet_schema(schema: &Schema) -> Result { /// Note that this is whether this implementation supports it, which is a subset of /// what the parquet spec allows. pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { + if let (Encoding::DeltaBinaryPacked, DataType::Decimal(p, _)) = + (encoding, data_type.to_logical_type()) + { + return *p <= 18; + }; + matches!( - (encoding, data_type), + (encoding, data_type.to_logical_type()), (Encoding::Plain, _) | ( Encoding::DeltaLengthByteArray, @@ -88,6 +94,24 @@ pub fn can_encode(data_type: &DataType, encoding: Encoding) -> bool { ) | (Encoding::RleDictionary, DataType::Dictionary(_, _, _)) | (Encoding::PlainDictionary, DataType::Dictionary(_, _, _)) + | ( + Encoding::DeltaBinaryPacked, + DataType::Null + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 + | DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Date32 + | DataType::Time32(_) + | DataType::Int64 + | DataType::Date64 + | DataType::Time64(_) + | DataType::Timestamp(_, _) + | DataType::Duration(_) + ) ) } @@ -167,58 +191,66 @@ pub fn array_to_page_simple( boolean::array_to_page(array.as_any().downcast_ref().unwrap(), options, type_) } // casts below MUST match the casts done at the metadata (field -> parquet type). - DataType::UInt8 => primitive::array_to_page::( + DataType::UInt8 => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), - DataType::UInt16 => primitive::array_to_page::( + DataType::UInt16 => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), - DataType::UInt32 => primitive::array_to_page::( + DataType::UInt32 => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), - DataType::UInt64 => primitive::array_to_page::( + DataType::UInt64 => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), - DataType::Int8 => primitive::array_to_page::( + DataType::Int8 => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), - DataType::Int16 => primitive::array_to_page::( + DataType::Int16 => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), DataType::Int32 | DataType::Date32 | DataType::Time32(_) => { - primitive::array_to_page::( + primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ) } DataType::Int64 | DataType::Date64 | DataType::Time64(_) | DataType::Timestamp(_, _) - | DataType::Duration(_) => primitive::array_to_page::( + | DataType::Duration(_) => primitive::array_to_page_integer::( array.as_any().downcast_ref().unwrap(), options, type_, + encoding, ), - DataType::Float32 => primitive::array_to_page::( + DataType::Float32 => primitive::array_to_page_plain::( array.as_any().downcast_ref().unwrap(), options, type_, ), - DataType::Float64 => primitive::array_to_page::( + DataType::Float64 => primitive::array_to_page_plain::( array.as_any().downcast_ref().unwrap(), options, type_, @@ -249,7 +281,7 @@ pub fn array_to_page_simple( ), DataType::Null => { let array = Int32Array::new_null(DataType::Int32, array.len()); - primitive::array_to_page::(&array, options, type_) + primitive::array_to_page_plain::(&array, options, type_) } DataType::Interval(IntervalUnit::YearMonth) => { let type_ = type_; @@ -327,7 +359,7 @@ pub fn array_to_page_simple( let array = PrimitiveArray::::new(DataType::Int32, values, array.validity().cloned()); - primitive::array_to_page::(&array, options, type_) + primitive::array_to_page_integer::(&array, options, type_, encoding) } else if precision <= 18 { let values = array .values() @@ -338,7 +370,7 @@ pub fn array_to_page_simple( let array = PrimitiveArray::::new(DataType::Int64, values, array.validity().cloned()); - primitive::array_to_page::(&array, options, type_) + primitive::array_to_page_integer::(&array, options, type_, encoding) } else { let size = decimal_length_from_precision(precision); diff --git a/src/io/parquet/write/primitive/basic.rs b/src/io/parquet/write/primitive/basic.rs index 36f1f20e1c5..3da58c54fde 100644 --- a/src/io/parquet/write/primitive/basic.rs +++ b/src/io/parquet/write/primitive/basic.rs @@ -1,52 +1,127 @@ use parquet2::{ + encoding::delta_bitpacked::encode, encoding::Encoding, page::DataPage, schema::types::PrimitiveType, statistics::{serialize_statistics, PrimitiveStatistics}, - types::NativeType, + types::NativeType as ParquetNativeType, }; use super::super::utils; use super::super::WriteOptions; use crate::{ array::{Array, PrimitiveArray}, - error::Result, - io::parquet::read::schema::is_nullable, - types::NativeType as ArrowNativeType, + error::Error, + io::parquet::{read::schema::is_nullable, write::utils::ExactSizedIter}, + types::NativeType, }; -pub(crate) fn encode_plain(array: &PrimitiveArray, is_optional: bool, buffer: &mut Vec) +pub(crate) fn encode_plain( + array: &PrimitiveArray, + is_optional: bool, + mut buffer: Vec, +) -> Vec where - T: ArrowNativeType, - R: NativeType, - T: num_traits::AsPrimitive, + T: NativeType, + P: ParquetNativeType, + T: num_traits::AsPrimitive

, { if is_optional { + buffer.reserve(std::mem::size_of::

() * (array.len() - array.null_count())); // append the non-null values array.iter().for_each(|x| { if let Some(x) = x { - let parquet_native: R = x.as_(); + let parquet_native: P = x.as_(); buffer.extend_from_slice(parquet_native.to_le_bytes().as_ref()) } }); } else { + buffer.reserve(std::mem::size_of::

() * array.len()); // append all values array.values().iter().for_each(|x| { - let parquet_native: R = x.as_(); + let parquet_native: P = x.as_(); buffer.extend_from_slice(parquet_native.to_le_bytes().as_ref()) }); } + buffer +} + +pub(crate) fn encode_delta( + array: &PrimitiveArray, + is_optional: bool, + mut buffer: Vec, +) -> Vec +where + T: NativeType, + P: ParquetNativeType, + T: num_traits::AsPrimitive

, + P: num_traits::AsPrimitive, +{ + if is_optional { + // append the non-null values + let iterator = array.iter().flatten().map(|x| { + let parquet_native: P = x.as_(); + let integer: i64 = parquet_native.as_(); + integer + }); + let iterator = ExactSizedIter::new(iterator, array.len() - array.null_count()); + encode(iterator, &mut buffer) + } else { + // append all values + let iterator = array.values().iter().map(|x| { + let parquet_native: P = x.as_(); + let integer: i64 = parquet_native.as_(); + integer + }); + encode(iterator, &mut buffer) + } + buffer +} + +pub fn array_to_page_plain( + array: &PrimitiveArray, + options: WriteOptions, + type_: PrimitiveType, +) -> Result +where + T: NativeType, + P: ParquetNativeType, + T: num_traits::AsPrimitive

, +{ + array_to_page(array, options, type_, Encoding::Plain, encode_plain) +} + +pub fn array_to_page_integer( + array: &PrimitiveArray, + options: WriteOptions, + type_: PrimitiveType, + encoding: Encoding, +) -> Result +where + T: NativeType, + P: ParquetNativeType, + T: num_traits::AsPrimitive

, + P: num_traits::AsPrimitive, +{ + match encoding { + Encoding::DeltaBinaryPacked => array_to_page(array, options, type_, encoding, encode_delta), + Encoding::Plain => array_to_page(array, options, type_, encoding, encode_plain), + other => Err(Error::nyi(format!("Encoding integer as {other:?}"))), + } } -pub fn array_to_page( +pub fn array_to_page, bool, Vec) -> Vec>( array: &PrimitiveArray, options: WriteOptions, type_: PrimitiveType, -) -> Result + encoding: Encoding, + encode: F, +) -> Result where - T: ArrowNativeType, - R: NativeType, - T: num_traits::AsPrimitive, + T: NativeType, + P: ParquetNativeType, + // constraint required to build statistics + T: num_traits::AsPrimitive

, { let is_optional = is_nullable(&type_.field_info); @@ -63,7 +138,7 @@ where let definition_levels_byte_length = buffer.len(); - encode_plain(array, is_optional, &mut buffer); + let buffer = encode(array, is_optional, buffer); let statistics = if options.write_statistics { Some(serialize_statistics(&build_statistics( @@ -84,20 +159,20 @@ where statistics, type_, options, - Encoding::Plain, + encoding, ) } -pub fn build_statistics( +pub fn build_statistics( array: &PrimitiveArray, primitive_type: PrimitiveType, -) -> PrimitiveStatistics +) -> PrimitiveStatistics

where - T: ArrowNativeType, - R: NativeType, - T: num_traits::AsPrimitive, + T: NativeType, + P: ParquetNativeType, + T: num_traits::AsPrimitive

, { - PrimitiveStatistics:: { + PrimitiveStatistics::

{ primitive_type, null_count: Some(array.null_count() as i64), distinct_count: None, @@ -105,7 +180,7 @@ where .iter() .flatten() .map(|x| { - let x: R = x.as_(); + let x: P = x.as_(); x }) .max_by(|x, y| x.ord(y)), @@ -113,7 +188,7 @@ where .iter() .flatten() .map(|x| { - let x: R = x.as_(); + let x: P = x.as_(); x }) .min_by(|x, y| x.ord(y)), diff --git a/src/io/parquet/write/primitive/mod.rs b/src/io/parquet/write/primitive/mod.rs index eec1d695d1d..7bbdc219438 100644 --- a/src/io/parquet/write/primitive/mod.rs +++ b/src/io/parquet/write/primitive/mod.rs @@ -1,7 +1,8 @@ mod basic; mod nested; -pub use basic::array_to_page; +pub use basic::array_to_page_integer; +pub use basic::array_to_page_plain; pub(crate) use basic::build_statistics; pub(crate) use basic::encode_plain; pub use nested::array_to_page as nested_array_to_page; diff --git a/src/io/parquet/write/primitive/nested.rs b/src/io/parquet/write/primitive/nested.rs index ffbfde6554c..e2909b02ceb 100644 --- a/src/io/parquet/write/primitive/nested.rs +++ b/src/io/parquet/write/primitive/nested.rs @@ -31,7 +31,7 @@ where let (repetition_levels_byte_length, definition_levels_byte_length) = nested::write_rep_and_def(options.version, nested, &mut buffer)?; - encode_plain(array, is_optional, &mut buffer); + let buffer = encode_plain(array, is_optional, buffer); let statistics = if options.write_statistics { Some(serialize_statistics(&build_statistics( diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index bff9346006c..bfa5f2a3c3b 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -197,6 +197,30 @@ fn indexed_optional_i32() -> Result<()> { read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected) } +#[test] +fn indexed_optional_i32_delta() -> Result<()> { + let array21 = Int32Array::from([Some(1), Some(2), None]); + let array22 = Int32Array::from([None, Some(5), Some(6)]); + let expected = Int32Array::from_slice([5]).boxed(); + + read_with_indexes( + pages(&[&array21, &array22], Encoding::DeltaBinaryPacked)?, + expected, + ) +} + +#[test] +fn indexed_required_i32_delta() -> Result<()> { + let array21 = Int32Array::from_slice([1, 2, 3]); + let array22 = Int32Array::from_slice([4, 5, 6]); + let expected = Int32Array::from_slice([5]).boxed(); + + read_with_indexes( + pages(&[&array21, &array22], Encoding::DeltaBinaryPacked)?, + expected, + ) +} + #[test] fn indexed_optional_utf8() -> Result<()> { let array21 = Utf8Array::::from([Some("a"), Some("b"), None]); diff --git a/tests/it/io/parquet/write.rs b/tests/it/io/parquet/write.rs index f95b53331c5..dfeed7ccd00 100644 --- a/tests/it/io/parquet/write.rs +++ b/tests/it/io/parquet/write.rs @@ -92,6 +92,28 @@ fn int64_optional_v2() -> Result<()> { ) } +#[test] +fn int64_optional_delta() -> Result<()> { + round_trip( + "int64", + "nullable", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::DeltaBinaryPacked], + ) +} + +#[test] +fn int64_required_delta() -> Result<()> { + round_trip( + "int64", + "required", + Version::V2, + CompressionOptions::Uncompressed, + vec![Encoding::DeltaBinaryPacked], + ) +} + #[cfg(feature = "io_parquet_compression")] #[test] fn int64_optional_v2_compressed() -> Result<()> {