diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 17d4035e5e4..c2e723865de 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -4,7 +4,6 @@ use std::default::Default; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - indexes::Interval, page::{BinaryPageDict, DataPage}, schema::Repetition, }; @@ -18,7 +17,8 @@ use crate::{ }; use super::super::utils::{ - extend_from_decoder, next, DecodedState, MaybeNext, OptionalPageValidity, + extend_from_decoder, get_selected_rows, next, DecodedState, FilteredOptionalPageValidity, + MaybeNext, OptionalPageValidity, }; use super::super::DataPages; use super::{super::utils, utils::*}; @@ -87,12 +87,7 @@ impl<'a> FilteredRequired<'a> { pub fn new(page: &'a DataPage) -> Self { let values = SizedBinaryIter::new(page.buffer(), page.num_values()); - let rows = page - .selected_rows() - .unwrap_or(&[Interval::new(0, page.num_values())]) - .iter() - .copied() - .collect(); + let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); Self { values } @@ -147,6 +142,7 @@ enum State<'a> { RequiredDictionary(RequiredDictionary<'a>), OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>), FilteredRequired(FilteredRequired<'a>), + FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>), } impl<'a> utils::PageState<'a> for State<'a> { @@ -157,6 +153,7 @@ impl<'a> utils::PageState<'a> for State<'a> { State::RequiredDictionary(values) => values.len(), State::OptionalDictionary(optional, _) => optional.len(), State::FilteredRequired(state) => state.len(), + State::FilteredOptional(validity, _) => validity.len(), } } } @@ -245,6 +242,14 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { (Encoding::Plain, _, false, true) => { Ok(State::FilteredRequired(FilteredRequired::new(page))) } + (Encoding::Plain, _, true, true) => { + let (_, _, values) = utils::split_buffer(page); + + Ok(State::FilteredOptional( + FilteredOptionalPageValidity::new(page), + BinaryIter::new(values), + )) + } _ => Err(utils::not_implemented(page)), } } @@ -313,6 +318,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { values.push(x) } } + State::FilteredOptional(page_validity, page_values) => { + utils::extend_from_decoder( + validity, + page_validity, + Some(additional), + values, + page_values.by_ref(), + ); + } } } } diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index 5bd14610bd6..c4645e595aa 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -3,7 +3,6 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - indexes::Interval, page::{DataPage, FixedLenByteArrayPageDict}, schema::Repetition, }; @@ -13,8 +12,9 @@ use crate::{ }; use super::super::utils::{ - dict_indices_decoder, extend_from_decoder, next, not_implemented, split_buffer, DecodedState, - Decoder, MaybeNext, OptionalPageValidity, PageState, Pushable, + dict_indices_decoder, extend_from_decoder, get_selected_rows, next, not_implemented, + split_buffer, DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext, + OptionalPageValidity, PageState, Pushable, }; use super::super::DataPages; use super::utils::FixedSizeBinary; @@ -65,12 +65,7 @@ impl<'a> FilteredRequired<'a> { assert_eq!(values.len() % size, 0); let values = values.chunks_exact(size); - let rows = page - .selected_rows() - .unwrap_or(&[Interval::new(0, page.num_values())]) - .iter() - .copied() - .collect(); + let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); Self { values } @@ -124,6 +119,10 @@ enum State<'a> { RequiredDictionary(RequiredDictionary<'a>), OptionalDictionary(OptionalDictionary<'a>), FilteredRequired(FilteredRequired<'a>), + FilteredOptional( + FilteredOptionalPageValidity<'a>, + std::slice::ChunksExact<'a, u8>, + ), } impl<'a> PageState<'a> for State<'a> { @@ -134,6 +133,7 @@ impl<'a> PageState<'a> for State<'a> { State::RequiredDictionary(state) => state.len(), State::OptionalDictionary(state) => state.validity.len(), State::FilteredRequired(state) => state.len(), + State::FilteredOptional(state, _) => state.len(), } } } @@ -184,6 +184,14 @@ impl<'a> Decoder<'a> for BinaryDecoder { (Encoding::Plain, None, false, true) => Ok(State::FilteredRequired( FilteredRequired::new(page, self.size), )), + (Encoding::Plain, _, true, true) => { + let (_, _, values) = split_buffer(page); + + Ok(State::FilteredOptional( + FilteredOptionalPageValidity::new(page), + values.chunks_exact(self.size), + )) + } _ => Err(not_implemented(page)), } } @@ -249,6 +257,15 @@ impl<'a> Decoder<'a> for BinaryDecoder { values.push(x) } } + State::FilteredOptional(page_validity, page_values) => { + extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + page_values.by_ref(), + ); + } } } } diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 741fa6c0830..abb766a6968 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -3,7 +3,6 @@ use std::collections::VecDeque; use parquet2::{ deserialize::SliceFilteredIter, encoding::{hybrid_rle, Encoding}, - indexes::Interval, page::{DataPage, PrimitivePageDict}, schema::Repetition, types::decode, @@ -16,7 +15,7 @@ use crate::{ }; use super::super::utils; -use super::super::utils::OptionalPageValidity; +use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity}; use super::super::DataPages; #[derive(Debug)] @@ -31,12 +30,7 @@ impl<'a> FilteredRequiredValues<'a> { let values = values.chunks_exact(std::mem::size_of::

()); - let rows = page - .selected_rows() - .unwrap_or(&[Interval::new(0, page.num_values())]) - .iter() - .copied() - .collect(); + let rows = get_selected_rows(page); let values = SliceFilteredIter::new(values, rows); Self { values } @@ -107,6 +101,7 @@ where RequiredDictionary(ValuesDictionary<'a, P>), OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, P>), FilteredRequired(FilteredRequiredValues<'a>), + FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>), } impl<'a, P> utils::PageState<'a> for State<'a, P> @@ -120,6 +115,7 @@ where State::RequiredDictionary(values) => values.len(), State::OptionalDictionary(optional, _) => optional.len(), State::FilteredRequired(values) => values.len(), + State::FilteredOptional(optional, _) => optional.len(), } } } @@ -200,6 +196,10 @@ where (Encoding::Plain, _, false, true) => Ok(State::FilteredRequired( FilteredRequiredValues::new::

(page), )), + (Encoding::Plain, _, true, true) => Ok(State::FilteredOptional( + FilteredOptionalPageValidity::new(page), + Values::new::

(page), + )), _ => Err(utils::not_implemented(page)), } } @@ -258,6 +258,15 @@ where .take(remaining), ); } + State::FilteredOptional(page_validity, page_values) => { + utils::extend_from_decoder( + validity, + page_validity, + Some(remaining), + values, + page_values.values.by_ref().map(decode).map(self.op), + ); + } } } } diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 789ffec5eb6..95c322e4dff 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -1,7 +1,10 @@ use std::collections::VecDeque; -use parquet2::deserialize::{FilteredHybridEncoded, HybridDecoderBitmapIter, HybridEncoded}; +use parquet2::deserialize::{ + FilteredHybridEncoded, FilteredHybridRleDecoderIter, HybridDecoderBitmapIter, HybridEncoded, +}; use parquet2::encoding::hybrid_rle; +use parquet2::indexes::Interval; use parquet2::page::{split_buffer as _split_buffer, DataPage}; use parquet2::schema::Repetition; @@ -86,6 +89,99 @@ impl Pushable for Vec { } } +/// The state of a partially deserialized page +pub(super) trait PageValidity<'a> { + fn next_limited(&mut self, limit: usize) -> Option>; +} + +#[derive(Debug, Clone)] +pub struct FilteredOptionalPageValidity<'a> { + iter: FilteredHybridRleDecoderIter<'a>, + current: Option<(FilteredHybridEncoded<'a>, usize)>, +} + +impl<'a> FilteredOptionalPageValidity<'a> { + pub fn new(page: &'a DataPage) -> Self { + let (_, validity, _) = split_buffer(page); + + let iter = hybrid_rle::Decoder::new(validity, 1); + let iter = HybridDecoderBitmapIter::new(iter, page.num_values()); + let selected_rows = get_selected_rows(page); + let iter = FilteredHybridRleDecoderIter::new(iter, selected_rows); + + Self { + iter, + current: None, + } + } + + pub fn len(&self) -> usize { + self.iter.len() + } +} + +pub fn get_selected_rows(page: &DataPage) -> VecDeque { + page.selected_rows() + .unwrap_or(&[Interval::new(0, page.num_values())]) + .iter() + .copied() + .collect() +} + +impl<'a> PageValidity<'a> for FilteredOptionalPageValidity<'a> { + fn next_limited(&mut self, limit: usize) -> Option> { + let (run, own_offset) = if let Some((run, offset)) = self.current { + (run, offset) + } else { + // a new run + let run = self.iter.next()?; // no run -> None + self.current = Some((run, 0)); + return self.next_limited(limit); + }; + + match run { + FilteredHybridEncoded::Bitmap { + values, + offset, + length, + } => { + let run_length = length - own_offset; + + let length = limit.min(run_length); + + if length == run_length { + self.current = None; + } else { + self.current = Some((run, own_offset + length)); + } + + Some(FilteredHybridEncoded::Bitmap { + values, + offset, + length, + }) + } + FilteredHybridEncoded::Repeated { is_set, length } => { + let run_length = length - own_offset; + + let length = limit.min(run_length); + + if length == run_length { + self.current = None; + } else { + self.current = Some((run, own_offset + length)); + } + + Some(FilteredHybridEncoded::Repeated { is_set, length }) + } + FilteredHybridEncoded::Skipped(set) => { + self.current = None; + Some(FilteredHybridEncoded::Skipped(set)) + } + } + } +} + pub struct Zip { validity: V, values: I, @@ -185,10 +281,16 @@ impl<'a> OptionalPageValidity<'a> { } } +impl<'a> PageValidity<'a> for OptionalPageValidity<'a> { + fn next_limited(&mut self, limit: usize) -> Option> { + self.next_limited(limit) + } +} + /// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder -pub(super) fn extend_from_decoder, I: Iterator>( +pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable, I: Iterator>( validity: &mut MutableBitmap, - page_validity: &mut OptionalPageValidity, + page_validity: &mut dyn PageValidity<'a>, limit: Option, pushable: &mut P, mut values_iter: I, @@ -207,10 +309,9 @@ pub(super) fn extend_from_decoder, I: Iterator { - // consume `additional` items + // consume `length` items let iter = BitmapIter::new(values, offset, length); let iter = Zip::new(iter, &mut values_iter); - let iter = iter.skip(offset); for item in iter { if let Some(item) = item {