diff --git a/src/indexes/intervals.rs b/src/indexes/intervals.rs index 1506dac99..ce235db03 100644 --- a/src/indexes/intervals.rs +++ b/src/indexes/intervals.rs @@ -2,8 +2,6 @@ use parquet_format_async_temp::PageLocation; use crate::error::ParquetError; -use super::index::PageIndex; - /// An interval #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Interval { @@ -21,7 +19,7 @@ impl Interval { } /// Returns the set of (row) intervals of the pages. -fn compute_row_page_intervals( +fn compute_page_row_intervals( locations: &[PageLocation], num_rows: u64, ) -> Result, ParquetError> { @@ -49,25 +47,25 @@ fn compute_row_page_intervals( /// Returns the set of intervals `(start, len)` containing all the /// selected rows (for a given column) -pub fn compute_rows<'a, T>( - index: &'a [PageIndex], +pub fn compute_rows( + selected: &[bool], locations: &[PageLocation], num_rows: u64, - selector: &dyn Fn(&'a PageIndex) -> bool, ) -> Result, ParquetError> { - let page_intervals = compute_row_page_intervals(locations, num_rows)?; + let page_intervals = compute_page_row_intervals(locations, num_rows)?; - Ok(index + Ok(selected .iter() .zip(page_intervals.iter().copied()) - .filter_map(|(index, page)| { - let is_selected = selector(index); - if is_selected { - Some(page) - } else { - None - } - }) + .filter_map( + |(&is_selected, page)| { + if is_selected { + Some(page) + } else { + None + } + }, + ) .collect()) } @@ -121,7 +119,7 @@ pub fn select_pages( locations: &[PageLocation], num_rows: u64, ) -> Result, ParquetError> { - let page_intervals = compute_row_page_intervals(locations, num_rows)?; + let page_intervals = compute_page_row_intervals(locations, num_rows)?; page_intervals .into_iter() diff --git a/src/indexes/mod.rs b/src/indexes/mod.rs index 6af524587..e0b80dcdf 100644 --- a/src/indexes/mod.rs +++ b/src/indexes/mod.rs @@ -15,15 +15,6 @@ mod tests { #[test] fn test_basic() { - let index = NativeIndex { - primitive_type: PrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32), - indexes: vec![PageIndex { - min: Some(0i32), - max: Some(10), - null_count: Some(0), - }], - boundary_order: Default::default(), - }; let locations = &[PageLocation { offset: 100, compressed_page_size: 10, @@ -31,9 +22,7 @@ mod tests { }]; let num_rows = 10; - let selector = |_| true; - - let row_intervals = compute_rows(&index.indexes, locations, num_rows, &selector).unwrap(); + let row_intervals = compute_rows(&[true; 1], locations, num_rows).unwrap(); assert_eq!(row_intervals, vec![Interval::new(0, 10)]) } @@ -77,8 +66,9 @@ mod tests { .map(|x| x.as_slice() > &[97]) .unwrap_or(false) // no max is present => all nulls => not selected }; + let selected = index.indexes.iter().map(selector).collect::>(); - let rows = compute_rows(&index.indexes, locations, num_rows, &selector).unwrap(); + let rows = compute_rows(&selected, locations, num_rows).unwrap(); assert_eq!(rows, vec![Interval::new(5, 5)]); let pages = select_pages(&rows, locations, num_rows).unwrap(); diff --git a/src/read/mod.rs b/src/read/mod.rs index d1235a34f..44ff10a19 100644 --- a/src/read/mod.rs +++ b/src/read/mod.rs @@ -19,7 +19,6 @@ pub use page::{IndexedPageReader, PageFilter, PageReader}; pub use stream::read_metadata as read_metadata_async; use crate::error::ParquetError; -use crate::indexes::FilteredPage; use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData}; use crate::page::CompressedDataPage; use crate::schema::types::ParquetType; @@ -65,25 +64,6 @@ pub fn get_page_iterator( )) } -/// Returns a new [`IndexedPageReader`] by seeking `reader` to the begining of `column_chunk`. -pub fn get_indexed_page_reader( - column_chunk: &ColumnChunkMetaData, - reader: R, - pages: Vec, - buffer: Vec, - data_buffer: Vec, -) -> Result> { - Ok(IndexedPageReader::new( - reader, - column_chunk.compression(), - column_chunk.descriptor().descriptor.clone(), - column_chunk.byte_range().0, - pages, - buffer, - data_buffer, - )) -} - /// Returns an [`Iterator`] of [`ColumnChunkMetaData`] corresponding to the columns /// from `field` at `row_group`. /// For primitive fields (e.g. `i64`), the iterator has exactly one item. diff --git a/src/read/page/indexed_reader.rs b/src/read/page/indexed_reader.rs index 11997d6e3..03151a84f 100644 --- a/src/read/page/indexed_reader.rs +++ b/src/read/page/indexed_reader.rs @@ -7,7 +7,7 @@ use std::{ use crate::{ error::ParquetError, indexes::FilteredPage, - metadata::Descriptor, + metadata::{ColumnChunkMetaData, Descriptor}, page::{CompressedDataPage, DictPage, ParquetPageHeader}, parquet_bridge::Compression, }; @@ -102,15 +102,15 @@ fn read_dict_page( } impl IndexedPageReader { + /// Returns a new [`IndexedPageReader`]. pub fn new( reader: R, - compression: Compression, - descriptor: Descriptor, - column_start: u64, + column: &ColumnChunkMetaData, pages: Vec, buffer: Vec, data_buffer: Vec, ) -> Self { + let column_start = column.byte_range().0; // a dictionary page exists iff the first data page is not at the start of // the column let dictionary = match pages.get(0) { @@ -128,8 +128,8 @@ impl IndexedPageReader { let pages = pages.into_iter().collect(); Self { reader, - compression, - descriptor, + compression: column.compression(), + descriptor: column.descriptor().descriptor.clone(), buffer, data_buffer, pages,