Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for others
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 11, 2022
1 parent e95116b commit 3377d83
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 30 deletions.
30 changes: 22 additions & 8 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::default::Default;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
indexes::Interval,
page::{BinaryPageDict, DataPage},
schema::Repetition,
};
Expand All @@ -18,7 +17,8 @@ use crate::{
};

use super::super::utils::{
extend_from_decoder, next, DecodedState, MaybeNext, OptionalPageValidity,
extend_from_decoder, get_selected_rows, next, DecodedState, FilteredOptionalPageValidity,
MaybeNext, OptionalPageValidity,
};
use super::super::DataPages;
use super::{super::utils, utils::*};
Expand Down Expand Up @@ -87,12 +87,7 @@ impl<'a> FilteredRequired<'a> {
pub fn new(page: &'a DataPage) -> Self {
let values = SizedBinaryIter::new(page.buffer(), page.num_values());

let rows = page
.selected_rows()
.unwrap_or(&[Interval::new(0, page.num_values())])
.iter()
.copied()
.collect();
let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
Expand Down Expand Up @@ -147,6 +142,7 @@ enum State<'a> {
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
FilteredRequired(FilteredRequired<'a>),
FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
Expand All @@ -157,6 +153,7 @@ impl<'a> utils::PageState<'a> for State<'a> {
State::RequiredDictionary(values) => values.len(),
State::OptionalDictionary(optional, _) => optional.len(),
State::FilteredRequired(state) => state.len(),
State::FilteredOptional(validity, _) => validity.len(),
}
}
}
Expand Down Expand Up @@ -245,6 +242,14 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
(Encoding::Plain, _, false, true) => {
Ok(State::FilteredRequired(FilteredRequired::new(page)))
}
(Encoding::Plain, _, true, true) => {
let (_, _, values) = utils::split_buffer(page);

Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
BinaryIter::new(values),
))
}
_ => Err(utils::not_implemented(page)),
}
}
Expand Down Expand Up @@ -313,6 +318,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::FilteredOptional(page_validity, page_values) => {
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
page_values.by_ref(),
);
}
}
}
}
Expand Down
35 changes: 26 additions & 9 deletions src/io/parquet/read/deserialize/fixed_size_binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::VecDeque;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
indexes::Interval,
page::{DataPage, FixedLenByteArrayPageDict},
schema::Repetition,
};
Expand All @@ -13,8 +12,9 @@ use crate::{
};

use super::super::utils::{
dict_indices_decoder, extend_from_decoder, next, not_implemented, split_buffer, DecodedState,
Decoder, MaybeNext, OptionalPageValidity, PageState, Pushable,
dict_indices_decoder, extend_from_decoder, get_selected_rows, next, not_implemented,
split_buffer, DecodedState, Decoder, FilteredOptionalPageValidity, MaybeNext,
OptionalPageValidity, PageState, Pushable,
};
use super::super::DataPages;
use super::utils::FixedSizeBinary;
Expand Down Expand Up @@ -65,12 +65,7 @@ impl<'a> FilteredRequired<'a> {
assert_eq!(values.len() % size, 0);
let values = values.chunks_exact(size);

let rows = page
.selected_rows()
.unwrap_or(&[Interval::new(0, page.num_values())])
.iter()
.copied()
.collect();
let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
Expand Down Expand Up @@ -124,6 +119,10 @@ enum State<'a> {
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalDictionary<'a>),
FilteredRequired(FilteredRequired<'a>),
FilteredOptional(
FilteredOptionalPageValidity<'a>,
std::slice::ChunksExact<'a, u8>,
),
}

impl<'a> PageState<'a> for State<'a> {
Expand All @@ -134,6 +133,7 @@ impl<'a> PageState<'a> for State<'a> {
State::RequiredDictionary(state) => state.len(),
State::OptionalDictionary(state) => state.validity.len(),
State::FilteredRequired(state) => state.len(),
State::FilteredOptional(state, _) => state.len(),
}
}
}
Expand Down Expand Up @@ -184,6 +184,14 @@ impl<'a> Decoder<'a> for BinaryDecoder {
(Encoding::Plain, None, false, true) => Ok(State::FilteredRequired(
FilteredRequired::new(page, self.size),
)),
(Encoding::Plain, _, true, true) => {
let (_, _, values) = split_buffer(page);

Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
values.chunks_exact(self.size),
))
}
_ => Err(not_implemented(page)),
}
}
Expand Down Expand Up @@ -249,6 +257,15 @@ impl<'a> Decoder<'a> for BinaryDecoder {
values.push(x)
}
}
State::FilteredOptional(page_validity, page_values) => {
extend_from_decoder(
validity,
page_validity,
Some(remaining),
values,
page_values.by_ref(),
);
}
}
}
}
Expand Down
25 changes: 17 additions & 8 deletions src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::VecDeque;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
indexes::Interval,
page::{DataPage, PrimitivePageDict},
schema::Repetition,
types::decode,
Expand All @@ -16,7 +15,7 @@ use crate::{
};

use super::super::utils;
use super::super::utils::OptionalPageValidity;
use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity};
use super::super::DataPages;

#[derive(Debug)]
Expand All @@ -31,12 +30,7 @@ impl<'a> FilteredRequiredValues<'a> {

let values = values.chunks_exact(std::mem::size_of::<P>());

let rows = page
.selected_rows()
.unwrap_or(&[Interval::new(0, page.num_values())])
.iter()
.copied()
.collect();
let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
Expand Down Expand Up @@ -107,6 +101,7 @@ where
RequiredDictionary(ValuesDictionary<'a, P>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a, P>),
FilteredRequired(FilteredRequiredValues<'a>),
FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>),
}

impl<'a, P> utils::PageState<'a> for State<'a, P>
Expand All @@ -120,6 +115,7 @@ where
State::RequiredDictionary(values) => values.len(),
State::OptionalDictionary(optional, _) => optional.len(),
State::FilteredRequired(values) => values.len(),
State::FilteredOptional(optional, _) => optional.len(),
}
}
}
Expand Down Expand Up @@ -200,6 +196,10 @@ where
(Encoding::Plain, _, false, true) => Ok(State::FilteredRequired(
FilteredRequiredValues::new::<P>(page),
)),
(Encoding::Plain, _, true, true) => Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
Values::new::<P>(page),
)),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down Expand Up @@ -258,6 +258,15 @@ where
.take(remaining),
);
}
State::FilteredOptional(page_validity, page_values) => {
utils::extend_from_decoder(
validity,
page_validity,
Some(remaining),
values,
page_values.values.by_ref().map(decode).map(self.op),
);
}
}
}
}
Expand Down
111 changes: 106 additions & 5 deletions src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
use std::collections::VecDeque;

use parquet2::deserialize::{FilteredHybridEncoded, HybridDecoderBitmapIter, HybridEncoded};
use parquet2::deserialize::{
FilteredHybridEncoded, FilteredHybridRleDecoderIter, HybridDecoderBitmapIter, HybridEncoded,
};
use parquet2::encoding::hybrid_rle;
use parquet2::indexes::Interval;
use parquet2::page::{split_buffer as _split_buffer, DataPage};
use parquet2::schema::Repetition;

Expand Down Expand Up @@ -86,6 +89,99 @@ impl<A: Copy + Default> Pushable<A> for Vec<A> {
}
}

/// The state of a partially deserialized page
pub(super) trait PageValidity<'a> {
fn next_limited(&mut self, limit: usize) -> Option<FilteredHybridEncoded<'a>>;
}

#[derive(Debug, Clone)]
pub struct FilteredOptionalPageValidity<'a> {
iter: FilteredHybridRleDecoderIter<'a>,
current: Option<(FilteredHybridEncoded<'a>, usize)>,
}

impl<'a> FilteredOptionalPageValidity<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, validity, _) = split_buffer(page);

let iter = hybrid_rle::Decoder::new(validity, 1);
let iter = HybridDecoderBitmapIter::new(iter, page.num_values());
let selected_rows = get_selected_rows(page);
let iter = FilteredHybridRleDecoderIter::new(iter, selected_rows);

Self {
iter,
current: None,
}
}

pub fn len(&self) -> usize {
self.iter.len()
}
}

pub fn get_selected_rows(page: &DataPage) -> VecDeque<Interval> {
page.selected_rows()
.unwrap_or(&[Interval::new(0, page.num_values())])
.iter()
.copied()
.collect()
}

impl<'a> PageValidity<'a> for FilteredOptionalPageValidity<'a> {
fn next_limited(&mut self, limit: usize) -> Option<FilteredHybridEncoded<'a>> {
let (run, own_offset) = if let Some((run, offset)) = self.current {
(run, offset)
} else {
// a new run
let run = self.iter.next()?; // no run -> None
self.current = Some((run, 0));
return self.next_limited(limit);
};

match run {
FilteredHybridEncoded::Bitmap {
values,
offset,
length,
} => {
let run_length = length - own_offset;

let length = limit.min(run_length);

if length == run_length {
self.current = None;
} else {
self.current = Some((run, own_offset + length));
}

Some(FilteredHybridEncoded::Bitmap {
values,
offset,
length,
})
}
FilteredHybridEncoded::Repeated { is_set, length } => {
let run_length = length - own_offset;

let length = limit.min(run_length);

if length == run_length {
self.current = None;
} else {
self.current = Some((run, own_offset + length));
}

Some(FilteredHybridEncoded::Repeated { is_set, length })
}
FilteredHybridEncoded::Skipped(set) => {
self.current = None;
Some(FilteredHybridEncoded::Skipped(set))
}
}
}
}

pub struct Zip<V, I> {
validity: V,
values: I,
Expand Down Expand Up @@ -185,10 +281,16 @@ impl<'a> OptionalPageValidity<'a> {
}
}

impl<'a> PageValidity<'a> for OptionalPageValidity<'a> {
fn next_limited(&mut self, limit: usize) -> Option<FilteredHybridEncoded<'a>> {
self.next_limited(limit)
}
}

/// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder
pub(super) fn extend_from_decoder<T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
validity: &mut MutableBitmap,
page_validity: &mut OptionalPageValidity,
page_validity: &mut dyn PageValidity<'a>,
limit: Option<usize>,
pushable: &mut P,
mut values_iter: I,
Expand All @@ -207,10 +309,9 @@ pub(super) fn extend_from_decoder<T: Default, P: Pushable<T>, I: Iterator<Item =
offset,
length,
} => {
// consume `additional` items
// consume `length` items
let iter = BitmapIter::new(values, offset, length);
let iter = Zip::new(iter, &mut values_iter);
let iter = iter.skip(offset);

for item in iter {
if let Some(item) = item {
Expand Down

0 comments on commit 3377d83

Please sign in to comment.