Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Fixed support for reading dictionary-encoded nested binary Parquet #924

Merged
merged 2 commits into from Mar 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 27 additions & 44 deletions src/io/parquet/read/deserialize/binary/basic.rs
Expand Up @@ -74,58 +74,40 @@ impl<'a> Required<'a> {
}
}

struct RequiredDictionary<'a> {
#[derive(Debug)]
pub(super) struct ValuesDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
pub remaining: usize,
pub dict: &'a BinaryPageDict,
}

impl<'a> RequiredDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let values = utils::dict_indices_decoder(page.buffer(), page.num_values());

Self {
values,
remaining: page.num_values(),
dict,
}
}
}

struct OptionalDictionary<'a> {
values: hybrid_rle::HybridRleDecoder<'a>,
validity: OptionalPageValidity<'a>,
dict: &'a BinaryPageDict,
}

impl<'a> OptionalDictionary<'a> {
fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
impl<'a> ValuesDictionary<'a> {
pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self {
let (_, _, indices_buffer) = utils::split_buffer(page);

let values = utils::dict_indices_decoder(indices_buffer, page.num_values());

Self {
values,
validity: OptionalPageValidity::new(page),
dict,
}
Self { dict, values }
}

#[inline]
pub fn len(&self) -> usize {
self.values.size_hint().0
}
}

enum State<'a> {
Optional(OptionalPageValidity<'a>, BinaryIter<'a>),
Required(Required<'a>),
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalDictionary<'a>),
RequiredDictionary(ValuesDictionary<'a>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
fn len(&self) -> usize {
match self {
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.remaining,
State::RequiredDictionary(state) => state.remaining,
State::OptionalDictionary(state) => state.validity.len(),
State::RequiredDictionary(values) => values.len(),
State::OptionalDictionary(optional, _) => optional.len(),
}
}
}
Expand Down Expand Up @@ -184,16 +166,18 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
Ok(State::RequiredDictionary(RequiredDictionary::new(
Ok(State::RequiredDictionary(ValuesDictionary::new(
page,
dict.as_any().downcast_ref().unwrap(),
)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
Ok(State::OptionalDictionary(OptionalDictionary::new(
page,
dict.as_any().downcast_ref().unwrap(),
)))
let dict = dict.as_any().downcast_ref().unwrap();

Ok(State::OptionalDictionary(
OptionalPageValidity::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, _, true) => {
let (_, _, values) = utils::split_buffer(page);
Expand Down Expand Up @@ -241,22 +225,22 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::OptionalDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();
State::OptionalDictionary(page_validity, page_values) => {
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
extend_from_decoder(
utils::extend_from_decoder(
validity,
&mut page.validity,
page_validity,
Some(additional),
values,
&mut page.values.by_ref().map(op),
&mut page_values.values.by_ref().map(op),
)
}
State::RequiredDictionary(page) => {
Expand All @@ -269,7 +253,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
&dict_values[dict_offset_i..dict_offset_ip1]
};

page.remaining = page.remaining.saturating_sub(additional);
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
Expand Down
50 changes: 50 additions & 0 deletions src/io/parquet/read/deserialize/binary/nested.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{

use super::super::nested_utils::*;
use super::super::utils::MaybeNext;
use super::basic::ValuesDictionary;
use super::utils::Binary;
use super::{
super::utils,
Expand All @@ -20,13 +21,17 @@ use super::{
/// Decoding state for one nested binary (byte-array) data page.
///
/// Mirrors the `State` enum in `basic.rs`, but nested pages track validity
/// through definition levels (`Optional`) rather than `OptionalPageValidity`.
// NOTE(review): `ValuesDictionary` holds the RLE-encoded dictionary indices
// plus a reference to the page's `BinaryPageDict` — see `basic.rs` in this
// same diff, where it replaces the former Required/OptionalDictionary pair.
enum State<'a> {
// Non-dictionary page with definition levels and a plain byte-array iterator.
Optional(Optional<'a>, utils::BinaryIter<'a>),
// Non-dictionary page where every slot is defined (no validity to track).
Required(Required<'a>),
// Dictionary-encoded page with all values defined.
RequiredDictionary(ValuesDictionary<'a>),
// Dictionary-encoded page with definition levels driving validity.
OptionalDictionary(Optional<'a>, ValuesDictionary<'a>),
}

impl<'a> utils::PageState<'a> for State<'a> {
/// Number of items remaining to be read from this page's state.
fn len(&self) -> usize {
match self {
// For optional variants the definition-level decoder tracks how many
// slots (valid or null) are left.
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.remaining,
// Dictionary states delegate to ValuesDictionary::len(), which is the
// lower bound of the RLE index decoder's size_hint (see basic.rs).
State::RequiredDictionary(required) => required.len(),
State::OptionalDictionary(optional, _) => optional.len(),
}
}
}
Expand All @@ -45,6 +50,17 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional;

match (page.encoding(), page.dictionary_page(), is_optional) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict)))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => {
let dict = dict.as_any().downcast_ref().unwrap();
Ok(State::OptionalDictionary(
Optional::new(page),
ValuesDictionary::new(page, dict),
))
}
(Encoding::Plain, None, true) => {
let (_, _, values) = utils::split_buffer(page);

Expand Down Expand Up @@ -95,6 +111,40 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::RequiredDictionary(page) => {
let dict_values = page.dict.values();
let dict_offsets = page.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
State::OptionalDictionary(page_validity, page_values) => {
let max_def = page_validity.max_def();
let dict_values = page_values.dict.values();
let dict_offsets = page_values.dict.offsets();

let op = move |index: u32| {
let index = index as usize;
let dict_offset_i = dict_offsets[index] as usize;
let dict_offset_ip1 = dict_offsets[index + 1] as usize;
&dict_values[dict_offset_i..dict_offset_ip1]
};
read_optional_values(
page_validity.definition_levels.by_ref(),
max_def,
page_values.values.by_ref().map(op),
values,
validity,
additional,
)
}
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions tests/it/io/parquet/read.rs
Expand Up @@ -269,6 +269,11 @@ fn v1_nested_utf8() -> Result<()> {
test_pyarrow_integration("list_utf8", 1, "nested", false, false, None)
}

// Round-trip integration test for a v1 Parquet file containing a nested
// (list) utf8 column, comparing against pyarrow-generated reference data.
// NOTE(review): the `true` argument presumably selects dictionary encoding
// (this is the behavior the PR fixes) — confirm against the signature of
// `test_pyarrow_integration`, which is defined elsewhere in this file.
#[test]
fn v1_nested_utf8_dict() -> Result<()> {
test_pyarrow_integration("list_utf8", 1, "nested", true, false, None)
}

#[test]
fn v2_nested_large_binary() -> Result<()> {
test_pyarrow_integration("list_large_binary", 2, "nested", false, false, None)
Expand Down