diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 3d9104c0fc1..a0427d08474 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -48,27 +48,33 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - - match (page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + let is_filtered = page.selected_rows().is_some(); + + match ( + page.encoding(), + page.dictionary_page(), + is_optional, + is_filtered, + ) { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) } - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::OptionalDictionary( Optional::new(page), ValuesDictionary::new(page, dict), )) } - (Encoding::Plain, None, true) => { + (Encoding::Plain, None, true, false) => { let (_, _, values) = utils::split_buffer(page); let values = BinaryIter::new(values); Ok(State::Optional(Optional::new(page), values)) } - (Encoding::Plain, None, false) => Ok(State::Required(Required::new(page))), + (Encoding::Plain, None, false, false) => Ok(State::Required(Required::new(page))), _ => Err(utils::not_implemented(page)), } } diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs index 2c524a6b435..6f6c31b0f2c 100644 --- a/src/io/parquet/read/deserialize/boolean/basic.rs +++ b/src/io/parquet/read/deserialize/boolean/basic.rs @@ -1,6 +1,8 @@ use std::collections::VecDeque; -use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition}; +use parquet2::{ + deserialize::SliceFilteredIter, encoding::Encoding, page::DataPage, schema::Repetition, +}; use crate::{ array::BooleanArray, @@ -11,25 +13,19 @@ use crate::{ use super::super::utils; use super::super::utils::{ - extend_from_decoder, next, split_buffer, DecodedState, Decoder, MaybeNext, OptionalPageValidity, + extend_from_decoder, get_selected_rows, next, split_buffer, DecodedState, Decoder, + FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }; use super::super::DataPages; -// The state of an optional DataPage with a boolean physical type #[derive(Debug)] -struct Optional<'a> { - values: BitmapIter<'a>, - validity: OptionalPageValidity<'a>, -} +struct Values<'a>(BitmapIter<'a>); -impl<'a> Optional<'a> { +impl<'a> Values<'a> { pub fn new(page: &'a DataPage) -> Self { - let (_, _, values_buffer) = split_buffer(page); + let (_, _, values) = split_buffer(page); - Self { - values: BitmapIter::new(values_buffer, 0, values_buffer.len() * 8), - validity: OptionalPageValidity::new(page), - } + Self(BitmapIter::new(values, 0, values.len() * 8)) } } @@ -52,18 +48,44 @@ impl<'a> Required<'a> { } } +#[derive(Debug)] +struct FilteredRequired<'a> { + values: SliceFilteredIter>, +} + +impl<'a> FilteredRequired<'a> { + pub fn new(page: &'a DataPage) -> Self { + // todo: replace this by an iterator over slices, for faster deserialization + let values = BitmapIter::new(page.buffer(), 0, page.num_values()); + + let rows = get_selected_rows(page); + let values = SliceFilteredIter::new(values, rows); + + Self { values } + } + + #[inline] + pub fn len(&self) -> usize { + self.values.size_hint().0 + } +} + // The state of a `DataPage` of `Boolean` parquet boolean type #[derive(Debug)] enum State<'a> { - Optional(Optional<'a>), + Optional(OptionalPageValidity<'a>, Values<'a>), Required(Required<'a>), + FilteredRequired(FilteredRequired<'a>), + FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>), } impl<'a> State<'a> { pub fn len(&self) -> usize { match self { - State::Optional(page) => page.validity.len(), + State::Optional(validity, _) => validity.len(), State::Required(page) => page.length - page.offset, + State::FilteredRequired(page) => page.len(), + State::FilteredOptional(optional, _) => optional.len(), } } } @@ -90,10 +112,21 @@ impl<'a> Decoder<'a> for BooleanDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - - match (page.encoding(), is_optional) { - (Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))), - (Encoding::Plain, false) => Ok(State::Required(Required::new(page))), + let is_filtered = page.selected_rows().is_some(); + + match (page.encoding(), is_optional, is_filtered) { + (Encoding::Plain, true, false) => Ok(State::Optional( + OptionalPageValidity::new(page), + Values::new(page), + )), + (Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))), + (Encoding::Plain, true, true) => Ok(State::FilteredOptional( + FilteredOptionalPageValidity::new(page), + Values::new(page), + )), + (Encoding::Plain, false, true) => { + Ok(State::FilteredRequired(FilteredRequired::new(page))) + } _ => Err(utils::not_implemented(page)), } } @@ -113,18 +146,33 @@ impl<'a> Decoder<'a> for BooleanDecoder { ) { let (values, validity) = decoded; match state { - State::Optional(page) => extend_from_decoder( + State::Optional(page_validity, page_values) => extend_from_decoder( validity, - &mut page.validity, + page_validity, Some(remaining), values, - &mut page.values, + &mut page_values.0, ), State::Required(page) => { let remaining = remaining.min(page.length - page.offset); values.extend_from_slice(page.values, page.offset, remaining); page.offset += remaining; } + State::FilteredRequired(page) => { + values.reserve(remaining); + for item in page.values.by_ref().take(remaining) { + values.push(item) + } + } + State::FilteredOptional(page_validity, page_values) => { + utils::extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + page_values.0.by_ref(), + ); + } } } } diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index d63d3059fde..5f30c698a80 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -66,15 +66,16 @@ impl<'a> Decoder<'a> for BooleanDecoder { fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); - match (page.encoding(), is_optional) { - (Encoding::Plain, true) => { + match (page.encoding(), is_optional, is_filtered) { + (Encoding::Plain, true, false) => { let (_, _, values) = utils::split_buffer(page); let values = BitmapIter::new(values, 0, values.len() * 8); Ok(State::Optional(Optional::new(page), values)) } - (Encoding::Plain, false) => Ok(State::Required(Required::new(page))), + (Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))), _ => Err(utils::not_implemented(page)), } } diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary.rs index 12f95c9222d..7a2103c96ee 100644 --- a/src/io/parquet/read/deserialize/dictionary.rs +++ b/src/io/parquet/read/deserialize/dictionary.rs @@ -1,6 +1,7 @@ use std::{collections::VecDeque, sync::Arc}; use parquet2::{ + deserialize::SliceFilteredIter, encoding::{hybrid_rle::HybridRleDecoder, Encoding}, page::{DataPage, DictPage}, schema::Repetition, @@ -14,8 +15,8 @@ use crate::{ use super::{ utils::{ - self, dict_indices_decoder, extend_from_decoder, DecodedState, Decoder, MaybeNext, - OptionalPageValidity, + self, dict_indices_decoder, extend_from_decoder, get_selected_rows, DecodedState, Decoder, + FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity, }, DataPages, }; @@ -25,6 +26,8 @@ use super::{ pub enum State<'a> { Optional(Optional<'a>), Required(Required<'a>), + FilteredRequired(FilteredRequired<'a>), + FilteredOptional(FilteredOptionalPageValidity<'a>, HybridRleDecoder<'a>), } #[derive(Debug)] @@ -39,6 +42,22 @@ impl<'a> Required<'a> { } } +#[derive(Debug)] +pub struct FilteredRequired<'a> { + values: SliceFilteredIter>, +} + +impl<'a> FilteredRequired<'a> { + fn new(page: &'a DataPage) -> Self { + let values = dict_indices_decoder(page); + + let rows = get_selected_rows(page); + let values = SliceFilteredIter::new(values, rows); + + Self { values } + } +} + #[derive(Debug)] pub struct Optional<'a> { values: HybridRleDecoder<'a>, @@ -61,6 +80,8 @@ impl<'a> utils::PageState<'a> for State<'a> { match self { State::Optional(optional) => optional.validity.len(), State::Required(required) => required.values.size_hint().0, + State::FilteredRequired(required) => required.values.size_hint().0, + State::FilteredOptional(validity, _) => validity.len(), } } } @@ -95,14 +116,24 @@ where fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); - match (page.encoding(), is_optional) { - (Encoding::PlainDictionary | Encoding::RleDictionary, false) => { + match (page.encoding(), is_optional, is_filtered) { + (Encoding::PlainDictionary | Encoding::RleDictionary, false, false) => { Ok(State::Required(Required::new(page))) } - (Encoding::PlainDictionary | Encoding::RleDictionary, true) => { + (Encoding::PlainDictionary | Encoding::RleDictionary, true, false) => { Ok(State::Optional(Optional::new(page))) } + (Encoding::PlainDictionary | Encoding::RleDictionary, false, true) => { + Ok(State::FilteredRequired(FilteredRequired::new(page))) + } + (Encoding::PlainDictionary | Encoding::RleDictionary, true, true) => { + Ok(State::FilteredOptional( + FilteredOptionalPageValidity::new(page), + dict_indices_decoder(page), + )) + } _ => Err(utils::not_implemented(page)), } } @@ -137,6 +168,21 @@ where .take(remaining), ); } + State::FilteredOptional(page_validity, page_values) => extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + &mut page_values.by_ref().map(|x| K::from_u32(x).unwrap()), + ), + State::FilteredRequired(page) => { + values.extend( + page.values + .by_ref() + .map(|x| K::from_u32(x).unwrap()) + .take(remaining), + ); + } } } } diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index ac9514cf0df..0aff18e2578 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -82,23 +82,29 @@ where fn build_state(&self, page: &'a DataPage) -> Result { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; - - match (page.encoding(), page.dictionary_page(), is_optional) { - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + let is_filtered = page.selected_rows().is_some(); + + match ( + page.encoding(), + page.dictionary_page(), + is_optional, + is_filtered, + ) { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) } - (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => { let dict = dict.as_any().downcast_ref().unwrap(); Ok(State::OptionalDictionary( Optional::new(page), ValuesDictionary::new(page, dict), )) } - (Encoding::Plain, _, true) => { + (Encoding::Plain, _, true, false) => { Ok(State::Optional(Optional::new(page), Values::new::

(page))) } - (Encoding::Plain, _, false) => Ok(State::Required(Values::new::

(page))), + (Encoding::Plain, _, false, false) => Ok(State::Required(Values::new::

(page))), _ => Err(utils::not_implemented(page)), } } diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 95c322e4dff..7bda733684f 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -16,18 +16,21 @@ use super::super::DataPages; pub fn not_implemented(page: &DataPage) -> ArrowError { let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional; + let is_filtered = page.selected_rows().is_some(); let required = if is_optional { "optional" } else { "required" }; + let is_filtered = if is_filtered { ", index-filtered" } else { "" }; let dict = if page.dictionary_page().is_some() { ", dictionary-encoded" } else { "" }; ArrowError::NotYetImplemented(format!( - "Decoding {:?} \"{:?}\"-encoded{} {} parquet pages", + "Decoding {:?} \"{:?}\"-encoded{} {} {} parquet pages", page.descriptor.primitive_type.physical_type, page.encoding(), dict, required, + is_filtered, )) }