Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Gate filtered cases
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Apr 11, 2022
1 parent 3377d83 commit 09d97f3
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 43 deletions.
18 changes: 12 additions & 6 deletions src/io/parquet/read/deserialize/binary/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,27 +48,33 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let is_filtered = page.selected_rows().is_some();

match (
page.encoding(),
page.dictionary_page(),
is_optional,
is_filtered,
) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, None, true) => {
(Encoding::Plain, None, true, false) => {
let (_, _, values) = utils::split_buffer(page);

let values = BinaryIter::new(values);

Ok(State::Optional(Optional::new(page), values))
}
(Encoding::Plain, None, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, None, false, false) => Ok(State::Required(Required::new(page))),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down
92 changes: 70 additions & 22 deletions src/io/parquet/read/deserialize/boolean/basic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::VecDeque;

use parquet2::{encoding::Encoding, page::DataPage, schema::Repetition};
use parquet2::{
deserialize::SliceFilteredIter, encoding::Encoding, page::DataPage, schema::Repetition,
};

use crate::{
array::BooleanArray,
Expand All @@ -11,25 +13,19 @@ use crate::{

use super::super::utils;
use super::super::utils::{
extend_from_decoder, next, split_buffer, DecodedState, Decoder, MaybeNext, OptionalPageValidity,
extend_from_decoder, get_selected_rows, next, split_buffer, DecodedState, Decoder,
FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity,
};
use super::super::DataPages;

// The state of an optional DataPage with a boolean physical type
#[derive(Debug)]
struct Optional<'a> {
values: BitmapIter<'a>,
validity: OptionalPageValidity<'a>,
}
struct Values<'a>(BitmapIter<'a>);

impl<'a> Optional<'a> {
impl<'a> Values<'a> {
pub fn new(page: &'a DataPage) -> Self {
let (_, _, values_buffer) = split_buffer(page);
let (_, _, values) = split_buffer(page);

Self {
values: BitmapIter::new(values_buffer, 0, values_buffer.len() * 8),
validity: OptionalPageValidity::new(page),
}
Self(BitmapIter::new(values, 0, values.len() * 8))
}
}

Expand All @@ -52,18 +48,44 @@ impl<'a> Required<'a> {
}
}

// The state of a required (non-optional) boolean `DataPage` that carries a row selection.
// `build_state` creates it for `Encoding::Plain` pages where the column is not optional
// and `page.selected_rows()` is `Some`.
#[derive(Debug)]
struct FilteredRequired<'a> {
// Bit iterator over the page buffer, wrapped in a `SliceFilteredIter` built from
// `get_selected_rows(page)`.
// presumably yields only the values that fall inside the selected rows — confirm
// against parquet2's `SliceFilteredIter`.
values: SliceFilteredIter<BitmapIter<'a>>,
}

impl<'a> FilteredRequired<'a> {
// Builds the filtered state from `page`: a `BitmapIter` over `page.buffer()` starting at
// offset 0 with length `page.num_values()`, filtered by the page's selected rows.
// NOTE(review): this reads `page.buffer()` directly, whereas `Values::new` goes through
// `split_buffer(page)`; presumably equivalent for required pages — confirm.
pub fn new(page: &'a DataPage) -> Self {
// todo: replace this by an iterator over slices, for faster deserialization
let values = BitmapIter::new(page.buffer(), 0, page.num_values());

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
}

// Number of items left in this state, taken from the lower bound of the iterator's
// `size_hint`. `State::len` forwards to this for the `FilteredRequired` variant.
// assumes `SliceFilteredIter` reports an exact lower bound — TODO confirm.
#[inline]
pub fn len(&self) -> usize {
self.values.size_hint().0
}
}

// The state of a `DataPage` of `Boolean` parquet boolean type
#[derive(Debug)]
enum State<'a> {
Optional(Optional<'a>),
Optional(OptionalPageValidity<'a>, Values<'a>),
Required(Required<'a>),
FilteredRequired(FilteredRequired<'a>),
FilteredOptional(FilteredOptionalPageValidity<'a>, Values<'a>),
}

impl<'a> State<'a> {
pub fn len(&self) -> usize {
match self {
State::Optional(page) => page.validity.len(),
State::Optional(validity, _) => validity.len(),
State::Required(page) => page.length - page.offset,
State::FilteredRequired(page) => page.len(),
State::FilteredOptional(optional, _) => optional.len(),
}
}
}
Expand All @@ -90,10 +112,21 @@ impl<'a> Decoder<'a> for BooleanDecoder {
fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => Ok(State::Optional(Optional::new(page))),
(Encoding::Plain, false) => Ok(State::Required(Required::new(page))),
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => Ok(State::Optional(
OptionalPageValidity::new(page),
Values::new(page),
)),
(Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, true, true) => Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
Values::new(page),
)),
(Encoding::Plain, false, true) => {
Ok(State::FilteredRequired(FilteredRequired::new(page)))
}
_ => Err(utils::not_implemented(page)),
}
}
Expand All @@ -113,18 +146,33 @@ impl<'a> Decoder<'a> for BooleanDecoder {
) {
let (values, validity) = decoded;
match state {
State::Optional(page) => extend_from_decoder(
State::Optional(page_validity, page_values) => extend_from_decoder(
validity,
&mut page.validity,
page_validity,
Some(remaining),
values,
&mut page.values,
&mut page_values.0,
),
State::Required(page) => {
let remaining = remaining.min(page.length - page.offset);
values.extend_from_slice(page.values, page.offset, remaining);
page.offset += remaining;
}
State::FilteredRequired(page) => {
values.reserve(remaining);
for item in page.values.by_ref().take(remaining) {
values.push(item)
}
}
State::FilteredOptional(page_validity, page_values) => {
utils::extend_from_decoder(
validity,
page_validity,
Some(remaining),
values,
page_values.0.by_ref(),
);
}
}
}
}
Expand Down
7 changes: 4 additions & 3 deletions src/io/parquet/read/deserialize/boolean/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,16 @@ impl<'a> Decoder<'a> for BooleanDecoder {
fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), is_optional) {
(Encoding::Plain, true) => {
match (page.encoding(), is_optional, is_filtered) {
(Encoding::Plain, true, false) => {
let (_, _, values) = utils::split_buffer(page);
let values = BitmapIter::new(values, 0, values.len() * 8);

Ok(State::Optional(Optional::new(page), values))
}
(Encoding::Plain, false) => Ok(State::Required(Required::new(page))),
(Encoding::Plain, false, false) => Ok(State::Required(Required::new(page))),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down
56 changes: 51 additions & 5 deletions src/io/parquet/read/deserialize/dictionary.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{collections::VecDeque, sync::Arc};

use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle::HybridRleDecoder, Encoding},
page::{DataPage, DictPage},
schema::Repetition,
Expand All @@ -14,8 +15,8 @@ use crate::{

use super::{
utils::{
self, dict_indices_decoder, extend_from_decoder, DecodedState, Decoder, MaybeNext,
OptionalPageValidity,
self, dict_indices_decoder, extend_from_decoder, get_selected_rows, DecodedState, Decoder,
FilteredOptionalPageValidity, MaybeNext, OptionalPageValidity,
},
DataPages,
};
Expand All @@ -25,6 +26,8 @@ use super::{
pub enum State<'a> {
/// Optional column, no row selection (`build_state`: `is_optional == true`,
/// `is_filtered == false`).
Optional(Optional<'a>),
/// Required column, no row selection (`is_optional == false`, `is_filtered == false`).
Required(Required<'a>),
/// Required column with a row selection (`is_optional == false`, `is_filtered == true`).
FilteredRequired(FilteredRequired<'a>),
/// Optional column with a row selection (`is_optional == true`, `is_filtered == true`):
/// the filtered validity paired with the decoder returned by `dict_indices_decoder(page)`.
FilteredOptional(FilteredOptionalPageValidity<'a>, HybridRleDecoder<'a>),
}

#[derive(Debug)]
Expand All @@ -39,6 +42,22 @@ impl<'a> Required<'a> {
}
}

/// The state of a required (non-optional) dictionary-encoded `DataPage` that carries a
/// row selection. `build_state` creates it for `PlainDictionary`/`RleDictionary` pages
/// where the column is not optional and `page.selected_rows()` is `Some`.
#[derive(Debug)]
pub struct FilteredRequired<'a> {
// Decoder returned by `dict_indices_decoder(page)`, wrapped in a `SliceFilteredIter`
// built from `get_selected_rows(page)`. `extend_from_state` converts each yielded item
// to the key type with `K::from_u32`.
// presumably yields only the indices that fall inside the selected rows — confirm
// against parquet2's `SliceFilteredIter`.
values: SliceFilteredIter<HybridRleDecoder<'a>>,
}

impl<'a> FilteredRequired<'a> {
// Builds the filtered state from `page`: the page's dictionary-index decoder filtered
// by the page's selected rows.
fn new(page: &'a DataPage) -> Self {
let values = dict_indices_decoder(page);

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Self { values }
}
}

#[derive(Debug)]
pub struct Optional<'a> {
values: HybridRleDecoder<'a>,
Expand All @@ -61,6 +80,8 @@ impl<'a> utils::PageState<'a> for State<'a> {
match self {
State::Optional(optional) => optional.validity.len(),
State::Required(required) => required.values.size_hint().0,
State::FilteredRequired(required) => required.values.size_hint().0,
State::FilteredOptional(validity, _) => validity.len(),
}
}
}
Expand Down Expand Up @@ -95,14 +116,24 @@ where
fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

match (page.encoding(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, false) => {
match (page.encoding(), is_optional, is_filtered) {
(Encoding::PlainDictionary | Encoding::RleDictionary, false, false) => {
Ok(State::Required(Required::new(page)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, true) => {
(Encoding::PlainDictionary | Encoding::RleDictionary, true, false) => {
Ok(State::Optional(Optional::new(page)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, false, true) => {
Ok(State::FilteredRequired(FilteredRequired::new(page)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, true, true) => {
Ok(State::FilteredOptional(
FilteredOptionalPageValidity::new(page),
dict_indices_decoder(page),
))
}
_ => Err(utils::not_implemented(page)),
}
}
Expand Down Expand Up @@ -137,6 +168,21 @@ where
.take(remaining),
);
}
State::FilteredOptional(page_validity, page_values) => extend_from_decoder(
validity,
page_validity,
Some(remaining),
values,
&mut page_values.by_ref().map(|x| K::from_u32(x).unwrap()),
),
State::FilteredRequired(page) => {
values.extend(
page.values
.by_ref()
.map(|x| K::from_u32(x).unwrap())
.take(remaining),
);
}
}
}
}
Expand Down
18 changes: 12 additions & 6 deletions src/io/parquet/read/deserialize/primitive/nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,23 +82,29 @@ where
fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let is_filtered = page.selected_rows().is_some();

match (
page.encoding(),
page.dictionary_page(),
is_optional,
is_filtered,
) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, _, true) => {
(Encoding::Plain, _, true, false) => {
Ok(State::Optional(Optional::new(page), Values::new::<P>(page)))
}
(Encoding::Plain, _, false) => Ok(State::Required(Values::new::<P>(page))),
(Encoding::Plain, _, false, false) => Ok(State::Required(Values::new::<P>(page))),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down
5 changes: 4 additions & 1 deletion src/io/parquet/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,21 @@ use super::super::DataPages;

pub fn not_implemented(page: &DataPage) -> ArrowError {
let is_optional = page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();
let required = if is_optional { "optional" } else { "required" };
let is_filtered = if is_filtered { ", index-filtered" } else { "" };
let dict = if page.dictionary_page().is_some() {
", dictionary-encoded"
} else {
""
};
ArrowError::NotYetImplemented(format!(
"Decoding {:?} \"{:?}\"-encoded{} {} parquet pages",
"Decoding {:?} \"{:?}\"-encoded{} {} {} parquet pages",
page.descriptor.primitive_type.physical_type,
page.encoding(),
dict,
required,
is_filtered,
))
}

Expand Down

0 comments on commit 09d97f3

Please sign in to comment.