This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit 2a12d17: Delayed dict (#1185)

* Added bench

* Improved performance of reading from dictionary-encoded

* Simpler and added docs

* Finished migration
jorgecarleitao committed Aug 11, 2022
1 parent 34f7913 commit 2a12d17
Showing 27 changed files with 443 additions and 407 deletions.
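In brief, the "delayed dict" refactor below replaces per-page dictionary downcasts (`dict.as_any().downcast_ref().unwrap()` on every `DataPage`) with an associated `Dict` type on the `Decoder` trait and a `deserialize_dict` method, so each column chunk's dictionary is materialized once (as `Vec<Vec<u8>>` for binary columns) and per-value lookup becomes a plain index. The sketch below illustrates that shape with simplified stand-in types; it is not the crate's actual API, and the PLAIN length-prefixed layout it parses is assumed from the `deserialize_plain`/`SizedBinaryIter` code in the diff.

```rust
use std::convert::TryInto;

// Stand-in for parquet2's dictionary page: a PLAIN-encoded buffer where each
// value is a 4-byte little-endian length followed by that many bytes.
struct DictPage {
    buffer: Vec<u8>,
    num_values: usize,
}

// Trimmed-down decoder: the dictionary is deserialized once per column chunk,
// not downcast on every data page.
trait Decoder {
    type Dict;
    fn deserialize_dict(&self, page: &DictPage) -> Self::Dict;
}

struct BinaryDecoder;

impl Decoder for BinaryDecoder {
    // Owned byte vectors, mirroring `pub(super) type Dict = Vec<Vec<u8>>;`.
    type Dict = Vec<Vec<u8>>;

    fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
        let mut values = Vec::with_capacity(page.num_values);
        let mut offset = 0;
        while values.len() < page.num_values {
            // Read the 4-byte length prefix, then the value bytes.
            let len =
                u32::from_le_bytes(page.buffer[offset..offset + 4].try_into().unwrap()) as usize;
            offset += 4;
            values.push(page.buffer[offset..offset + len].to_vec());
            offset += len;
        }
        values
    }
}

fn main() {
    // Two values, "ab" and "c", PLAIN-encoded.
    let page = DictPage {
        buffer: vec![2, 0, 0, 0, b'a', b'b', 1, 0, 0, 0, b'c'],
        num_values: 2,
    };
    let dict = BinaryDecoder.deserialize_dict(&page);
    // Per-value lookup is now a cheap index, as in the rewritten `op` closures.
    let op = |index: u32| dict[index as usize].as_slice();
    assert_eq!(op(0), b"ab".as_slice());
    assert_eq!(op(1), b"c".as_slice());
}
```

This also explains the `dict: Option<Dict>` field added to the `Iter` struct below: the deserialized dictionary is cached across the pages of a column chunk instead of being re-interpreted per page.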
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -74,7 +74,7 @@ futures = { version = "0.3", optional = true }
async-stream = { version = "0.3.2", optional = true }

# parquet support
parquet2 = { version = "0.14.0", optional = true, default_features = false }
parquet2 = { version = "0.15.0", optional = true, default_features = false, features = ["async"] }

# avro support
avro-schema = { version = "0.3", optional = true }
4 changes: 4 additions & 0 deletions benches/read_parquet.rs
@@ -57,6 +57,10 @@ fn add_benchmark(c: &mut Criterion) {
let a = format!("read i64 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));

+let buffer = to_buffer(size, true, true, false, false);
+let a = format!("read ts dict 2^{}", i);
+c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 11).unwrap()));
+
let a = format!("read utf8 2^{}", i);
c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));

4 changes: 2 additions & 2 deletions src/io/parquet/read/deserialize/README.md
@@ -7,7 +7,7 @@ module for non-nested arrays is `simple::page_iter_to_arrays`.

This function expects

-* a (fallible) streaming iterator of decompressed and encoded pages, `DataPages`
+* a (fallible) streaming iterator of decompressed and encoded pages, `Pages`
* the source (parquet) column type, including its logical information
* the target (arrow) `DataType`
* the chunk size
@@ -18,7 +18,7 @@ This design is shared among _all_ `(parquet, arrow)` implemented tuples. Their main
difference is how they are deserialized, which depends on the source and target types.

When the array iterator is pulled the first time, the following happens:
-* a page from `DataPages` is pulled
+* a page from `Pages` is pulled
* a `PageState<'a>` is built from the page
* the `PageState` is consumed into a mutable array:
* if `chunk_size` is larger than the number of rows in the page, the mutable array state is preserved and a new page is pulled and the process repeated until we fill a chunk.
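As a rough illustration of the loop this README describes, here is a minimal sketch with stand-in types (a `Vec<u8>` for the mutable array state, a plain iterator for `Pages`); the real module works over `PageState` and typed mutable arrays, so none of these signatures are the crate's own:

```rust
// Hypothetical sketch of the chunking logic; not the module's real signatures.
fn next_chunk(
    pages: &mut impl Iterator<Item = Vec<u8>>, // stand-in for `Pages`
    pending: &mut Vec<u8>, // mutable array state, preserved across pages
    chunk_size: usize,
) -> Option<Vec<u8>> {
    while pending.len() < chunk_size {
        match pages.next() {
            // chunk_size exceeds the rows buffered so far: pull another page
            // and keep extending the same mutable state.
            Some(page) => pending.extend(page),
            // Stream exhausted and nothing buffered: iteration is over.
            None if pending.is_empty() => return None,
            // Stream exhausted: emit the final, possibly short, chunk.
            None => break,
        }
    }
    // A page larger than chunk_size is split; the remainder is carried over
    // to the next pull.
    let take = pending.len().min(chunk_size);
    let rest = pending.split_off(take);
    Some(std::mem::replace(pending, rest))
}
```

This mirrors the two cases listed above: a chunk larger than the page preserves state across pulls, while a page larger than the chunk is sliced and its remainder carried into the next iteration.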
106 changes: 41 additions & 65 deletions src/io/parquet/read/deserialize/binary/basic.rs
@@ -4,7 +4,7 @@ use std::default::Default;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
-page::{split_buffer, BinaryPageDict, DataPage},
+page::{split_buffer, DataPage, DictPage},
schema::Repetition,
};

@@ -20,7 +20,7 @@ use super::super::utils::{
extend_from_decoder, get_selected_rows, next, DecodedState, FilteredOptionalPageValidity,
MaybeNext, OptionalPageValidity,
};
-use super::super::DataPages;
+use super::super::Pages;
use super::{super::utils, utils::*};

/*
@@ -99,14 +99,16 @@ impl<'a> FilteredRequired<'a> {
}
}

+pub(super) type Dict = Vec<Vec<u8>>;
+
#[derive(Debug)]
pub(super) struct RequiredDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
-pub dict: &'a BinaryPageDict,
+pub dict: &'a Dict,
}

impl<'a> RequiredDictionary<'a> {
-pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
+pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
@@ -121,11 +123,11 @@ impl<'a> RequiredDictionary<'a> {
#[derive(Debug)]
pub(super) struct FilteredRequiredDictionary<'a> {
pub values: SliceFilteredIter<hybrid_rle::HybridRleDecoder<'a>>,
-pub dict: &'a BinaryPageDict,
+pub dict: &'a Dict,
}

impl<'a> FilteredRequiredDictionary<'a> {
-pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
+pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

let rows = get_selected_rows(page);
@@ -143,11 +145,11 @@ impl<'a> FilteredRequiredDictionary<'a> {
#[derive(Debug)]
pub(super) struct ValuesDictionary<'a> {
pub values: hybrid_rle::HybridRleDecoder<'a>,
-pub dict: &'a BinaryPageDict,
+pub dict: &'a Dict,
}

impl<'a> ValuesDictionary<'a> {
-pub fn try_new(page: &'a DataPage, dict: &'a BinaryPageDict) -> Result<Self> {
+pub fn try_new(page: &'a DataPage, dict: &'a Dict) -> Result<Self> {
let values = utils::dict_indices_decoder(page)?;

Ok(Self { dict, values })
@@ -219,7 +221,7 @@ impl<O: Offset> TraitBinaryArray<O> for Utf8Array<O> {
}
}

-impl<'a, O: Offset> DecodedState<'a> for (Binary<O>, MutableBitmap) {
+impl<O: Offset> DecodedState for (Binary<O>, MutableBitmap) {
fn len(&self) -> usize {
self.0.len()
}
@@ -232,42 +234,29 @@ struct BinaryDecoder<O: Offset> {

impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
type State = State<'a>;
+type Dict = Dict;
type DecodedState = (Binary<O>, MutableBitmap);

-fn build_state(&self, page: &'a DataPage) -> Result<Self::State> {
+fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result<Self::State> {
let is_optional =
page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
let is_filtered = page.selected_rows().is_some();

-match (
-page.encoding(),
-page.dictionary_page(),
-is_optional,
-is_filtered,
-) {
-(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => {
-Ok(State::RequiredDictionary(RequiredDictionary::try_new(
-page,
-dict.as_any().downcast_ref().unwrap(),
-)?))
-}
+match (page.encoding(), dict, is_optional, is_filtered) {
+(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, false) => Ok(
+State::RequiredDictionary(RequiredDictionary::try_new(page, dict)?),
+),
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, false) => {
-let dict = dict.as_any().downcast_ref().unwrap();
-
Ok(State::OptionalDictionary(
OptionalPageValidity::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
))
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), false, true) => {
-let dict = dict.as_any().downcast_ref().unwrap();
-
FilteredRequiredDictionary::try_new(page, dict)
.map(State::FilteredRequiredDictionary)
}
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(dict), true, true) => {
-let dict = dict.as_any().downcast_ref().unwrap();
-
Ok(State::FilteredOptionalDictionary(
FilteredOptionalPageValidity::try_new(page)?,
ValuesDictionary::try_new(page, dict)?,
@@ -332,15 +321,8 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}
State::OptionalDictionary(page_validity, page_values) => {
-let dict_values = page_values.dict.values();
-let dict_offsets = page_values.dict.offsets();
-
-let op = move |index: u32| {
-let index = index as usize;
-let dict_offset_i = dict_offsets[index] as usize;
-let dict_offset_ip1 = dict_offsets[index + 1] as usize;
-&dict_values[dict_offset_i..dict_offset_ip1]
-};
+let page_dict = &page_values.dict;
+let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
@@ -350,14 +332,8 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
)
}
State::RequiredDictionary(page) => {
-let dict_values = page.dict.values();
-let dict_offsets = page.dict.offsets();
-let op = move |index: u32| {
-let index = index as usize;
-let dict_offset_i = dict_offsets[index] as usize;
-let dict_offset_ip1 = dict_offsets[index + 1] as usize;
-&dict_values[dict_offset_i..dict_offset_ip1]
-};
+let page_dict = &page.dict;
+let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
@@ -373,29 +349,16 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
);
}
State::FilteredRequiredDictionary(page) => {
-let dict_values = page.dict.values();
-let dict_offsets = page.dict.offsets();
-let op = move |index: u32| {
-let index = index as usize;
-let dict_offset_i = dict_offsets[index] as usize;
-let dict_offset_ip1 = dict_offsets[index + 1] as usize;
-&dict_values[dict_offset_i..dict_offset_ip1]
-};
+let page_dict = &page.dict;
+let op = move |index: u32| page_dict[index as usize].as_ref();

for x in page.values.by_ref().map(op).take(additional) {
values.push(x)
}
}
State::FilteredOptionalDictionary(page_validity, page_values) => {
-let dict_values = page_values.dict.values();
-let dict_offsets = page_values.dict.offsets();
-
-let op = move |index: u32| {
-let index = index as usize;
-let dict_offset_i = dict_offsets[index] as usize;
-let dict_offset_ip1 = dict_offsets[index + 1] as usize;
-&dict_values[dict_offset_i..dict_offset_ip1]
-};
+let page_dict = &page_values.dict;
+let op = move |index: u32| page_dict[index as usize].as_ref();
utils::extend_from_decoder(
validity,
page_validity,
@@ -406,6 +369,10 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
}
}
}

+fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
+deserialize_plain(&page.buffer, page.num_values)
+}
}

pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
@@ -421,35 +388,38 @@ pub(super) fn finish<O: Offset, A: TraitBinaryArray<O>>(
)
}

-pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: DataPages> {
+pub struct Iter<O: Offset, A: TraitBinaryArray<O>, I: Pages> {
iter: I,
data_type: DataType,
items: VecDeque<(Binary<O>, MutableBitmap)>,
+dict: Option<Dict>,
chunk_size: Option<usize>,
remaining: usize,
phantom_a: std::marker::PhantomData<A>,
}

-impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iter<O, A, I> {
+impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iter<O, A, I> {
pub fn new(iter: I, data_type: DataType, chunk_size: Option<usize>, num_rows: usize) -> Self {
Self {
iter,
data_type,
items: VecDeque::new(),
+dict: None,
chunk_size,
remaining: num_rows,
phantom_a: Default::default(),
}
}
}

-impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I> {
+impl<O: Offset, A: TraitBinaryArray<O>, I: Pages> Iterator for Iter<O, A, I> {
type Item = Result<A>;

fn next(&mut self) -> Option<Self::Item> {
let maybe_state = next(
&mut self.iter,
&mut self.items,
+&mut self.dict,
&mut self.remaining,
self.chunk_size,
&BinaryDecoder::<O>::default(),
@@ -464,3 +434,9 @@ impl<O: Offset, A: TraitBinaryArray<O>, I: DataPages> Iterator for Iter<O, A, I>
}
}
}

+pub(super) fn deserialize_plain(values: &[u8], num_values: usize) -> Dict {
+SizedBinaryIter::new(values, num_values)
+.map(|x| x.to_vec())
+.collect()
+}