Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support for decoding delta-length-encoded binary (parquet) (#1228)
Browse files Browse the repository at this point in the history
Added delta
  • Loading branch information
jorgecarleitao committed Aug 16, 2022
1 parent 3b29c82 commit 052a80a
Show file tree
Hide file tree
Showing 5 changed files with 195 additions and 40 deletions.
170 changes: 131 additions & 39 deletions src/io/parquet/read/deserialize/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::default::Default;

use parquet2::{
deserialize::SliceFilteredIter,
encoding::{hybrid_rle, Encoding},
encoding::{delta_length_byte_array, hybrid_rle, Encoding},
page::{split_buffer, DataPage, DictPage},
schema::Repetition,
};
Expand All @@ -23,44 +23,6 @@ use super::super::utils::{
use super::super::Pages;
use super::{super::utils, utils::*};

/*
fn read_delta_optional<O: Offset>(
validity_buffer: &[u8],
values_buffer: &[u8],
additional: usize,
values: &mut Binary<O>,
validity: &mut MutableBitmap,
) {
let Binary {
offsets,
values,
last_offset,
} = values;
// values_buffer: first 4 bytes are len, remaining is values
let mut values_iterator = delta_length_byte_array::Decoder::new(values_buffer);
let offsets_iterator = values_iterator.by_ref().map(|x| {
*last_offset += O::from_usize(x as usize).unwrap();
*last_offset
});
let mut page_validity = OptionalPageValidity::new(validity_buffer, additional);
// offsets:
extend_from_decoder(
validity,
&mut page_validity,
None,
offsets,
offsets_iterator,
);
// values:
let new_values = values_iterator.into_values();
values.extend_from_slice(new_values);
}
*/

#[derive(Debug)]
pub(super) struct Required<'a> {
pub values: SizedBinaryIter<'a>,
Expand All @@ -79,6 +41,52 @@ impl<'a> Required<'a> {
}
}

#[derive(Debug)]
pub(super) struct Delta<'a> {
pub lengths: std::vec::IntoIter<usize>,
pub values: &'a [u8],
}

impl<'a> Delta<'a> {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let (_, _, values) = split_buffer(page)?;

let mut lengths_iter = delta_length_byte_array::Decoder::new(values);

#[allow(clippy::needless_collect)] // we need to consume it to get the values
let lengths = lengths_iter
.by_ref()
.map(|x| x as usize)
.collect::<Vec<_>>();

let values = lengths_iter.into_values();
Ok(Self {
lengths: lengths.into_iter(),
values,
})
}

pub fn len(&self) -> usize {
self.lengths.size_hint().0
}
}

impl<'a> Iterator for Delta<'a> {
type Item = &'a [u8];

#[inline]
fn next(&mut self) -> Option<Self::Item> {
let length = self.lengths.next()?;
let (item, remaining) = self.values.split_at(length);
self.values = remaining;
Some(item)
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.lengths.size_hint()
}
}

#[derive(Debug)]
pub(super) struct FilteredRequired<'a> {
pub values: SliceFilteredIter<SizedBinaryIter<'a>>,
Expand All @@ -99,6 +107,26 @@ impl<'a> FilteredRequired<'a> {
}
}

#[derive(Debug)]
pub(super) struct FilteredDelta<'a> {
pub values: SliceFilteredIter<Delta<'a>>,
}

impl<'a> FilteredDelta<'a> {
pub fn try_new(page: &'a DataPage) -> Result<Self> {
let values = Delta::try_new(page)?;

let rows = get_selected_rows(page);
let values = SliceFilteredIter::new(values, rows);

Ok(Self { values })
}

pub fn len(&self) -> usize {
self.values.size_hint().0
}
}

pub(super) type Dict = Vec<Vec<u8>>;

#[derive(Debug)]
Expand Down Expand Up @@ -167,7 +195,11 @@ enum State<'a> {
Required(Required<'a>),
RequiredDictionary(RequiredDictionary<'a>),
OptionalDictionary(OptionalPageValidity<'a>, ValuesDictionary<'a>),
Delta(Delta<'a>),
OptionalDelta(OptionalPageValidity<'a>, Delta<'a>),
FilteredRequired(FilteredRequired<'a>),
FilteredDelta(FilteredDelta<'a>),
FilteredOptionalDelta(FilteredOptionalPageValidity<'a>, Delta<'a>),
FilteredOptional(FilteredOptionalPageValidity<'a>, BinaryIter<'a>),
FilteredRequiredDictionary(FilteredRequiredDictionary<'a>),
FilteredOptionalDictionary(FilteredOptionalPageValidity<'a>, ValuesDictionary<'a>),
Expand All @@ -178,10 +210,14 @@ impl<'a> utils::PageState<'a> for State<'a> {
match self {
State::Optional(validity, _) => validity.len(),
State::Required(state) => state.len(),
State::Delta(state) => state.len(),
State::OptionalDelta(state, _) => state.len(),
State::RequiredDictionary(values) => values.len(),
State::OptionalDictionary(optional, _) => optional.len(),
State::FilteredRequired(state) => state.len(),
State::FilteredOptional(validity, _) => validity.len(),
State::FilteredDelta(state) => state.len(),
State::FilteredOptionalDelta(state, _) => state.len(),
State::FilteredRequiredDictionary(values) => values.len(),
State::FilteredOptionalDictionary(optional, _) => optional.len(),
}
Expand Down Expand Up @@ -284,6 +320,20 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
BinaryIter::new(values),
))
}
(Encoding::DeltaLengthByteArray, _, false, false) => {
Delta::try_new(page).map(State::Delta)
}
(Encoding::DeltaLengthByteArray, _, true, false) => Ok(State::OptionalDelta(
OptionalPageValidity::try_new(page)?,
Delta::try_new(page)?,
)),
(Encoding::DeltaLengthByteArray, _, false, true) => {
FilteredDelta::try_new(page).map(State::FilteredDelta)
}
(Encoding::DeltaLengthByteArray, _, true, true) => Ok(State::FilteredOptionalDelta(
FilteredOptionalPageValidity::try_new(page)?,
Delta::try_new(page)?,
)),
_ => Err(utils::not_implemented(page)),
}
}
Expand Down Expand Up @@ -315,11 +365,44 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
values.push(x)
}
}
State::Delta(page) => {
values.extend_lengths(page.lengths.by_ref().take(additional), &mut page.values);
}
State::OptionalDelta(page_validity, page_values) => {
let Binary {
offsets,
values: values_,
last_offset,
} = values;

let offset = *last_offset;
extend_from_decoder(
validity,
page_validity,
Some(additional),
offsets,
page_values.lengths.by_ref().map(|x| {
*last_offset += O::from_usize(x).unwrap();
*last_offset
}),
);

let length = *last_offset - offset;

let (consumed, remaining) = page_values.values.split_at(length.to_usize());
page_values.values = remaining;
values_.extend_from_slice(consumed);
}
State::FilteredRequired(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
}
}
State::FilteredDelta(page) => {
for x in page.values.by_ref().take(additional) {
values.push(x)
}
}
State::OptionalDictionary(page_validity, page_values) => {
let page_dict = &page_values.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
Expand Down Expand Up @@ -348,6 +431,15 @@ impl<'a, O: Offset> utils::Decoder<'a> for BinaryDecoder<O> {
page_values.by_ref(),
);
}
State::FilteredOptionalDelta(page_validity, page_values) => {
utils::extend_from_decoder(
validity,
page_validity,
Some(additional),
values,
page_values.by_ref(),
);
}
State::FilteredRequiredDictionary(page) => {
let page_dict = &page.dict;
let op = move |index: u32| page_dict[index as usize].as_ref();
Expand Down
22 changes: 22 additions & 0 deletions src/io/parquet/read/deserialize/binary/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,17 @@ pub struct Binary<O: Offset> {
#[derive(Debug)]
pub struct Offsets<O: Offset>(pub Vec<O>);

impl<O: Offset> Offsets<O> {
#[inline]
pub fn extend_lengths<I: Iterator<Item = usize>>(&mut self, lengths: I) {
let mut last_offset = *self.0.last().unwrap();
self.0.extend(lengths.map(|length| {
last_offset += O::from_usize(length).unwrap();
last_offset
}));
}
}

impl<O: Offset> Pushable<O> for Offsets<O> {
#[inline]
fn len(&self) -> usize {
Expand Down Expand Up @@ -63,6 +74,17 @@ impl<O: Offset> Binary<O> {
pub fn len(&self) -> usize {
self.offsets.len()
}

#[inline]
pub fn extend_lengths<I: Iterator<Item = usize>>(&mut self, lengths: I, values: &mut &[u8]) {
let current_offset = self.last_offset;
self.offsets.extend_lengths(lengths);
self.last_offset = *self.offsets.0.last().unwrap(); // guaranteed to have one
let length = self.last_offset.to_usize() - current_offset.to_usize();
let (consumed, remaining) = values.split_at(length);
*values = remaining;
self.values.extend_from_slice(consumed);
}
}

impl<'a, O: Offset> Pushable<&'a [u8]> for Binary<O> {
Expand Down
7 changes: 7 additions & 0 deletions src/io/parquet/write/binary/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,13 @@ pub(crate) fn encode_delta<O: Offset>(

delta_bitpacked::encode(lengths, buffer);
} else {
println!(
"{:?}",
offsets
.windows(2)
.map(|w| (w[1] - w[0]).to_usize() as i64)
.collect::<Vec<_>>()
);
let lengths = offsets.windows(2).map(|w| (w[1] - w[0]).to_usize() as i64);
delta_bitpacked::encode(lengths, buffer);
}
Expand Down
24 changes: 24 additions & 0 deletions tests/it/io/parquet/read_indexes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,18 @@ fn indexed_required_utf8() -> Result<()> {
read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected)
}

#[test]
fn indexed_required_utf8_delta() -> Result<()> {
let array21 = Utf8Array::<i32>::from_slice(["a", "b", "c"]);
let array22 = Utf8Array::<i32>::from_slice(["d", "e", "f"]);
let expected = Utf8Array::<i32>::from_slice(["e"]).boxed();

read_with_indexes(
pages(&[&array21, &array22], Encoding::DeltaLengthByteArray)?,
expected,
)
}

#[test]
fn indexed_required_i32() -> Result<()> {
let array21 = Int32Array::from_slice([1, 2, 3]);
Expand Down Expand Up @@ -194,6 +206,18 @@ fn indexed_optional_utf8() -> Result<()> {
read_with_indexes(pages(&[&array21, &array22], Encoding::Plain)?, expected)
}

#[test]
fn indexed_optional_utf8_delta() -> Result<()> {
let array21 = Utf8Array::<i32>::from([Some("a"), Some("b"), None]);
let array22 = Utf8Array::<i32>::from([None, Some("e"), Some("f")]);
let expected = Utf8Array::<i32>::from_slice(["e"]).boxed();

read_with_indexes(
pages(&[&array21, &array22], Encoding::DeltaLengthByteArray)?,
expected,
)
}

#[test]
fn indexed_required_fixed_len() -> Result<()> {
let array21 = FixedSizeBinaryArray::from_slice([[127], [128], [129]]);
Expand Down
12 changes: 11 additions & 1 deletion tests/it/io/parquet/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,6 @@ fn list_large_binary_optional_v1() -> Result<()> {
}

#[test]
#[ignore]
fn utf8_optional_v2_delta() -> Result<()> {
round_trip(
"string",
Expand All @@ -350,6 +349,17 @@ fn utf8_optional_v2_delta() -> Result<()> {
)
}

#[test]
fn utf8_required_v2_delta() -> Result<()> {
round_trip(
"string",
"required",
Version::V2,
CompressionOptions::Uncompressed,
vec![Encoding::DeltaLengthByteArray],
)
}

#[test]
fn i32_optional_v2_dict() -> Result<()> {
round_trip(
Expand Down

0 comments on commit 052a80a

Please sign in to comment.