From e95116b447e76e5989332edf2121a24969287f42 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Mon, 11 Apr 2022 06:35:08 +0000
Subject: [PATCH] Migrated to new decoder

---
 Cargo.toml                               |  22 +--
 src/io/parquet/read/deserialize/utils.rs | 214 ++++++++++++-----------
 2 files changed, 113 insertions(+), 123 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index 831abab280e..5c1f28a50eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,7 +69,7 @@ ahash = { version = "0.7", optional = true }
 
 # parquet support
 #parquet2 = { version = "0.10", optional = true, default_features = false, features = ["stream"] }
-parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "filtered", optional = true, default_features = false, features = ["stream"] }
+parquet2 = { git = "https://github.com/jorgecarleitao/parquet2", branch = "filter", optional = true, default_features = false, features = ["stream"] }
 
 # avro support
 avro-schema = { version = "0.2", optional = true }
@@ -108,26 +108,12 @@ rustdoc-args = ["--cfg", "docsrs"]
 [features]
 default = []
 full = [
-    "io_odbc",
-    "io_csv",
-    "io_csv_async",
-    "io_json",
-    "io_ipc",
-    "io_flight",
-    "io_ipc_write_async",
-    "io_ipc_read_async",
-    "io_ipc_compression",
-    "io_json_integration",
-    "io_print",
     "io_parquet",
     "io_parquet_compression",
-    "io_avro",
-    "io_avro_compression",
-    "io_avro_async",
-    "regex",
-    "compute",
+    #"regex",
+    #"compute",
     # parses timezones used in timestamp conversions
-    "chrono-tz",
+    #"chrono-tz",
 ]
 io_odbc = ["odbc-api"]
 io_csv = ["io_csv_read", "io_csv_write"]
diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs
index af004a4562d..789ffec5eb6 100644
--- a/src/io/parquet/read/deserialize/utils.rs
+++ b/src/io/parquet/read/deserialize/utils.rs
@@ -1,9 +1,9 @@
 use std::collections::VecDeque;
 
+use parquet2::deserialize::{FilteredHybridEncoded, HybridDecoderBitmapIter, HybridEncoded};
 use parquet2::encoding::hybrid_rle;
 use parquet2::page::{split_buffer as _split_buffer, DataPage};
 use parquet2::schema::Repetition;
-use streaming_iterator::{convert, Convert, StreamingIterator};
 
 use crate::bitmap::utils::BitmapIter;
 use crate::bitmap::MutableBitmap;
@@ -86,44 +86,6 @@ impl<A: Copy + Default> Pushable<A> for Vec<A> {
     }
 }
 
-#[derive(Debug)]
-pub struct OptionalPageValidity<'a> {
-    validity: Convert<hybrid_rle::Decoder<'a>>,
-    // invariants:
-    // * run_offset < length
-    // * consumed < length
-    // how many items have been taken on the current encoded run.
-    // 0 implies we need to advance the decoder
-    run_offset: usize,
-    // how many items have been consumed from the encoder
-    consumed: usize,
-    // how many items we must read from the page
-    length: usize,
-    // how many items must be skipped from the page
-    offset: usize,
-}
-
-impl<'a> OptionalPageValidity<'a> {
-    #[inline]
-    pub fn new(page: &'a DataPage) -> Self {
-        let (_, validity, _) = split_buffer(page);
-
-        let validity = convert(hybrid_rle::Decoder::new(validity, 1));
-        Self {
-            validity,
-            run_offset: 0,
-            consumed: 0,
-            length: page.num_values(),
-            offset: 0,
-        }
-    }
-
-    #[inline]
-    pub fn len(&self) -> usize {
-        self.length - self.consumed
-    }
-}
-
 pub struct Zip<V, I> {
     validity: V,
     values: I,
 }
@@ -151,12 +113,84 @@ impl<T, V: Iterator<Item = bool>, I: Iterator<Item = T>> Iterator for Zip<V, I>
     }
 }
 
+#[derive(Debug, Clone)]
+pub struct OptionalPageValidity<'a> {
+    iter: HybridDecoderBitmapIter<'a>,
+    current: Option<(HybridEncoded<'a>, usize)>,
+}
+
+impl<'a> OptionalPageValidity<'a> {
+    pub fn new(page: &'a DataPage) -> Self {
+        let (_, validity, _) = split_buffer(page);
+
+        let iter = hybrid_rle::Decoder::new(validity, 1);
+        let iter = HybridDecoderBitmapIter::new(iter, page.num_values());
+        Self {
+            iter,
+            current: None,
+        }
+    }
+
+    pub fn len(&self) -> usize {
+        self.iter.len()
+            + self
+                .current
+                .as_ref()
+                .map(|(run, offset)| run.len() - offset)
+                .unwrap_or_default()
+    }
+
+    fn next_limited(&mut self, limit: usize) -> Option<FilteredHybridEncoded<'a>> {
+        let (run, offset) = if let Some((run, offset)) = self.current {
+            (run, offset)
+        } else {
+            // a new run
+            let run = self.iter.next()?; // no run -> None
+            self.current = Some((run, 0));
+            return self.next_limited(limit);
+        };
+
+        match run {
+            HybridEncoded::Bitmap(values, length) => {
+                let run_length = length - offset;
+
+                let length = limit.min(run_length);
+
+                if length == run_length {
+                    self.current = None;
+                } else {
+                    self.current = Some((run, offset + length));
+                }
+
+                Some(FilteredHybridEncoded::Bitmap {
+                    values,
+                    offset,
+                    length,
+                })
+            }
+            HybridEncoded::Repeated(is_set, run_length) => {
+                let run_length = run_length - offset;
+
+                let length = limit.min(run_length);
+
+                if length == run_length {
+                    self.current = None;
+                } else {
+                    self.current = Some((run, offset + length));
+                }
+
+                Some(FilteredHybridEncoded::Repeated { is_set, length })
+            }
+        }
+    }
+}
+
 /// Extends a [`Pushable`] from an iterator of non-null values and an hybrid-rle decoder
-pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
+pub(super) fn extend_from_decoder<T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
     validity: &mut MutableBitmap,
-    page_validity: &mut OptionalPageValidity<'a>,
+    page_validity: &mut OptionalPageValidity,
     limit: Option<usize>,
-    values: &mut P,
+    pushable: &mut P,
     mut values_iter: I,
 ) {
     let limit = limit.unwrap_or(usize::MAX);
@@ -164,73 +198,43 @@ pub(super) fn extend_from_decoder<'a, T: Default, P: Pushable<T>, I: Iterator<Item = T>>(
 
     let mut consumed_here = 0;
     while consumed_here < limit {
-        if page_validity.run_offset == 0 {
-            page_validity.validity.advance()
-        }
-
-        if let Some(run) = page_validity.validity.get() {
-            match run {
-                hybrid_rle::HybridEncoded::Bitpacked(pack) => {
-                    // a pack has at most `pack.len() * 8` bits
-                    // during execution, we may end in the middle of a pack (run_offset != 0)
-                    // the remaining items in the pack is given by a combination
-                    // of the page length, the offset in the pack, and where we are in the page
-                    let pack_size = pack.len() * 8 - page_validity.run_offset;
-                    let remaining = page_validity.len();
-                    let length = std::cmp::min(pack_size, remaining);
-
-                    let offset = page_validity.offset.saturating_sub(page_validity.consumed);
-
-                    // todo: if `offset` is larger than the run, we need to restrict `additional`
-                    let additional = limit.min(length);
-
-                    // consume `additional` items
-                    let iter = BitmapIter::new(pack, page_validity.run_offset, offset + additional);
-                    let iter = Zip::new(iter, &mut values_iter);
-                    let iter = iter.skip(offset);
-
-                    for item in iter {
-                        if let Some(item) = item {
-                            values.push(item)
-                        } else {
-                            values.push_null()
-                        }
-                    }
-                    validity.extend_from_slice(pack, offset + page_validity.run_offset, additional);
-
-                    if additional == length {
-                        page_validity.run_offset = 0
-                    } else {
-                        page_validity.run_offset += additional;
-                    };
-                    consumed_here += additional;
-                    page_validity.consumed += additional;
-                }
-                &hybrid_rle::HybridEncoded::Rle(value, length) => {
-                    let is_set = value[0] == 1;
-                    let length = length - page_validity.run_offset;
-
-                    // the number of elements that will be consumed in this (run, iteration)
-                    let additional = limit.min(length);
-
-                    validity.extend_constant(additional, is_set);
-                    if is_set {
-                        (0..additional).for_each(|_| values.push(values_iter.next().unwrap()));
+        let run = page_validity.next_limited(limit);
+        let run = if let Some(run) = run { run } else { break };
+
+        match run {
+            FilteredHybridEncoded::Bitmap {
+                values,
+                offset,
+                length,
+            } => {
+                // consume `additional` items
+                let iter = BitmapIter::new(values, offset, length);
+                let iter = Zip::new(iter, &mut values_iter);
+                let iter = iter.skip(offset);
+
+                for item in iter {
+                    if let Some(item) = item {
+                        pushable.push(item)
                     } else {
-                        values.extend_constant(additional, T::default());
+                        pushable.push_null()
                     }
+                }
+                validity.extend_from_slice(values, offset, length);
-
-                    if additional == length {
-                        page_validity.run_offset = 0
-                    } else {
-                        page_validity.run_offset += additional;
-                    };
-                    consumed_here += additional;
-                    page_validity.consumed += additional;
+
+                consumed_here += length;
+            }
+            FilteredHybridEncoded::Repeated { is_set, length } => {
+                validity.extend_constant(length, is_set);
+                if is_set {
+                    (0..length).for_each(|_| pushable.push(values_iter.next().unwrap()));
+                } else {
+                    pushable.extend_constant(length, T::default());
                 }
-            };
-        } else {
-            break;
-        }
+
+                consumed_here += length;
+            }
+            FilteredHybridEncoded::Skipped(valids) => for _ in values_iter.by_ref().take(valids) {},
+        };
     }
 }
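
The heart of this change is `OptionalPageValidity::next_limited`: rather than tracking `run_offset`/`consumed` against a streaming iterator of hybrid-RLE runs, the new struct keeps the current run together with an offset into it and hands out at most `limit` validity items per call as a `FilteredHybridEncoded`. Below is a minimal, self-contained sketch of that slicing logic, assuming nothing from arrow2 or parquet2: `Run`, `SlicedRun` and `RunSlicer` are hypothetical stand-ins for `HybridEncoded`, `FilteredHybridEncoded` and `HybridDecoderBitmapIter`, so only the control flow mirrors the patch, not the real API.

// Hypothetical stand-ins for parquet2's types; only the run-slicing control
// flow (keep a partially consumed run, cut it at `limit`, drop it when
// exhausted) mirrors the patch above.

/// A decoded hybrid-RLE run: a bit-packed validity bitmap or a repeated run.
#[derive(Clone, Copy, Debug)]
enum Run<'a> {
    Bitmap { values: &'a [u8], length: usize },
    Repeated { is_set: bool, length: usize },
}

impl Run<'_> {
    fn len(&self) -> usize {
        match self {
            Run::Bitmap { length, .. } | Run::Repeated { length, .. } => *length,
        }
    }
}

/// A run cut down to at most `limit` items (the `FilteredHybridEncoded` analogue).
#[derive(Debug)]
enum SlicedRun<'a> {
    Bitmap { values: &'a [u8], offset: usize, length: usize },
    Repeated { is_set: bool, length: usize },
}

/// Keeps the current, partially consumed run, like the `current` field of the
/// new `OptionalPageValidity`.
struct RunSlicer<'a, I: Iterator<Item = Run<'a>>> {
    runs: I,
    current: Option<(Run<'a>, usize)>,
}

impl<'a, I: Iterator<Item = Run<'a>>> RunSlicer<'a, I> {
    fn new(runs: I) -> Self {
        Self { runs, current: None }
    }

    /// Returns the next slice of validity, at most `limit` items long.
    fn next_limited(&mut self, limit: usize) -> Option<SlicedRun<'a>> {
        // resume the partially consumed run, or start a new one
        let (run, offset) = match self.current {
            Some(state) => state,
            None => {
                let run = self.runs.next()?; // no more runs -> the page is exhausted
                self.current = Some((run, 0));
                (run, 0)
            }
        };

        let remaining = run.len() - offset;
        let length = limit.min(remaining);

        // either the run is finished or we remember how far we got
        self.current = if length == remaining {
            None
        } else {
            Some((run, offset + length))
        };

        Some(match run {
            Run::Bitmap { values, .. } => SlicedRun::Bitmap { values, offset, length },
            Run::Repeated { is_set, .. } => SlicedRun::Repeated { is_set, length },
        })
    }
}

fn main() {
    // a 10-bit bitmap run followed by 5 nulls
    let bits = [0b0101_0101u8, 0b0000_0011];
    let runs = [
        Run::Bitmap { values: &bits, length: 10 },
        Run::Repeated { is_set: false, length: 5 },
    ];
    let mut slicer = RunSlicer::new(runs.into_iter());

    // consuming in chunks of 4 crosses run boundaries transparently:
    // Bitmap(4) -> Bitmap(4) -> Bitmap(2) -> Repeated(4) -> Repeated(1)
    while let Some(slice) = slicer.next_limited(4) {
        println!("{:?}", slice);
    }
}

In the patch itself, `extend_from_decoder` consumes these slices: a `Bitmap` slice is zipped with the iterator of non-null values and pushed item by item, a `Repeated` slice extends the validity in a single `extend_constant` call and fills the `Pushable` either from the values iterator (when set) or with `T::default()` (when null), and `Skipped` (a variant `next_limited` never produces here, presumably coming from parquet2's filtered decoding path) merely advances the iterator of non-null values.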