This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit e95116b: Migrated to new decoder

jorgecarleitao committed Apr 11, 2022 (parent: 261137f)
Showing 2 changed files with 113 additions and 123 deletions.
22 changes: 4 additions & 18 deletions Cargo.toml
@@ -69,7 +69,7 @@ ahash = { version = "0.7", optional = true }

# parquet support
#parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "filtered", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "filter", optional = true, default_features = false, features = ["stream"] }

# avro support
avro-schema = { version = "0.2", optional = true }
@@ -108,26 +108,12 @@ rustdoc-args = ["--cfg", "docsrs"]
[features]
default = []
full = [
-    "io_odbc",
-    "io_csv",
-    "io_csv_async",
-    "io_json",
-    "io_ipc",
-    "io_flight",
-    "io_ipc_write_async",
-    "io_ipc_read_async",
-    "io_ipc_compression",
-    "io_json_integration",
-    "io_print",
    "io_parquet",
    "io_parquet_compression",
-    "io_avro",
-    "io_avro_compression",
-    "io_avro_async",
-    "regex",
-    "compute",
+    #"regex",
+    #"compute",
    # parses timezones used in timestamp conversions
-    "chrono-tz",
+    #"chrono-tz",
]
io_odbc = ["odbc-api"]
io_csv = ["io_csv_read", "io_csv_write"]
214 changes: 109 additions & 105 deletions src/io/parquet/read/deserialize/utils.rs
@@ -1,9 +1,9 @@
use std::collections::VecDeque;

+use parquet2::deserialize::{FilteredHybridEncoded, HybridDecoderBitmapIter, HybridEncoded};
use parquet2::encoding::hybrid_rle;
use parquet2::page::{split_buffer as _split_buffer, DataPage};
use parquet2::schema::Repetition;
-use streaming_iterator::{convert, Convert, StreamingIterator};

use crate::bitmap::utils::BitmapIter;
use crate::bitmap::MutableBitmap;
@@ -86,44 +86,6 @@ impl<A: Copy + Default> Pushable<A> for Vec<A> {
}
}

-#[derive(Debug)]
-pub struct OptionalPageValidity<'a> {
-    validity: Convert<hybrid_rle::Decoder<'a>>,
-    // invariants:
-    // * run_offset < length
-    // * consumed < length
-    // how many items have been taken on the current encoded run.
-    // 0 implies we need to advance the decoder
-    run_offset: usize,
-    // how many items have been consumed from the encoder
-    consumed: usize,
-    // how many items we must read from the page
-    length: usize,
-    // how many items must be skipped from the page
-    offset: usize,
-}
-
-impl<'a> OptionalPageValidity<'a> {
-    #[inline]
-    pub fn new(page: &'a DataPage) -> Self {
-        let (_, validity, _) = split_buffer(page);
-
-        let validity = convert(hybrid_rle::Decoder::new(validity, 1));
-        Self {
-            validity,
-            run_offset: 0,
-            consumed: 0,
-            length: page.num_values(),
-            offset: 0,
-        }
-    }
-
-    #[inline]
-    pub fn len(&self) -> usize {
-        self.length - self.consumed
-    }
-}

pub struct Zip<V, I> {
validity: V,
values: I,
@@ -151,86 +113,128 @@ impl<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> Iterator for Zip<V, I>
}
}

+#[derive(Debug, Clone)]
+pub struct OptionalPageValidity<'a> {
+    iter: HybridDecoderBitmapIter<'a>,
+    current: Option<(HybridEncoded<'a>, usize)>,
+}
+
+impl<'a> OptionalPageValidity<'a> {
+    pub fn new(page: &'a DataPage) -> Self {
+        let (_, validity, _) = split_buffer(page);
+
+        let iter = hybrid_rle::Decoder::new(validity, 1);
+        let iter = HybridDecoderBitmapIter::new(iter, page.num_values());
+        Self {
+            iter,
+            current: None,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.iter.len()
+            + self
+                .current
+                .as_ref()
+                .map(|(run, offset)| run.len() - offset)
+                .unwrap_or_default()
+    }
+
+    fn next_limited(&mut self, limit: usize) -> Option<FilteredHybridEncoded<'a>> {
+        let (run, offset) = if let Some((run, offset)) = self.current {
+            (run, offset)
+        } else {
+            // a new run
+            let run = self.iter.next()?; // no run -> None
+            self.current = Some((run, 0));
+            return self.next_limited(limit);
+        };
+
+        match run {
+            HybridEncoded::Bitmap(values, length) => {
+                let run_length = length - offset;
+
+                let length = limit.min(run_length);
+
+                if length == run_length {
+                    self.current = None;
+                } else {
+                    self.current = Some((run, offset + length));
+                }
+
+                Some(FilteredHybridEncoded::Bitmap {
+                    values,
+                    offset,
+                    length,
+                })
+            }
+            HybridEncoded::Repeated(is_set, run_length) => {
+                let run_length = run_length - offset;
+
+                let length = limit.min(run_length);
+
+                if length == run_length {
+                    self.current = None;
+                } else {
+                    self.current = Some((run, offset + length));
+                }
+
+                Some(FilteredHybridEncoded::Repeated { is_set, length })
+            }
+        }
+    }
+}

/// Extends a [`Pushable`] from an iterator of non-null values and a hybrid-rle decoder
-pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
+pub(super) fn extend_from_decoder<T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
    validity: &mut MutableBitmap,
-    page_validity: &mut OptionalPageValidity<'a>,
+    page_validity: &mut OptionalPageValidity,
    limit: Option<usize>,
-    values: &mut P,
+    pushable: &mut P,
    mut values_iter: I,
) {
    let limit = limit.unwrap_or(usize::MAX);

    // todo: remove `consumed_here` and compute next limit from `consumed`
    let mut consumed_here = 0;
    while consumed_here < limit {
-        if page_validity.run_offset == 0 {
-            page_validity.validity.advance()
-        }
-        if let Some(run) = page_validity.validity.get() {
-            match run {
-                hybrid_rle::HybridEncoded::Bitpacked(pack) => {
-                    // a pack has at most `pack.len() * 8` bits
-                    // during execution, we may end in the middle of a pack (run_offset != 0)
-                    // the remaining items in the pack is given by a combination
-                    // of the page length, the offset in the pack, and where we are in the page
-                    let pack_size = pack.len() * 8 - page_validity.run_offset;
-                    let remaining = page_validity.len();
-                    let length = std::cmp::min(pack_size, remaining);
-
-                    let offset = page_validity.offset.saturating_sub(page_validity.consumed);
-
-                    // todo: if `offset` is larger than the run, we need to restrict `additional`
-                    let additional = limit.min(length);
-
-                    // consume `additional` items
-                    let iter = BitmapIter::new(pack, page_validity.run_offset, offset + additional);
-                    let iter = Zip::new(iter, &mut values_iter);
-                    let iter = iter.skip(offset);
-
-                    for item in iter {
-                        if let Some(item) = item {
-                            values.push(item)
-                        } else {
-                            values.push_null()
-                        }
-                    }
-                    validity.extend_from_slice(pack, offset + page_validity.run_offset, additional);
-
-                    if additional == length {
-                        page_validity.run_offset = 0
-                    } else {
-                        page_validity.run_offset += additional;
-                    };
-                    consumed_here += additional;
-                    page_validity.consumed += additional;
-                }
-                &hybrid_rle::HybridEncoded::Rle(value, length) => {
-                    let is_set = value[0] == 1;
-                    let length = length - page_validity.run_offset;
-
-                    // the number of elements that will be consumed in this (run, iteration)
-                    let additional = limit.min(length);
-
-                    validity.extend_constant(additional, is_set);
-                    if is_set {
-                        (0..additional).for_each(|_| values.push(values_iter.next().unwrap()));
-                    } else {
-                        values.extend_constant(additional, T::default());
-                    }
-
-                    if additional == length {
-                        page_validity.run_offset = 0
-                    } else {
-                        page_validity.run_offset += additional;
-                    };
-                    consumed_here += additional;
-                    page_validity.consumed += additional;
-                }
-            }
-        } else {
-            break;
-        }
+        let run = page_validity.next_limited(limit);
+        let run = if let Some(run) = run { run } else { break };
+
+        match run {
+            FilteredHybridEncoded::Bitmap {
+                values,
+                offset,
+                length,
+            } => {
+                // consume `additional` items
+                let iter = BitmapIter::new(values, offset, length);
+                let iter = Zip::new(iter, &mut values_iter);
+                let iter = iter.skip(offset);
+
+                for item in iter {
+                    if let Some(item) = item {
+                        pushable.push(item)
+                    } else {
+                        pushable.push_null()
+                    }
+                }
+                validity.extend_from_slice(values, offset, length);
+
+                consumed_here += length;
+            }
+            FilteredHybridEncoded::Repeated { is_set, length } => {
+                validity.extend_constant(length, is_set);
+                if is_set {
+                    (0..length).for_each(|_| pushable.push(values_iter.next().unwrap()));
+                } else {
+                    pushable.extend_constant(length, T::default());
+                }
+
+                consumed_here += length;
+            }
+            FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {},
+        };
    }
}
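Note: the core of this migration is next_limited above. Instead of hand-tracking run_offset and consumed across hybrid-RLE runs, the decoder now yields whole runs and the wrapper slices them at a limit, carrying the unconsumed tail of a run over to the next call. Below is a minimal, self-contained sketch of that run-splitting pattern; the Run enum and LimitedRuns wrapper are illustrative stand-ins for parquet2's HybridEncoded and FilteredHybridEncoded, not the library's API.

/// Illustrative stand-in for parquet2's run types.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Run {
    /// `length` validity bits taken from a bitmap.
    Bitmap { length: usize },
    /// `length` repetitions of the same validity bit.
    Repeated { is_set: bool, length: usize },
}

impl Run {
    fn len(&self) -> usize {
        match *self {
            Run::Bitmap { length } | Run::Repeated { length, .. } => length,
        }
    }
}

/// Yields runs sliced to at most `limit` items, keeping the
/// unconsumed tail of a run for the next call.
struct LimitedRuns<I: Iterator<Item = Run>> {
    runs: I,
    // (run, how many of its items were already handed out)
    current: Option<(Run, usize)>,
}

impl<I: Iterator<Item = Run>> LimitedRuns<I> {
    fn next_limited(&mut self, limit: usize) -> Option<Run> {
        let (run, offset) = match self.current.take() {
            Some(state) => state,
            None => (self.runs.next()?, 0), // start a new run
        };
        let remaining = run.len() - offset;
        let taken = limit.min(remaining);
        if taken < remaining {
            // the run did not fit within `limit`: remember where we stopped
            self.current = Some((run, offset + taken));
        }
        Some(match run {
            Run::Bitmap { .. } => Run::Bitmap { length: taken },
            Run::Repeated { is_set, .. } => Run::Repeated { is_set, length: taken },
        })
    }
}

fn main() {
    let runs = vec![
        Run::Repeated { is_set: true, length: 5 },
        Run::Bitmap { length: 3 },
    ];
    let mut limited = LimitedRuns { runs: runs.into_iter(), current: None };
    // a limit of 4 splits the first run into 4 + 1
    assert_eq!(limited.next_limited(4), Some(Run::Repeated { is_set: true, length: 4 }));
    assert_eq!(limited.next_limited(4), Some(Run::Repeated { is_set: true, length: 1 }));
    assert_eq!(limited.next_limited(4), Some(Run::Bitmap { length: 3 }));
    assert_eq!(limited.next_limited(4), None);
}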


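extend_from_decoder then drives those limited runs: a Bitmap run zips validity bits with the iterator of non-null values, a Repeated run extends in bulk, and a Skipped run corresponds to rows removed by a filter, whose decoded values must be consumed and dropped. A runnable sketch of that loop on a toy model, where Vec<Option<i32>> stands in for the MutableBitmap plus Pushable pair of the real code (all names are illustrative, not arrow2's API):

/// Toy stand-in for the decoded validity runs (illustrative only).
enum ValidityRun<'a> {
    /// one validity bit per item
    Bitmap(&'a [bool]),
    /// `length` items, all valid or all null
    Repeated { is_set: bool, length: usize },
    /// `valids` non-null values that a row filter removed: consume and drop them
    Skipped { valids: usize },
}

fn extend<I: Iterator<Item = i32>>(
    out: &mut Vec<Option<i32>>,
    runs: &[ValidityRun<'_>],
    mut values_iter: I,
) {
    for run in runs {
        match run {
            ValidityRun::Bitmap(bits) => {
                // a set bit consumes one non-null value; an unset bit pushes a null
                for &is_set in *bits {
                    if is_set {
                        out.push(values_iter.next());
                    } else {
                        out.push(None);
                    }
                }
            }
            ValidityRun::Repeated { is_set, length } => {
                if *is_set {
                    (0..*length).for_each(|_| out.push(values_iter.next()));
                } else {
                    out.extend(std::iter::repeat(None).take(*length));
                }
            }
            ValidityRun::Skipped { valids } => {
                // rows filtered out: their values must still be consumed to stay aligned
                for _ in values_iter.by_ref().take(*valids) {}
            }
        }
    }
}

fn main() {
    let runs = [
        ValidityRun::Bitmap(&[true, false, true]),
        ValidityRun::Skipped { valids: 1 },
        ValidityRun::Repeated { is_set: true, length: 2 },
    ];
    let mut out = vec![];
    extend(&mut out, &runs, 1..10);
    assert_eq!(out, vec![Some(1), None, Some(2), Some(4), Some(5)]);
}

The Skipped arm is what makes filtered reads work: values that survive decoding but are removed by the row filter still have to be taken from the values iterator so it stays aligned with the validity runs.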