Skip to content

Commit

Permalink
perf: Reduce error bubbling in parquet hybrid_rle (#16348)
Browse files (browse the repository at this point in the history)
  • Loading branch information
ritchie46 committed May 21, 2024
1 parent a6f4cb6 commit 1cdab6f
Show file tree
Hide file tree
Showing 12 changed files with 87 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use arrow::bitmap::utils::BitmapIter;
use arrow::bitmap::MutableBitmap;
use arrow::datatypes::ArrowDataType;
use polars_error::PolarsResult;
use polars_utils::iter::FallibleIterator;

use super::super::utils::{
extend_from_decoder, get_selected_rows, next, DecodedState, Decoder,
Expand Down Expand Up @@ -201,7 +200,6 @@ impl<'a> Decoder<'a> for BooleanDecoder {
values,
&mut *page_values,
);
page_values.get_result()?;
},
}
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/polars-parquet/src/arrow/read/deserialize/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl<'a> PageValidity<'a> for FilteredOptionalPageValidity<'a> {
(run, offset)
} else {
// a new run
let run = self.iter.next()?.unwrap(); // no run -> None
let run = self.iter.next()?; // no run -> None
self.current = Some((run, 0));
return self.next_limited(limit);
};
Expand Down Expand Up @@ -181,7 +181,7 @@ impl<'a> OptionalPageValidity<'a> {
(run, offset)
} else {
// a new run
let run = self.iter.next()?.unwrap(); // no run -> None
let run = self.iter.next()?; // no run -> None
self.current = Some((run, 0));
return self.next_limited(limit);
};
Expand Down
33 changes: 12 additions & 21 deletions crates/polars-parquet/src/parquet/deserialize/filtered_rle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::collections::VecDeque;

use super::{HybridDecoderBitmapIter, HybridEncoded};
use crate::parquet::encoding::hybrid_rle::BitmapIter;
use crate::parquet::error::Error;
use crate::parquet::indexes::Interval;

/// Type definition of a [`FilteredHybridBitmapIter`] of [`HybridDecoderBitmapIter`].
Expand Down Expand Up @@ -54,7 +53,7 @@ impl<'a> FilteredHybridEncoded<'a> {
///
/// This iterator adapter is used in combination with
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FilteredHybridBitmapIter<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> {
pub struct FilteredHybridBitmapIter<'a, I: Iterator<Item = HybridEncoded<'a>>> {
iter: I,
current: Option<(HybridEncoded<'a>, usize)>,
// a run may end in the middle of an interval, in which case we must
Expand All @@ -66,7 +65,7 @@ pub struct FilteredHybridBitmapIter<'a, I: Iterator<Item = Result<HybridEncoded<
total_items: usize,
}

impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> FilteredHybridBitmapIter<'a, I> {
impl<'a, I: Iterator<Item = HybridEncoded<'a>>> FilteredHybridBitmapIter<'a, I> {
pub fn new(iter: I, selected_rows: VecDeque<Interval>) -> Self {
let total_items = selected_rows.iter().map(|x| x.length).sum();
Self {
Expand Down Expand Up @@ -99,10 +98,8 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> FilteredHybridBit
}
}

impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
for FilteredHybridBitmapIter<'a, I>
{
type Item = Result<FilteredHybridEncoded<'a>, Error>;
impl<'a, I: Iterator<Item = HybridEncoded<'a>>> Iterator for FilteredHybridBitmapIter<'a, I> {
type Item = FilteredHybridEncoded<'a>;

fn next(&mut self) -> Option<Self::Item> {
let interval = if let Some(interval) = self.current_interval {
Expand All @@ -116,14 +113,8 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
let (run, offset) = if let Some((run, offset)) = self.current {
(run, offset)
} else {
self.current = Some((self.iter.next()?, 0));
// a new run
let run = self.iter.next()?; // no run => something wrong since intervals should only slice items up all runs' length
match run {
Ok(run) => {
self.current = Some((run, 0));
},
Err(e) => return Some(Err(e)),
}
return self.next();
};

Expand Down Expand Up @@ -157,7 +148,7 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
Some((run, offset + to_skip))
};

return Some(Ok(FilteredHybridEncoded::Skipped(set)));
return Some(FilteredHybridEncoded::Skipped(set));
};

// slice the bitmap according to current interval
Expand All @@ -170,7 +161,7 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
self.advance_current_interval(run_length);
self.current_items_in_runs += run_length;
self.current = None;
Some(Ok(FilteredHybridEncoded::Skipped(set)))
Some(FilteredHybridEncoded::Skipped(set))
} else {
let length = if run_length > interval.length {
// interval is fully consumed
Expand All @@ -196,7 +187,7 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
self.current = None;
length
};
Some(Ok(FilteredHybridEncoded::Repeated { is_set, length }))
Some(FilteredHybridEncoded::Repeated { is_set, length })
}
},
HybridEncoded::Bitmap(values, full_run_length) => {
Expand All @@ -223,7 +214,7 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
Some((run, offset + to_skip))
};

return Some(Ok(FilteredHybridEncoded::Skipped(set)));
return Some(FilteredHybridEncoded::Skipped(set));
};

// slice the bitmap according to current interval
Expand All @@ -236,7 +227,7 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
self.advance_current_interval(run_length);
self.current_items_in_runs += run_length;
self.current = None;
Some(Ok(FilteredHybridEncoded::Skipped(set)))
Some(FilteredHybridEncoded::Skipped(set))
} else {
let length = if run_length > interval.length {
// interval is fully consumed
Expand All @@ -262,11 +253,11 @@ impl<'a, I: Iterator<Item = Result<HybridEncoded<'a>, Error>>> Iterator
self.current = None;
length
};
Some(Ok(FilteredHybridEncoded::Bitmap {
Some(FilteredHybridEncoded::Bitmap {
values,
offset: new_offset,
length,
}))
})
}
},
}
Expand Down
44 changes: 11 additions & 33 deletions crates/polars-parquet/src/parquet/deserialize/hybrid_rle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use polars_utils::iter::FallibleIterator;

use crate::parquet::encoding::hybrid_rle::{self, BitmapIter};
use crate::parquet::error::Error;

/// The decoding state of the hybrid-RLE decoder with a maximum definition level of 1
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -29,7 +26,7 @@ impl<'a> HybridEncoded<'a> {
}
}

pub trait HybridRleRunsIterator<'a>: Iterator<Item = Result<HybridEncoded<'a>, Error>> {
pub trait HybridRleRunsIterator<'a>: Iterator<Item = HybridEncoded<'a>> {
/// Number of elements remaining. This may differ from the number of items left in the
/// iterator, because a single item of the iterator may contain more than one element.
fn number_of_elements(&self) -> usize;
Expand All @@ -39,7 +36,7 @@ pub trait HybridRleRunsIterator<'a>: Iterator<Item = Result<HybridEncoded<'a>, E
#[derive(Debug, Clone)]
pub struct HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>,
{
iter: I,
length: usize,
Expand All @@ -48,7 +45,7 @@ where

impl<'a, I> HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>,
{
/// Returns a new [`HybridRleIter`]
#[inline]
Expand All @@ -74,7 +71,7 @@ where

impl<'a, I> HybridRleRunsIterator<'a> for HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>,
{
fn number_of_elements(&self) -> usize {
self.len()
Expand All @@ -83,18 +80,18 @@ where

impl<'a, I> Iterator for HybridRleIter<'a, I>
where
I: Iterator<Item = Result<hybrid_rle::HybridEncoded<'a>, Error>>,
I: Iterator<Item = hybrid_rle::HybridEncoded<'a>>,
{
type Item = Result<HybridEncoded<'a>, Error>;
type Item = HybridEncoded<'a>;

#[inline]
fn next(&mut self) -> Option<Self::Item> {
if self.consumed == self.length {
return None;
};
let run = self.iter.next()?;
let run = self.iter.next();

Some(run.map(|run| match run {
run.map(|run| match run {
hybrid_rle::HybridEncoded::Bitpacked(pack) => {
// a pack has at most `pack.len() * 8` bits
let pack_size = pack.len() * 8;
Expand All @@ -112,7 +109,7 @@ where
self.consumed += additional;
HybridEncoded::Repeated(is_set, additional)
},
}))
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
Expand All @@ -137,11 +134,10 @@ enum HybridBooleanState<'a> {
#[derive(Debug)]
pub struct HybridRleBooleanIter<'a, I>
where
I: Iterator<Item = Result<HybridEncoded<'a>, Error>>,
I: Iterator<Item = HybridEncoded<'a>>,
{
iter: I,
current_run: Option<HybridBooleanState<'a>>,
result: Result<(), Error>,
}

impl<'a, I> HybridRleBooleanIter<'a, I>
Expand All @@ -152,19 +148,10 @@ where
Self {
iter,
current_run: None,
result: Ok(()),
}
}

fn set_new_run(&mut self, run: Result<HybridEncoded<'a>, Error>) -> Option<bool> {
let run = match run {
Err(e) => {
self.result = Err(e);
return None;
},
Ok(r) => r,
};

fn set_new_run(&mut self, run: HybridEncoded<'a>) -> Option<bool> {
let run = match run {
HybridEncoded::Bitmap(bitmap, length) => {
HybridBooleanState::Bitmap(BitmapIter::new(bitmap, 0, length))
Expand Down Expand Up @@ -217,14 +204,5 @@ where
}
}

impl<'a, I> FallibleIterator<Error> for HybridRleBooleanIter<'a, I>
where
I: HybridRleRunsIterator<'a>,
{
fn get_result(&mut self) -> Result<(), Error> {
self.result.clone()
}
}

/// Type definition for a [`HybridRleBooleanIter`] using [`hybrid_rle::Decoder`].
pub type HybridRleDecoderIter<'a> = HybridRleBooleanIter<'a, HybridDecoderBitmapIter<'a>>;
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl<'a> Block<'a> {
let length = std::cmp::min(length, num_mini_blocks * values_per_mini_block);

let mut consumed_bytes = 0;
let (min_delta, consumed) = zigzag_leb128::decode(values)?;
let (min_delta, consumed) = zigzag_leb128::decode(values);
consumed_bytes += consumed;
values = &values[consumed..];

Expand Down Expand Up @@ -133,19 +133,19 @@ pub struct Decoder<'a> {
impl<'a> Decoder<'a> {
pub fn try_new(mut values: &'a [u8]) -> Result<Self, Error> {
let mut consumed_bytes = 0;
let (block_size, consumed) = uleb128::decode(values)?;
let (block_size, consumed) = uleb128::decode(values);
consumed_bytes += consumed;
assert_eq!(block_size % 128, 0);
values = &values[consumed..];
let (num_mini_blocks, consumed) = uleb128::decode(values)?;
let (num_mini_blocks, consumed) = uleb128::decode(values);
let num_mini_blocks = num_mini_blocks as usize;
consumed_bytes += consumed;
values = &values[consumed..];
let (total_count, consumed) = uleb128::decode(values)?;
let (total_count, consumed) = uleb128::decode(values);
let total_count = total_count as usize;
consumed_bytes += consumed;
values = &values[consumed..];
let (first_value, consumed) = zigzag_leb128::decode(values)?;
let (first_value, consumed) = zigzag_leb128::decode(values);
consumed_bytes += consumed;
values = &values[consumed..];

Expand Down
18 changes: 7 additions & 11 deletions crates/polars-parquet/src/parquet/encoding/hybrid_rle/decoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use polars_utils::slice::GetSaferUnchecked;

use super::super::{ceil8, uleb128};
use super::HybridEncoded;
use crate::parquet::error::Error;

/// An [`Iterator`] of [`HybridEncoded`].
#[derive(Debug, Clone)]
Expand All @@ -25,14 +24,11 @@ impl<'a> Decoder<'a> {
}

impl<'a> Iterator for Decoder<'a> {
type Item = Result<HybridEncoded<'a>, Error>;
type Item = HybridEncoded<'a>;

#[inline] // -18% improvement in bench
fn next(&mut self) -> Option<Self::Item> {
let (indicator, consumed) = match uleb128::decode(self.values) {
Ok((indicator, consumed)) => (indicator, consumed),
Err(e) => return Some(Err(e)),
};
let (indicator, consumed) = uleb128::decode(self.values);
self.values = unsafe { self.values.get_unchecked_release(consumed..) };

// We want to early return if consumed == 0 OR num_bits == 0, so combine into a single branch.
Expand All @@ -46,15 +42,15 @@ impl<'a> Iterator for Decoder<'a> {
let bytes = std::cmp::min(bytes, self.values.len());
let (result, remaining) = self.values.split_at(bytes);
self.values = remaining;
Some(Ok(HybridEncoded::Bitpacked(result)))
Some(HybridEncoded::Bitpacked(result))
} else {
// is rle
let run_length = indicator as usize >> 1;
// repeated-value := value that is repeated, using a fixed-width of round-up-to-next-byte(bit-width)
let rle_bytes = ceil8(self.num_bits);
let (result, remaining) = self.values.split_at(rle_bytes);
self.values = remaining;
Some(Ok(HybridEncoded::Rle(result, run_length)))
Some(HybridEncoded::Rle(result, run_length))
}
}
}
Expand All @@ -77,7 +73,7 @@ mod tests {

let run = decoder.next().unwrap();

if let HybridEncoded::Bitpacked(values) = run.unwrap() {
if let HybridEncoded::Bitpacked(values) = run {
assert_eq!(values, &[0b00001011]);
let result = bitpacked::Decoder::<u32>::try_new(values, bit_width, length)
.unwrap()
Expand All @@ -103,7 +99,7 @@ mod tests {

let run = decoder.next().unwrap();

if let HybridEncoded::Bitpacked(values) = run.unwrap() {
if let HybridEncoded::Bitpacked(values) = run {
assert_eq!(values, &[0b11101011, 0b00000010]);
let result = bitpacked::Decoder::<u32>::try_new(values, bit_width, 10)
.unwrap()
Expand All @@ -128,7 +124,7 @@ mod tests {

let run = decoder.next().unwrap();

if let HybridEncoded::Rle(values, items) = run.unwrap() {
if let HybridEncoded::Rle(values, items) = run {
assert_eq!(values, &[0b00000001]);
assert_eq!(items, length);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ pub struct HybridRleDecoder<'a> {

#[inline]
fn read_next<'a>(decoder: &mut Decoder<'a>, remaining: usize) -> Result<State<'a>, Error> {
Ok(match decoder.next().transpose()? {
Ok(match decoder.next() {
Some(HybridEncoded::Bitpacked(packed)) => {
let num_bits = decoder.num_bits();
let length = std::cmp::min(packed.len() * 8 / num_bits, remaining);
Expand Down
Loading

0 comments on commit 1cdab6f

Please sign in to comment.