From be5c88e87602354b45bb2405a4ed3260b3effb7b Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 21 Mar 2022 07:35:41 +0100 Subject: [PATCH] Fixed support to read dict nested binary parquet (#924) --- .../parquet/read/deserialize/binary/basic.rs | 71 +++++++------------ .../parquet/read/deserialize/binary/nested.rs | 50 +++++++++++++ tests/it/io/parquet/read.rs | 5 ++ 3 files changed, 82 insertions(+), 44 deletions(-) diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 10d3ec07c49..6b0b8f1bd9a 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -74,49 +74,31 @@ impl<'a> Required<'a> { } } -struct RequiredDictionary<'a> { +#[derive(Debug)] +pub(super) struct ValuesDictionary<'a> { pub values: hybrid_rle::HybridRleDecoder<'a>, - pub remaining: usize, pub dict: &'a BinaryPageDict, } -impl<'a> RequiredDictionary<'a> { - fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { - let values = utils::dict_indices_decoder(page.buffer(), page.num_values()); - - Self { - values, - remaining: page.num_values(), - dict, - } - } -} - -struct OptionalDictionary<'a> { - values: hybrid_rle::HybridRleDecoder<'a>, - validity: OptionalPageValidity<'a>, - dict: &'a BinaryPageDict, -} - -impl<'a> OptionalDictionary<'a> { - fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { +impl<'a> ValuesDictionary<'a> { + pub fn new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Self { let (_, _, indices_buffer) = utils::split_buffer(page); - let values = utils::dict_indices_decoder(indices_buffer, page.num_values()); - Self { - values, - validity: OptionalPageValidity::new(page), - dict, - } + Self { dict, values } + } + + #[inline] + pub fn len(&self) -> usize { + self.values.size_hint().0 } } enum State<'a> { Optional(OptionalPageValidity<'a>, BinaryIter<'a>), Required(Required<'a>), - RequiredDictionary(RequiredDictionary<'a>), - OptionalDictionary(OptionalDictionary<'a>), + RequiredDictionary(ValuesDictionary<'a>), + OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>), } impl<'a> utils::PageState<'a> for State<'a> { @@ -124,8 +106,8 @@ impl<'a> utils::PageState<'a> for State<'a> { match self { State::Optional(validity, _) => validity.len(), State::Required(state) => state.remaining, - State::RequiredDictionary(state) => state.remaining, - State::OptionalDictionary(state) => state.validity.len(), + State::RequiredDictionary(values) => values.len(), + State::OptionalDictionary(optional, _) => optional.len(), } } } @@ -184,16 +166,18 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { match (page.encoding(), page.dictionary_page(), is_optional) { (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { - Ok(State::RequiredDictionary(RequiredDictionary::new( + Ok(State::RequiredDictionary(ValuesDictionary::new( page, dict.as_any().downcast_ref().unwrap(), ))) } (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { - Ok(State::OptionalDictionary(OptionalDictionary::new( - page, - dict.as_any().downcast_ref().unwrap(), - ))) + let dict = dict.as_any().downcast_ref().unwrap(); + + Ok(State::OptionalDictionary( + OptionalPageValidity::new(page), + ValuesDictionary::new(page, dict), + )) } (Encoding::Plain, _, true) => { let (_, _, values) = utils::split_buffer(page); @@ -241,9 +225,9 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { values.push(x) } } - State::OptionalDictionary(page) => { - let dict_values = page.dict.values(); - let dict_offsets = page.dict.offsets(); + State::OptionalDictionary(page_validity, page_values) => { + let dict_values = page_values.dict.values(); + let dict_offsets = page_values.dict.offsets(); let op = move |index: u32| { let index = index as usize; @@ -251,12 +235,12 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { let dict_offset_ip1 = dict_offsets[index + 1] as usize; &dict_values[dict_offset_i..dict_offset_ip1] }; - extend_from_decoder( + utils::extend_from_decoder( validity, - &mut page.validity, + page_validity, Some(additional), values, - &mut page.values.by_ref().map(op), + &mut page_values.values.by_ref().map(op), ) } State::RequiredDictionary(page) => { @@ -269,7 +253,6 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { &dict_values[dict_offset_i..dict_offset_ip1] }; - page.remaining = page.remaining.saturating_sub(additional); for x in page.values.by_ref().map(op).take(additional) { values.push(x) } diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index 53a86523614..522cc63cf49 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -9,6 +9,7 @@ use crate::{ use super::super::nested_utils::*; use super::super::utils::MaybeNext; +use super::basic::ValuesDictionary; use super::utils::Binary; use super::{ super::utils, @@ -20,6 +21,8 @@ use super::{ enum State<'a> { Optional(Optional<'a>, utils::BinaryIter<'a>), Required(Required<'a>), + RequiredDictionary(ValuesDictionary<'a>), + OptionalDictionary(Optional<'a>, ValuesDictionary<'a>), } impl<'a> utils::PageState<'a> for State<'a> { @@ -27,6 +30,8 @@ impl<'a> utils::PageState<'a> for State<'a> { match self { State::Optional(validity, _) => validity.len(), State::Required(state) => state.remaining, + State::RequiredDictionary(required) => required.len(), + State::OptionalDictionary(optional, _) => optional.len(), } } } @@ -45,6 +50,17 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { page.descriptor().type_().get_basic_info().repetition() == &Repetition::Optional; match (page.encoding(), page.dictionary_page(), is_optional) { + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false) => { + let dict = dict.as_any().downcast_ref().unwrap(); + Ok(State::RequiredDictionary(ValuesDictionary::new(page, dict))) + } + (Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true) => { + let dict = dict.as_any().downcast_ref().unwrap(); + Ok(State::OptionalDictionary( + Optional::new(page), + ValuesDictionary::new(page, dict), + )) + } (Encoding::Plain, None, true) => { let (_, _, values) = utils::split_buffer(page); @@ -95,6 +111,40 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder { values.push(x) } } + State::RequiredDictionary(page) => { + let dict_values = page.dict.values(); + let dict_offsets = page.dict.offsets(); + + let op = move |index: u32| { + let index = index as usize; + let dict_offset_i = dict_offsets[index] as usize; + let dict_offset_ip1 = dict_offsets[index + 1] as usize; + &dict_values[dict_offset_i..dict_offset_ip1] + }; + for x in page.values.by_ref().map(op).take(additional) { + values.push(x) + } + } + State::OptionalDictionary(page_validity, page_values) => { + let max_def = page_validity.max_def(); + let dict_values = page_values.dict.values(); + let dict_offsets = page_values.dict.offsets(); + + let op = move |index: u32| { + let index = index as usize; + let dict_offset_i = dict_offsets[index] as usize; + let dict_offset_ip1 = dict_offsets[index + 1] as usize; + &dict_values[dict_offset_i..dict_offset_ip1] + }; + read_optional_values( + page_validity.definition_levels.by_ref(), + max_def, + page_values.values.by_ref().map(op), + values, + validity, + additional, + ) + } } } } diff --git a/tests/it/io/parquet/read.rs b/tests/it/io/parquet/read.rs index 13fde93610e..3a946f71a0d 100644 --- a/tests/it/io/parquet/read.rs +++ b/tests/it/io/parquet/read.rs @@ -269,6 +269,11 @@ fn v1_nested_utf8() -> Result<()> { test_pyarrow_integration("list_utf8", 1, "nested", false, false, None) } +#[test] +fn v1_nested_utf8_dict() -> Result<()> { + test_pyarrow_integration("list_utf8", 1, "nested", true, false, None) +} + #[test] fn v2_nested_large_binary() -> Result<()> { test_pyarrow_integration("list_large_binary", 2, "nested", false, false, None)