Commit

Simplified PageIterator (#103)
jorgecarleitao committed Mar 19, 2022
1 parent d6425e2 commit a6e6cf3
Showing 7 changed files with 56 additions and 51 deletions.
3 changes: 0 additions & 3 deletions src/parquet_bridge.rs
@@ -90,7 +90,6 @@ pub enum PageType {
     DataPage,
     DataPageV2,
     DictionaryPage,
-    IndexPage,
 }
 
 impl TryFrom<ParquetPageType> for PageType {
@@ -101,7 +100,6 @@ impl TryFrom<ParquetPageType> for PageType {
             ParquetPageType::DATA_PAGE => PageType::DataPage,
             ParquetPageType::DATA_PAGE_V2 => PageType::DataPageV2,
             ParquetPageType::DICTIONARY_PAGE => PageType::DictionaryPage,
-            ParquetPageType::INDEX_PAGE => PageType::IndexPage,
             _ => return Err(ParquetError::OutOfSpec("Thrift out of range".to_string())),
         })
     }
@@ -113,7 +111,6 @@ impl From<PageType> for ParquetPageType {
             PageType::DataPage => ParquetPageType::DATA_PAGE,
             PageType::DataPageV2 => ParquetPageType::DATA_PAGE_V2,
             PageType::DictionaryPage => ParquetPageType::DICTIONARY_PAGE,
-            PageType::IndexPage => ParquetPageType::INDEX_PAGE,
         }
     }
 }
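With `IndexPage` gone from the enum, a thrift `INDEX_PAGE` header no longer converts: it falls through to the `_` arm of the `TryFrom` above and surfaces as `ParquetError::OutOfSpec`. A minimal sketch of that behavioral change, assuming the names resolve as in this file; `is_supported` is a hypothetical helper:

    use std::convert::TryFrom;

    // Hypothetical helper: after this commit, only the three remaining
    // variants convert successfully; INDEX_PAGE is rejected as out-of-spec.
    fn is_supported(page_type: ParquetPageType) -> bool {
        PageType::try_from(page_type).is_ok()
    }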
35 changes: 18 additions & 17 deletions src/read/compression.rs
@@ -6,7 +6,7 @@ use crate::error::{ParquetError, Result};
 use crate::page::{CompressedDataPage, DataPage, DataPageHeader};
 use crate::FallibleStreamingIterator;
 
-use super::PageIterator;
+use super::page::PageIterator;
 
 fn decompress_v1(compressed: &[u8], compression: Compression, buffer: &mut [u8]) -> Result<()> {
     compression::decompress(compression, compressed, buffer)
@@ -93,9 +93,9 @@ pub fn decompress(
     ))
 }
 
-fn decompress_reuse<R: std::io::Read>(
+fn decompress_reuse<P: PageIterator>(
     mut compressed_page: CompressedDataPage,
-    iterator: &mut PageIterator<R>,
+    iterator: &mut P,
     buffer: &mut Vec<u8>,
 ) -> Result<(DataPage, bool)> {
     let was_decompressed = decompress_buffer(&mut compressed_page, buffer)?;
@@ -108,20 +108,20 @@ fn decompress_reuse<R: std::io::Read>(
     );
 
     if was_decompressed {
-        iterator.reuse_buffer(compressed_page.buffer)
+        iterator.swap_buffer(&mut compressed_page.buffer)
     };
     Ok((new_page, was_decompressed))
 }
 
-/// Decompressor that allows re-using the page buffer of [`PageIterator`].
+/// Decompressor that allows re-using the page buffer of [`PageReader`].
 /// # Implementation
 /// The implementation depends on whether a page is compressed or not.
-/// > `PageIterator(a)`, `CompressedPage(b)`, `Decompressor(c)`, `DecompressedPage(d)`
+/// > `PageReader(a)`, `CompressedPage(b)`, `Decompressor(c)`, `DecompressedPage(d)`
 /// ### un-compressed pages:
 /// > page iter: `a` is swapped with `b`
 /// > decompress iter: `b` is swapped with `d`, `b` is swapped with `a`
 /// therefore:
-/// * `PageIterator` has its buffer back
+/// * `PageReader` has its buffer back
 /// * `Decompressor`'s buffer is un-used
 /// * `DecompressedPage` has the same data as `CompressedPage` had
 /// ### compressed pages:
@@ -132,23 +132,23 @@ fn decompress_reuse<R: std::io::Read>(
 /// > * `c` is moved to `d`
 /// > * (next iteration): `d` is moved to `c`
 /// therefore, while the page is available:
-/// * `PageIterator` has its buffer back
+/// * `PageReader` has its buffer back
 /// * `Decompressor`'s buffer empty
 /// * `DecompressedPage` has the decompressed buffer
 /// after the page is used:
-/// * `PageIterator` has its buffer back
+/// * `PageReader` has its buffer back
 /// * `Decompressor` has its buffer back
 /// * `DecompressedPage` has an empty buffer
-pub struct Decompressor<R: std::io::Read> {
-    iter: PageIterator<R>,
+pub struct Decompressor<P: PageIterator> {
+    iter: P,
     buffer: Vec<u8>,
     current: Option<DataPage>,
     was_decompressed: bool,
 }
 
-impl<R: std::io::Read> Decompressor<R> {
+impl<P: PageIterator> Decompressor<P> {
     /// Creates a new [`Decompressor`].
-    pub fn new(iter: PageIterator<R>, buffer: Vec<u8>) -> Self {
+    pub fn new(iter: P, buffer: Vec<u8>) -> Self {
         Self {
             iter,
             buffer,
@@ -159,13 +159,14 @@ impl<R: std::io::Read> Decompressor<R> {
 
     /// Returns two buffers: the first buffer corresponds to the page buffer,
     /// the second to the decompression buffer.
-    pub fn into_buffers(self) -> (Vec<u8>, Vec<u8>) {
-        let page_buffer = self.iter.into_buffer();
+    pub fn into_buffers(mut self) -> (Vec<u8>, Vec<u8>) {
+        let mut page_buffer = vec![];
+        self.iter.swap_buffer(&mut page_buffer);
         (page_buffer, self.buffer)
     }
 }
 
-impl<R: std::io::Read> FallibleStreamingIterator for Decompressor<R> {
+impl<P: PageIterator> FallibleStreamingIterator for Decompressor<P> {
     type Item = DataPage;
     type Error = ParquetError;
 
@@ -174,7 +175,7 @@ impl<R: std::io::Read> FallibleStreamingIterator for Decompressor<R> {
         if self.was_decompressed {
             self.buffer = std::mem::take(&mut page.buffer);
         } else {
-            self.iter.reuse_buffer(std::mem::take(&mut page.buffer));
+            self.iter.swap_buffer(&mut page.buffer);
         }
     }
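The practical payoff of the new `P: PageIterator` bound is that `Decompressor` now accepts any page source, and the buffer round-trip described in the doc comment above still holds. A minimal sketch of driving it, assuming `PageIterator` is importable from `parquet2::read` (the re-export path may differ at this commit) and with `process` as a hypothetical callback:

    use parquet2::error::Result;
    use parquet2::page::DataPage;
    use parquet2::read::{Decompressor, PageIterator};
    use parquet2::FallibleStreamingIterator;

    // Sketch: stream pages through the decompressor, then recover both the
    // page buffer and the decompression buffer so callers can reuse them.
    fn consume<P: PageIterator>(
        pages: P,
        decompress_buffer: Vec<u8>,
        process: impl Fn(&DataPage),
    ) -> Result<(Vec<u8>, Vec<u8>)> {
        let mut decompressor = Decompressor::new(pages, decompress_buffer);
        while let Some(page) = decompressor.next()? {
            // pages are borrowed, not owned: their buffers are recycled
            process(page);
        }
        Ok(decompressor.into_buffers())
    }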
22 changes: 10 additions & 12 deletions src/read/mod.rs
@@ -1,9 +1,7 @@
 mod compression;
 pub mod levels;
 mod metadata;
-mod page_iterator;
-#[cfg(feature = "stream")]
-mod page_stream;
+mod page;
 #[cfg(feature = "stream")]
 mod stream;
 
@@ -13,9 +11,9 @@ use std::vec::IntoIter;
 
 pub use compression::{decompress, BasicDecompressor, Decompressor};
 pub use metadata::read_metadata;
-pub use page_iterator::{PageFilter, PageIterator};
 #[cfg(feature = "stream")]
-pub use page_stream::get_page_stream;
+pub use page::get_page_stream;
+pub use page::{PageFilter, PageReader};
 #[cfg(feature = "stream")]
 pub use stream::read_metadata as read_metadata_async;
 
@@ -42,18 +40,18 @@ pub fn filter_row_groups(
     metadata
 }
 
-/// Returns a new [`PageIterator`] by seeking `reader` to the beginning of `column_chunk`.
+/// Returns a new [`PageReader`] by seeking `reader` to the beginning of `column_chunk`.
 pub fn get_page_iterator<R: Read + Seek>(
     column_chunk: &ColumnChunkMetaData,
     mut reader: R,
     pages_filter: Option<PageFilter>,
     buffer: Vec<u8>,
-) -> Result<PageIterator<R>> {
+) -> Result<PageReader<R>> {
     let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true));
 
     let (col_start, _) = column_chunk.byte_range();
     reader.seek(SeekFrom::Start(col_start))?;
-    Ok(PageIterator::new(
+    Ok(PageReader::new(
         reader,
         column_chunk.num_values(),
         column_chunk.compression(),
@@ -127,13 +125,13 @@ pub trait ColumnChunkIter<I>:
 }
 
 /// A [`MutStreamingIterator`] that reads column chunks one by one,
-/// returning a [`PageIterator`] per column.
+/// returning a [`PageReader`] per column.
 pub struct ColumnIterator<R: Read + Seek> {
     reader: Option<R>,
     field: ParquetType,
     columns: Vec<ColumnChunkMetaData>,
     page_filter: Option<PageFilter>,
-    current: Option<(PageIterator<R>, ColumnChunkMetaData)>,
+    current: Option<(PageReader<R>, ColumnChunkMetaData)>,
     page_buffer: Vec<u8>,
 }
 
@@ -158,7 +156,7 @@ impl<R: Read + Seek> ColumnIterator<R> {
 }
 
 impl<R: Read + Seek> MutStreamingIterator for ColumnIterator<R> {
-    type Item = (PageIterator<R>, ColumnChunkMetaData);
+    type Item = (PageReader<R>, ColumnChunkMetaData);
     type Error = ParquetError;
 
     fn advance(mut self) -> Result<State<Self>> {
@@ -189,7 +187,7 @@ impl<R: Read + Seek> MutStreamingIterator for ColumnIterator<R> {
     }
 }
 
-impl<R: Read + Seek> ColumnChunkIter<PageIterator<R>> for ColumnIterator<R> {
+impl<R: Read + Seek> ColumnChunkIter<PageReader<R>> for ColumnIterator<R> {
     fn field(&self) -> &ParquetType {
         &self.field
     }
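For downstream code the rename is mechanical: `get_page_iterator` keeps its signature and now returns a `PageReader`. A sketch of typical usage, assuming `FileMetaData::row_groups` and `RowGroupMetaData::columns` as in the crate's metadata module; the file path and the row-group/column indices are purely illustrative:

    use std::fs::File;

    use parquet2::error::Result;
    use parquet2::read::{get_page_iterator, read_metadata};

    // Sketch: iterate the compressed pages of one column chunk.
    fn first_column_pages() -> Result<()> {
        let mut file = File::open("data.parquet")?; // illustrative path
        let metadata = read_metadata(&mut file)?;
        let column_chunk = &metadata.row_groups[0].columns()[0];

        // `None` means no page filter; `vec![]` seeds the reusable page buffer.
        let pages = get_page_iterator(column_chunk, &mut file, None, vec![])?;
        for maybe_page in pages {
            let page = maybe_page?; // CompressedDataPage
            // decompress / deserialize the page here
            drop(page);
        }
        Ok(())
    }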
14 changes: 14 additions & 0 deletions src/read/page/mod.rs
@@ -0,0 +1,14 @@
+mod reader;
+#[cfg(feature = "stream")]
+mod stream;
+
+use crate::{error::ParquetError, page::CompressedDataPage};
+
+pub use reader::{PageFilter, PageReader};
+
+pub trait PageIterator: Iterator<Item = Result<CompressedDataPage, ParquetError>> {
+    fn swap_buffer(&mut self, buffer: &mut Vec<u8>);
+}
+
+#[cfg(feature = "stream")]
+pub use stream::get_page_stream;
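This trait is the crux of the commit: any type that yields `CompressedDataPage`s and can exchange its scratch buffer may now feed `Decompressor`, not just the file-backed `PageReader`. A minimal sketch of a custom implementation; `MemoryPages` and its fields are hypothetical:

    use parquet2::error::ParquetError;
    use parquet2::page::CompressedDataPage;
    use parquet2::read::PageIterator; // assumed re-export path

    // Hypothetical page source backed by pre-loaded pages instead of a file.
    struct MemoryPages {
        pages: std::vec::IntoIter<CompressedDataPage>,
        buffer: Vec<u8>, // scratch buffer lent to and reclaimed from consumers
    }

    impl Iterator for MemoryPages {
        type Item = Result<CompressedDataPage, ParquetError>;

        fn next(&mut self) -> Option<Self::Item> {
            self.pages.next().map(Ok)
        }
    }

    impl PageIterator for MemoryPages {
        fn swap_buffer(&mut self, buffer: &mut Vec<u8>) {
            // same contract as `PageReader`: exchange buffers, never allocate
            std::mem::swap(&mut self.buffer, buffer)
        }
    }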
29 changes: 13 additions & 16 deletions src/read/page_iterator.rs → src/read/page/reader.rs
@@ -12,12 +12,14 @@ use crate::page::{
     ParquetPageHeader,
 };
 
+use super::PageIterator;
+
 /// Type declaration for a page filter
 pub type PageFilter = Arc<dyn Fn(&ColumnDescriptor, &DataPageHeader) -> bool + Send + Sync>;
 
 /// A page iterator iterates over a row group's pages. In parquet, pages are guaranteed to be
 /// contiguously arranged in memory and therefore must be read in sequence.
-pub struct PageIterator<R: Read> {
+pub struct PageReader<R: Read> {
     // The source
     reader: R,
 
@@ -40,7 +42,7 @@ pub struct PageIterator<R: Read> {
     pub(crate) buffer: Vec<u8>,
 }
 
-impl<R: Read> PageIterator<R> {
+impl<R: Read> PageReader<R> {
     pub fn new(
         reader: R,
         total_num_values: i64,
@@ -68,20 +70,18 @@ impl<R: Read> PageIterator<R> {
         Ok(page_header)
     }
 
-    pub fn reuse_buffer(&mut self, buffer: Vec<u8>) {
-        self.buffer = buffer;
-    }
-
-    pub fn into_buffer(self) -> Vec<u8> {
-        self.buffer
-    }
-
     pub fn into_inner(self) -> (R, Vec<u8>) {
         (self.reader, self.buffer)
     }
 }
 
-impl<R: Read> Iterator for PageIterator<R> {
+impl<R: Read> PageIterator for PageReader<R> {
+    fn swap_buffer(&mut self, buffer: &mut Vec<u8>) {
+        std::mem::swap(&mut self.buffer, buffer)
+    }
+}
+
+impl<R: Read> Iterator for PageReader<R> {
     type Item = Result<CompressedDataPage>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -107,7 +107,7 @@ impl<R: Read> Iterator for PageIterator<R> {
 /// This function is lightweight and executes a minimal amount of work so that it is IO bounded.
 // Any un-necessary CPU-intensive tasks SHOULD be executed on individual pages.
 fn next_page<R: Read>(
-    reader: &mut PageIterator<R>,
+    reader: &mut PageReader<R>,
     buffer: &mut Vec<u8>,
 ) -> Result<Option<CompressedDataPage>> {
     let total_values = reader.total_num_values;
@@ -127,7 +127,7 @@ fn next_page<R: Read>(
 }
 
 fn build_page<R: Read>(
-    reader: &mut PageIterator<R>,
+    reader: &mut PageReader<R>,
     buffer: &mut Vec<u8>,
 ) -> Result<Option<CompressedDataPage>> {
     let page_header = reader.read_page_header()?;
@@ -160,14 +160,12 @@ fn build_page<R: Read>(
             reader.current_dictionary = Some(dict);
             Ok(None)
         }
-        _ => Ok(None),
     }
 }
 
 pub(super) enum FinishedPage {
     Data(CompressedDataPage),
     Dict(Arc<dyn DictPage>),
-    Index,
 }
 
 pub(super) fn finish_page(
@@ -222,7 +220,6 @@ pub(super) fn finish_page(
                 descriptor.clone(),
             )))
         }
-        PageType::IndexPage => Ok(FinishedPage::Index),
     }
 }
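Note the API shape here: the old `reuse_buffer(Vec<u8>)` dropped the reader's previous buffer, and `into_buffer(self)` consumed the reader; the single `swap_buffer` primitive does both jobs without consuming anything or allocating. A small sketch of the calling convention, with hypothetical names and the same re-export caveat as above:

    use parquet2::read::PageIterator; // assumed re-export path

    // Sketch: hand a spent page buffer back to any page source so its capacity
    // is reused for the next page; `spent` receives whatever the source held.
    fn recycle<P: PageIterator>(pages: &mut P, spent: &mut Vec<u8>) {
        pages.swap_buffer(spent);
    }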
3 changes: 1 addition & 2 deletions src/read/page_stream.rs → src/read/page/stream.rs
@@ -9,7 +9,7 @@ use crate::error::Result;
 use crate::metadata::{ColumnChunkMetaData, ColumnDescriptor};
 use crate::page::{CompressedDataPage, ParquetPageHeader};
 
-use super::page_iterator::{finish_page, get_page_header, FinishedPage};
+use super::reader::{finish_page, get_page_header, FinishedPage};
 use super::PageFilter;
 
 /// Returns a stream of compressed data pages
@@ -81,7 +81,6 @@ fn _get_page_stream<'a, R: AsyncRead + AsyncSeek + Unpin + Send>(
                 current_dictionary = Some(dict);
                 continue
             }
-            _ => continue,
         }
     }
 }
1 change: 0 additions & 1 deletion src/write/column_chunk.rs
@@ -153,7 +153,6 @@ fn build_column_chunk(
                     .unwrap()
                     .encoding,
             ],
-            _ => todo!(),
         }
     })
     .collect::<HashSet<_>>() // unique
