Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Fixed reading of dictionary-encoded nested binary Parquet columns (#924)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 21, 2022
1 parent 3981d73 commit be5c88e
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 44 deletions.
71 changes: 27 additions & 44 deletions src/io/parquet/read/deserialize/binary/basic.rs
Expand Up @@ -74,58 +74,40 @@ impl<'a> Required<'a> {
}
}

struct RequiredDictionary<'a> {
#[derive(Debug)]
pub(super) struct ValuesDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub remaining: usize,
pub dict: &'a BinaryPageDict,
}

impl<'a> RequiredDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = utils::dict_indices_decoder(page.buffer(), page.num_values());

Self {
values,
remaining: page.num_values(),
dict,
}
}
}

struct OptionalDictionary<'a> {
values: hybrid_rle::HybridRleDecoder<'a>,
validity: OptionalPageValidity<'a>,
dict: &'a BinaryPageDict,
}

impl<'a> OptionalDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
impl<'a> ValuesDictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = utils::dict_indices_decoder(indices_buffer, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(page),
dict,
}
Self { dict, values }
}

#[inline]
pub fn len(&self) -> usize {
self.values.size_hint().0
}
}

enum State<'a> {
Optional(OptionalPageValidity<'a>, BinaryIter<'a>),
Required(Required<'a>),
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalDictionary<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
fn len(&self) -> usize {
match self {
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.remaining,
State::RequiredDictionary(state) => state.remaining,
State::OptionalDictionary(state) => state.validity.len(),
State::RequiredDictionary(values) => values.len(),
State::OptionalDictionary(optional, _) => optional.len(),
}
}
}
Expand Down Expand Up @@ -184,16 +166,18 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Ok(State::RequiredDictionary(RequiredDictionary::new(
Ok(State::RequiredDictionary(ValuesDictionary::new(
page,
dict.as_any().downcast_ref().unwrap(),
)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
Ok(State::OptionalDictionary(OptionalDictionary::new(
page,
dict.as_any().downcast_ref().unwrap(),
)))
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::OptionalDictionary(
OptionalPageValidity::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, _, true) => {
let (_, _, values) = utils::split_buffer(page);
Expand Down Expand Up @@ -241,22 +225,22 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::OptionalDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
State::OptionalDictionary(page_validity, page_values) => {
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
extend_from_decoder(
utils::extend_from_decoder(
validity,
&mut page.validity,
page_validity,
Some(additional),
values,
&mut page.values.by_ref().map(op),
&mut page_values.values.by_ref().map(op),
)
}
State::RequiredDictionary(page) => {
Expand All @@ -269,7 +253,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
&dict_values[dict_offset_i..dict_offset_ip1]
};

page.remaining = page.remaining.saturating_sub(additional);
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
Expand Down
50 changes: 50 additions & 0 deletions src/io/parquet/read/deserialize/binary/nested.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{

use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
use super::basic::ValuesDictionary;
use super::utils::Binary;
use super::{
super::utils,
Expand All @@ -20,13 +21,17 @@ use super::{
/// Per-page decoding state for nested binary values: one variant per
/// (encoding, nullability) combination selected when the page is opened.
enum State<'a> {
    /// Nullable plain-encoded values: definition levels plus a value iterator.
    Optional(Optional<'a>, utils::BinaryIter<'a>),
    /// Non-nullable plain-encoded values.
    Required(Required<'a>),
    /// Non-nullable dictionary-encoded values (RLE indices into the page dict).
    RequiredDictionary(ValuesDictionary<'a>),
    /// Nullable dictionary-encoded values: definition levels plus dict indices.
    OptionalDictionary(Optional<'a>, ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
    /// Number of values remaining to be decoded from this page.
    fn len(&self) -> usize {
        // For the optional variants the definition-level stream drives the
        // count; for the required variants the value stream itself does.
        match self {
            Self::Optional(def_levels, _) => def_levels.len(),
            Self::Required(values) => values.remaining,
            Self::RequiredDictionary(indices) => indices.len(),
            Self::OptionalDictionary(def_levels, _) => def_levels.len(),
        }
    }
}
Expand All @@ -45,6 +50,17 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, None, true) => {
let (_, _, values) = utils::split_buffer(page);

Expand Down Expand Up @@ -95,6 +111,40 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::RequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
State::OptionalDictionary(page_validity, page_values) => {
let max_def = page_validity.max_def();
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
read_optional_values(
page_validity.definition_levels.by_ref(),
max_def,
page_values.values.by_ref().map(op),
values,
validity,
additional,
)
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions tests/it/io/parquet/read.rs
Expand Up @@ -269,6 +269,11 @@ fn v1_nested_utf8() -> Result<()> {
test_pyarrow_integration("list_utf8", 1, "nested", false, false, None)
}

// Round-trips a pyarrow-generated nested `list<utf8>` file (Parquet v1) through
// the reader. The fourth argument is `true` here (vs. `false` in
// `v1_nested_utf8` above), which presumably selects dictionary encoding — the
// case this commit fixes; confirm against `test_pyarrow_integration`'s signature.
#[test]
fn v1_nested_utf8_dict() -> Result<()> {
    test_pyarrow_integration("list_utf8", 1, "nested", true, false, None)
}

#[test]
fn v2_nested_large_binary() -> Result<()> {
test_pyarrow_integration("list_large_binary", 2, "nested", false, false, None)
Expand Down

0 comments on commit be5c88e

Please sign in to comment.