Skip to content

Commit

Permalink
Improved API
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 22, 2022
1 parent 9211367 commit ace0757
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 56 deletions.
32 changes: 15 additions & 17 deletions src/indexes/intervals.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ use parquet_format_async_temp::PageLocation;

use crate::error::ParquetError;

use super::index::PageIndex;

/// An interval
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Interval {
Expand All @@ -21,7 +19,7 @@ impl Interval {
}

/// Returns the set of (row) intervals of the pages.
fn compute_row_page_intervals(
fn compute_page_row_intervals(
locations: &[PageLocation],
num_rows: u64,
) -> Result<Vec<Interval>, ParquetError> {
Expand Down Expand Up @@ -49,25 +47,25 @@ fn compute_row_page_intervals(

/// Returns the set of intervals `(start, len)` containing all the
/// selected rows (for a given column)
pub fn compute_rows<'a, T>(
index: &'a [PageIndex<T>],
pub fn compute_rows(
selected: &[bool],
locations: &[PageLocation],
num_rows: u64,
selector: &dyn Fn(&'a PageIndex<T>) -> bool,
) -> Result<Vec<Interval>, ParquetError> {
let page_intervals = compute_row_page_intervals(locations, num_rows)?;
let page_intervals = compute_page_row_intervals(locations, num_rows)?;

Ok(index
Ok(selected
.iter()
.zip(page_intervals.iter().copied())
.filter_map(|(index, page)| {
let is_selected = selector(index);
if is_selected {
Some(page)
} else {
None
}
})
.filter_map(
|(&is_selected, page)| {
if is_selected {
Some(page)
} else {
None
}
},
)
.collect())
}

Expand Down Expand Up @@ -121,7 +119,7 @@ pub fn select_pages(
locations: &[PageLocation],
num_rows: u64,
) -> Result<Vec<FilteredPage>, ParquetError> {
let page_intervals = compute_row_page_intervals(locations, num_rows)?;
let page_intervals = compute_page_row_intervals(locations, num_rows)?;

page_intervals
.into_iter()
Expand Down
16 changes: 3 additions & 13 deletions src/indexes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,14 @@ mod tests {

#[test]
fn test_basic() {
let index = NativeIndex {
primitive_type: PrimitiveType::from_physical("c1".to_string(), PhysicalType::Int32),
indexes: vec![PageIndex {
min: Some(0i32),
max: Some(10),
null_count: Some(0),
}],
boundary_order: Default::default(),
};
let locations = &[PageLocation {
offset: 100,
compressed_page_size: 10,
first_row_index: 0,
}];
let num_rows = 10;

let selector = |_| true;

let row_intervals = compute_rows(&index.indexes, locations, num_rows, &selector).unwrap();
let row_intervals = compute_rows(&[true; 1], locations, num_rows).unwrap();
assert_eq!(row_intervals, vec![Interval::new(0, 10)])
}

Expand Down Expand Up @@ -77,8 +66,9 @@ mod tests {
.map(|x| x.as_slice() > &[97])
.unwrap_or(false) // no max is present => all nulls => not selected
};
let selected = index.indexes.iter().map(selector).collect::<Vec<_>>();

let rows = compute_rows(&index.indexes, locations, num_rows, &selector).unwrap();
let rows = compute_rows(&selected, locations, num_rows).unwrap();
assert_eq!(rows, vec![Interval::new(5, 5)]);

let pages = select_pages(&rows, locations, num_rows).unwrap();
Expand Down
20 changes: 0 additions & 20 deletions src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub use page::{IndexedPageReader, PageFilter, PageReader};
pub use stream::read_metadata as read_metadata_async;

use crate::error::ParquetError;
use crate::indexes::FilteredPage;
use crate::metadata::{ColumnChunkMetaData, RowGroupMetaData};
use crate::page::CompressedDataPage;
use crate::schema::types::ParquetType;
Expand Down Expand Up @@ -65,25 +64,6 @@ pub fn get_page_iterator<R: Read + Seek>(
))
}

/// Returns a new [`IndexedPageReader`] by seeking `reader` to the beginning of `column_chunk`.
pub fn get_indexed_page_reader<R: Read + Seek>(
column_chunk: &ColumnChunkMetaData,
reader: R,
pages: Vec<FilteredPage>,
buffer: Vec<u8>,
data_buffer: Vec<u8>,
) -> Result<IndexedPageReader<R>> {
Ok(IndexedPageReader::new(
reader,
column_chunk.compression(),
column_chunk.descriptor().descriptor.clone(),
column_chunk.byte_range().0,
pages,
buffer,
data_buffer,
))
}

/// Returns an [`Iterator`] of [`ColumnChunkMetaData`] corresponding to the columns
/// from `field` at `row_group`.
/// For primitive fields (e.g. `i64`), the iterator has exactly one item.
Expand Down
12 changes: 6 additions & 6 deletions src/read/page/indexed_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::{
use crate::{
error::ParquetError,
indexes::FilteredPage,
metadata::Descriptor,
metadata::{ColumnChunkMetaData, Descriptor},
page::{CompressedDataPage, DictPage, ParquetPageHeader},
parquet_bridge::Compression,
};
Expand Down Expand Up @@ -102,15 +102,15 @@ fn read_dict_page<R: Read + Seek>(
}

impl<R: Read + Seek> IndexedPageReader<R> {
/// Returns a new [`IndexedPageReader`].
pub fn new(
reader: R,
compression: Compression,
descriptor: Descriptor,
column_start: u64,
column: &ColumnChunkMetaData,
pages: Vec<FilteredPage>,
buffer: Vec<u8>,
data_buffer: Vec<u8>,
) -> Self {
let column_start = column.byte_range().0;
// a dictionary page exists iff the first data page is not at the start of
// the column
let dictionary = match pages.get(0) {
Expand All @@ -128,8 +128,8 @@ impl<R: Read + Seek> IndexedPageReader<R> {
let pages = pages.into_iter().collect();
Self {
reader,
compression,
descriptor,
compression: column.compression(),
descriptor: column.descriptor().descriptor.clone(),
buffer,
data_buffer,
pages,
Expand Down

0 comments on commit ace0757

Please sign in to comment.