Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Added support to read and write delta-bitpacked
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Aug 16, 2022
1 parent 052a80a commit 7396e7e
Show file tree
Hide file tree
Showing 13 changed files with 493 additions and 69 deletions.
8 changes: 4 additions & 4 deletions arrow-parquet-integration-testing/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _prepare(
write,
"--version",
version,
"--encoding-utf8",
"--encoding-int",
encoding_utf8,
"--compression",
compression,
Expand Down Expand Up @@ -75,8 +75,8 @@ def variations():
# "generated_custom_metadata",
]:
# pyarrow does not support decoding "delta"-encoded values.
# for encoding in ["plain", "delta"]:
for encoding in ["plain"]:
for encoding in ["plain", "delta"]:
#for encoding in ["plain"]:
for compression in ["uncompressed", "zstd", "snappy"]:
yield (version, file, compression, encoding)

Expand All @@ -95,4 +95,4 @@ def variations():
if str(c1.type) in ["month_interval", "day_time_interval"]:
# pyarrow does not support interval types from parquet
continue
assert c1 == c2
assert c1 == c2, (c1, c2)
13 changes: 11 additions & 2 deletions arrow-parquet-integration-testing/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::fs::File;
use std::{io::Read};
use std::io::Read;

use arrow2::array::Array;
use arrow2::io::ipc::IpcField;
use arrow2::{
AHashMap,
chunk::Chunk,
datatypes::{DataType, Schema},
error::Result,
Expand All @@ -16,6 +15,7 @@ use arrow2::{
RowGroupIterator, Version as ParquetVersion, WriteOptions,
},
},
AHashMap,
};
use clap::Parser;
use flate2::read::GzDecoder;
Expand Down Expand Up @@ -110,6 +110,8 @@ struct Args {
projection: Option<String>,
#[clap(short, long, arg_enum, help = "encoding scheme for utf8", default_value_t = EncodingScheme::Plain)]
encoding_utf8: EncodingScheme,
#[clap(short('i'), long, arg_enum, help = "encoding scheme for int", default_value_t = EncodingScheme::Plain)]
encoding_int: EncodingScheme,
#[clap(short, long, arg_enum)]
compression: Compression,
}
Expand Down Expand Up @@ -178,6 +180,13 @@ fn main() -> Result<()> {
.map(|f| {
transverse(&f.data_type, |dt| match dt {
DataType::Dictionary(..) => Encoding::RleDictionary,
DataType::Int32 => {
if args.encoding_int == EncodingScheme::Delta {
Encoding::DeltaBinaryPacked
} else {
Encoding::Plain
}
}
DataType::Utf8 | DataType::LargeUtf8 => {
if args.encoding_utf8 == EncodingScheme::Delta {
Encoding::DeltaLengthByteArray
Expand Down
16 changes: 8 additions & 8 deletions src/io/parquet/read/deserialize/primitive/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use super::super::utils::{get_selected_rows, FilteredOptionalPageValidity, Optio
use super::super::Pages;

#[derive(Debug)]
struct FilteredRequiredValues<'a> {
pub(super) struct FilteredRequiredValues<'a> {
values: SliceFilteredIter<std::slice::ChunksExact<'a, u8>>,
}

Expand Down Expand Up @@ -89,7 +89,7 @@ where

// The state of a `DataPage` of `Primitive` parquet primitive type
#[derive(Debug)]
enum State<'a, T>
pub(super) enum State<'a, T>
where
T: NativeType,
{
Expand Down Expand Up @@ -118,15 +118,15 @@ where
}

#[derive(Debug)]
struct PrimitiveDecoder<T, P, F>
pub(super) struct PrimitiveDecoder<T, P, F>
where
T: NativeType,
P: ParquetNativeType,
F: Fn(P) -> T,
{
phantom: std::marker::PhantomData<T>,
phantom_p: std::marker::PhantomData<P>,
op: F,
pub op: F,
}

impl<T, P, F> PrimitiveDecoder<T, P, F>
Expand All @@ -136,7 +136,7 @@ where
F: Fn(P) -> T,
{
#[inline]
fn new(op: F) -> Self {
pub(super) fn new(op: F) -> Self {
Self {
phantom: std::marker::PhantomData,
phantom_p: std::marker::PhantomData,
Expand Down Expand Up @@ -183,9 +183,9 @@ where
Ok(State::Optional(validity, values))
}
(Encoding::Plain, _, false, false) => Ok(State::Required(Values::try_new::<P>(page)?)),
(Encoding::Plain, _, false, true) => Ok(State::FilteredRequired(
FilteredRequiredValues::try_new::<P>(page)?,
)),
(Encoding::Plain, _, false, true) => {
FilteredRequiredValues::try_new::<P>(page).map(State::FilteredRequired)
}
(Encoding::Plain, _, true, true) => Ok(State::FilteredOptional(
FilteredOptionalPageValidity::try_new(page)?,
Values::try_new::<P>(page)?,
Expand Down
260 changes: 260 additions & 0 deletions src/io/parquet/read/deserialize/primitive/integer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
use std::collections::VecDeque;

use num_traits::AsPrimitive;
use parquet2::{
deserialize::SliceFilteredIter,
encoding::{delta_bitpacked::Decoder, Encoding},
page::{split_buffer, DataPage, DictPage},
schema::Repetition,
types::NativeType as ParquetNativeType,
};

use crate::{
array::MutablePrimitiveArray,
bitmap::MutableBitmap,
datatypes::DataType,
error::Result,
io::parquet::read::deserialize::utils::{
get_selected_rows, FilteredOptionalPageValidity, OptionalPageValidity,
},
types::NativeType,
};

use super::super::utils;
use super::super::Pages;

use super::basic::{finish, PrimitiveDecoder, State as PrimitiveState};

/// The state of a [`DataPage`] of an integer parquet type (i32 or i64)
///
/// Extends the plain/dictionary states from `basic` ([`PrimitiveState`]) with
/// the four combinations of delta-bitpacked decoding
/// (required/optional x unfiltered/filtered).
#[derive(Debug)]
enum State<'a, T>
where
    T: NativeType,
{
    // Any non-delta encoding: delegate to the `basic` state machine.
    Common(PrimitiveState<'a, T>),
    // DELTA_BINARY_PACKED, no validity, no row filter.
    DeltaBinaryPackedRequired(Decoder<'a>),
    // DELTA_BINARY_PACKED with a validity (definition-level) stream.
    DeltaBinaryPackedOptional(OptionalPageValidity<'a>, Decoder<'a>),
    // DELTA_BINARY_PACKED, no validity, restricted to the page's selected rows.
    FilteredDeltaBinaryPackedRequired(SliceFilteredIter<Decoder<'a>>),
    // DELTA_BINARY_PACKED with validity and row selection.
    FilteredDeltaBinaryPackedOptional(FilteredOptionalPageValidity<'a>, Decoder<'a>),
}

impl<'a, T> utils::PageState<'a> for State<'a, T>
where
    T: NativeType,
{
    /// Number of values still to be decoded from this page state.
    fn len(&self) -> usize {
        match self {
            // Non-delta encodings: the `basic` state knows its own length.
            Self::Common(inner) => inner.len(),
            // Required delta runs: the decoder's size-hint lower bound is
            // taken as the remaining count.
            Self::DeltaBinaryPackedRequired(decoder) => decoder.size_hint().0,
            Self::FilteredDeltaBinaryPackedRequired(filtered) => filtered.size_hint().0,
            // Optional variants: the validity stream tracks the remaining rows.
            Self::DeltaBinaryPackedOptional(validity, _) => validity.len(),
            Self::FilteredDeltaBinaryPackedOptional(validity, _) => validity.len(),
        }
    }
}

/// Decoder of integer parquet type
///
/// Newtype over [`PrimitiveDecoder`] so that [`utils::Decoder`] can be
/// implemented with delta-bitpacked support while delegating everything
/// else to the inner decoder.
/// The `i64: AsPrimitive<P>` bound exists because the delta-bitpacked
/// decoder yields `i64` values that must be narrowed to the parquet
/// physical type `P` before `op` maps them to `T`.
#[derive(Debug)]
struct IntDecoder<T, P, F>(PrimitiveDecoder<T, P, F>)
where
    T: NativeType,
    P: ParquetNativeType,
    i64: num_traits::AsPrimitive<P>,
    F: Fn(P) -> T;

impl<T, P, F> IntDecoder<T, P, F>
where
    T: NativeType,
    P: ParquetNativeType,
    i64: num_traits::AsPrimitive<P>,
    F: Fn(P) -> T,
{
    /// Creates a new decoder wrapping a [`PrimitiveDecoder`] around the
    /// physical-to-native conversion `op`.
    #[inline]
    fn new(op: F) -> Self {
        let inner = PrimitiveDecoder::new(op);
        Self(inner)
    }
}

// Implements page decoding for integer columns: handles the four
// DELTA_BINARY_PACKED page shapes itself and delegates every other
// encoding to the inner `PrimitiveDecoder` via `State::Common`.
impl<'a, T, P, F> utils::Decoder<'a> for IntDecoder<T, P, F>
where
    T: NativeType,
    P: ParquetNativeType,
    i64: num_traits::AsPrimitive<P>,
    F: Copy + Fn(P) -> T,
{
    type State = State<'a, T>;
    type Dict = Vec<T>;
    // Decoded values plus their validity bitmap.
    type DecodedState = (Vec<T>, MutableBitmap);

    /// Builds the decoding state for `page`, selecting a delta variant when the
    /// page is DELTA_BINARY_PACKED and falling back to the basic decoder otherwise.
    fn build_state(&self, page: &'a DataPage, dict: Option<&'a Self::Dict>) -> Result<Self::State> {
        let is_optional =
            page.descriptor.primitive_type.field_info.repetition == Repetition::Optional;
        let is_filtered = page.selected_rows().is_some();

        // Dispatch on (encoding, dict, optional, filtered); the dict is
        // irrelevant for delta pages (`_`) and only forwarded to the basic decoder.
        match (page.encoding(), dict, is_optional, is_filtered) {
            (Encoding::DeltaBinaryPacked, _, false, false) => {
                // `split_buffer` separates rep levels, def levels and values;
                // only the values region is delta-encoded.
                let (_, _, values) = split_buffer(page)?;
                Ok(State::DeltaBinaryPackedRequired(Decoder::new(values)))
            }
            (Encoding::DeltaBinaryPacked, _, true, false) => {
                let (_, _, values) = split_buffer(page)?;
                Ok(State::DeltaBinaryPackedOptional(
                    OptionalPageValidity::try_new(page)?,
                    Decoder::new(values),
                ))
            }
            (Encoding::DeltaBinaryPacked, _, false, true) => {
                let (_, _, values) = split_buffer(page)?;
                let values = Decoder::new(values);

                // Restrict the decoded stream to the page's selected row intervals.
                let rows = get_selected_rows(page);
                let values = SliceFilteredIter::new(values, rows);

                Ok(State::FilteredDeltaBinaryPackedRequired(values))
            }
            (Encoding::DeltaBinaryPacked, _, true, true) => {
                let (_, _, values) = split_buffer(page)?;
                let values = Decoder::new(values);

                // NOTE(review): here the row filter is applied through the
                // filtered validity, not through `SliceFilteredIter` — the
                // value stream is consumed as the validity demands.
                Ok(State::FilteredDeltaBinaryPackedOptional(
                    FilteredOptionalPageValidity::try_new(page)?,
                    values,
                ))
            }
            // Every non-delta page is handled by the basic decoder.
            _ => self.0.build_state(page, dict).map(State::Common),
        }
    }

    fn with_capacity(&self, capacity: usize) -> Self::DecodedState {
        self.0.with_capacity(capacity)
    }

    /// Decodes at most `remaining` values from `state` into `decoded`.
    fn extend_from_state(
        &self,
        state: &mut Self::State,
        decoded: &mut Self::DecodedState,
        remaining: usize,
    ) {
        let (values, validity) = decoded;
        match state {
            // Delegate whole `decoded` tuple; the destructured borrows above
            // are unused on this path, so re-borrowing is accepted by NLL.
            State::Common(state) => self.0.extend_from_state(state, decoded, remaining),
            State::DeltaBinaryPackedRequired(state) => {
                // i64 delta value -> P (AsPrimitive) -> T (op).
                values.extend(
                    state
                        .by_ref()
                        .map(|x| x.as_())
                        .map(self.0.op)
                        .take(remaining),
                );
            }
            State::DeltaBinaryPackedOptional(page_validity, page_values) => {
                // `extend_from_decoder` interleaves nulls per the validity,
                // pulling from the value stream only on valid slots.
                utils::extend_from_decoder(
                    validity,
                    page_validity,
                    Some(remaining),
                    values,
                    page_values.by_ref().map(|x| x.as_()).map(self.0.op),
                )
            }
            State::FilteredDeltaBinaryPackedRequired(page) => {
                values.extend(
                    page.by_ref()
                        .map(|x| x.as_())
                        .map(self.0.op)
                        .take(remaining),
                );
            }
            State::FilteredDeltaBinaryPackedOptional(page_validity, page_values) => {
                utils::extend_from_decoder(
                    validity,
                    page_validity,
                    Some(remaining),
                    values,
                    page_values.by_ref().map(|x| x.as_()).map(self.0.op),
                );
            }
        }
    }

    fn deserialize_dict(&self, page: &DictPage) -> Self::Dict {
        self.0.deserialize_dict(page)
    }
}

/// An [`Iterator`] adapter over [`Pages`] assumed to be encoded as primitive arrays
/// encoded as parquet integer types
#[derive(Debug)]
pub struct IntegerIter<T, I, P, F>
where
    I: Pages,
    T: NativeType,
    P: ParquetNativeType,
    F: Fn(P) -> T,
{
    // Underlying stream of parquet pages.
    iter: I,
    // Arrow data type of the produced arrays.
    data_type: DataType,
    // Decoded-but-not-yet-yielded chunks (values + validity).
    items: VecDeque<(Vec<T>, MutableBitmap)>,
    // Rows still to be read from `iter`.
    remaining: usize,
    // Per-array size cap; `None` presumably means a single unbounded chunk —
    // the exact semantics live in `utils::next` (confirm there).
    chunk_size: Option<usize>,
    // Deserialized dictionary-page values, if the column has a dict page.
    dict: Option<Vec<T>>,
    // Conversion from the parquet physical type `P` to the arrow native type `T`.
    op: F,
    // Marks the otherwise-unused physical type parameter `P`.
    phantom: std::marker::PhantomData<P>,
}

impl<T, I, P, F> IntegerIter<T, I, P, F>
where
    I: Pages,
    T: NativeType,
    P: ParquetNativeType,
    F: Copy + Fn(P) -> T,
{
    /// Creates a new [`IntegerIter`] reading at most `num_rows` rows from `iter`,
    /// converting physical values with `op` and chunking output per `chunk_size`.
    pub fn new(
        iter: I,
        data_type: DataType,
        num_rows: usize,
        chunk_size: Option<usize>,
        op: F,
    ) -> Self {
        Self {
            iter,
            data_type,
            items: VecDeque::new(),
            remaining: num_rows,
            chunk_size,
            dict: None,
            op,
            phantom: std::marker::PhantomData,
        }
    }
}

impl<T, I, P, F> Iterator for IntegerIter<T, I, P, F>
where
    I: Pages,
    T: NativeType,
    P: ParquetNativeType,
    i64: num_traits::AsPrimitive<P>,
    F: Copy + Fn(P) -> T,
{
    type Item = Result<MutablePrimitiveArray<T>>;

    /// Advances the page machinery until a full chunk is available,
    /// an error occurs, or the pages are exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let maybe_state = utils::next(
                &mut self.iter,
                &mut self.items,
                &mut self.dict,
                &mut self.remaining,
                self.chunk_size,
                &IntDecoder::new(self.op),
            );
            return match maybe_state {
                utils::MaybeNext::Some(Ok((values, validity))) => {
                    Some(Ok(finish(&self.data_type, values, validity)))
                }
                utils::MaybeNext::Some(Err(e)) => Some(Err(e)),
                utils::MaybeNext::None => None,
                // More pages are needed before a chunk is complete: try again.
                utils::MaybeNext::More => continue,
            };
        }
    }
}
2 changes: 2 additions & 0 deletions src/io/parquet/read/deserialize/primitive/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// Plain/dictionary-encoded primitive pages.
mod basic;
mod dictionary;
// Integer pages, including DELTA_BINARY_PACKED.
mod integer;
// Primitives inside nested (list/struct) columns.
mod nested;

pub use basic::Iter;
pub use dictionary::{DictIter, NestedDictIter};
pub use integer::IntegerIter;
pub use nested::NestedIter;
Loading

0 comments on commit 7396e7e

Please sign in to comment.