Skip to content

Commit

Permalink
Added support for page-level filter pushdown (column and offset index…
Browse files Browse the repository at this point in the history
…es) (#107)
  • Loading branch information
jorgecarleitao committed Mar 25, 2022
1 parent 321c991 commit 7cac93e
Show file tree
Hide file tree
Showing 34 changed files with 1,704 additions and 168 deletions.
28 changes: 12 additions & 16 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use parquet2::bloom_filter;
use parquet2::error::Result;
use parquet2::indexes;

// ANCHOR: deserialize
use parquet2::encoding::Encoding;
Expand Down Expand Up @@ -52,24 +51,21 @@ fn main() -> Result<()> {
// ANCHOR: column_metadata
let row_group = 0;
let column = 0;
let column_metadata = metadata.row_groups[row_group].column(column);
let columns = metadata.row_groups[row_group].columns();
let column_metadata = &columns[column];
// ANCHOR_END: column_metadata

// ANCHOR: column_index
// read the column index
let index = indexes::read_column(&mut reader, column_metadata.column_chunk())?;
if let Some(index) = index {
// these are the minimum and maximum within each page, which can be used
// to skip pages.
println!("{index:?}");
}

// read the offset index containing page locations
let maybe_pages = indexes::read_page_locations(&mut reader, column_metadata.column_chunk())?;
if let Some(pages) = maybe_pages {
// there are page locations in the file
println!("{pages:?}");
}
// read the column indexes of every column
use parquet2::read;
let index = read::read_columns_indexes(&mut reader, columns)?;
// these are the minimum and maximum within each page, which can be used
// to skip pages.
println!("{index:?}");

// read the offset indexes containing page locations of every column
let pages = read::read_pages_locations(&mut reader, columns)?;
println!("{pages:?}");
// ANCHOR_END: column_index

// ANCHOR: statistics
Expand Down
5 changes: 1 addition & 4 deletions src/bloom_filter/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ pub fn read<R: Read + Seek>(
mut reader: &mut R,
bitset: &mut Vec<u8>,
) -> Result<(), ParquetError> {
let offset = column_metadata
.metadata()
.ok_or_else(|| ParquetError::OutOfSpec("Column metadata is required".to_string()))?
.bloom_filter_offset;
let offset = column_metadata.metadata().bloom_filter_offset;

let offset = if let Some(offset) = offset {
offset as u64
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ impl From<std::num::TryFromIntError> for ParquetError {
}
}

impl From<std::array::TryFromSliceError> for ParquetError {
fn from(e: std::array::TryFromSliceError) -> ParquetError {
ParquetError::OutOfSpec(format!("Can't deserialize to parquet native type: {}", e))
}
}

/// A specialized `Result` for Parquet errors: the error variant is always a [`ParquetError`].
pub type Result<T> = std::result::Result<T, ParquetError>;

Expand Down
321 changes: 321 additions & 0 deletions src/indexes/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
use std::any::Any;

use parquet_format_async_temp::ColumnIndex;

use crate::parquet_bridge::BoundaryOrder;
use crate::schema::types::PrimitiveType;
use crate::{error::ParquetError, schema::types::PhysicalType, types::NativeType};

/// Trait object representing a [`ColumnIndex`] in Rust's native format.
///
/// See [`NativeIndex`], [`ByteIndex`] and [`FixedLenByteIndex`] for concrete implementations.
/// Boolean columns are covered by [`BooleanIndex`].
pub trait Index: Send + Sync + std::fmt::Debug {
/// Returns `self` as [`Any`], so that callers can downcast to the concrete index type.
fn as_any(&self) -> &dyn Any;

/// Returns the [`PhysicalType`] associated with this index.
fn physical_type(&self) -> &PhysicalType;
}

/// Equality between [`Index`] trait objects: they must share the same physical type
/// and their concrete indexes must compare equal.
impl PartialEq for dyn Index + '_ {
    fn eq(&self, other: &dyn Index) -> bool {
        equal(self, other)
    }
}

impl Eq for dyn Index + '_ {}

/// Compares two [`Index`] trait objects for equality.
///
/// Returns `false` when the physical types differ. Otherwise both sides are downcast
/// to the concrete index type used for that physical type and compared with `==`.
///
/// # Panics
///
/// Panics if either side is not the concrete type expected for its physical type.
fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
    // Downcasts both sides to the concrete index type `I` and compares them.
    fn same<I: PartialEq + 'static>(lhs: &dyn Index, rhs: &dyn Index) -> bool {
        let left = lhs.as_any().downcast_ref::<I>().unwrap();
        let right = rhs.as_any().downcast_ref::<I>().unwrap();
        left == right
    }

    if lhs.physical_type() != rhs.physical_type() {
        return false;
    }

    match lhs.physical_type() {
        PhysicalType::Boolean => same::<BooleanIndex>(lhs, rhs),
        PhysicalType::Int32 => same::<NativeIndex<i32>>(lhs, rhs),
        PhysicalType::Int64 => same::<NativeIndex<i64>>(lhs, rhs),
        PhysicalType::Int96 => same::<NativeIndex<[u32; 3]>>(lhs, rhs),
        PhysicalType::Float => same::<NativeIndex<f32>>(lhs, rhs),
        PhysicalType::Double => same::<NativeIndex<f64>>(lhs, rhs),
        PhysicalType::ByteArray => same::<ByteIndex>(lhs, rhs),
        PhysicalType::FixedLenByteArray(_) => same::<FixedLenByteIndex>(lhs, rhs),
    }
}

/// An index of a column of [`NativeType`] physical representation.
///
/// Built from a [`ColumnIndex`]; the min/max of each page are stored as decoded `T` values.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NativeIndex<T: NativeType> {
/// The [`PrimitiveType`] of the column.
pub primitive_type: PrimitiveType,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<T>>,
/// The boundary order, converted from the `boundary_order` of the [`ColumnIndex`].
pub boundary_order: BoundaryOrder,
}

impl<T: NativeType> NativeIndex<T> {
    /// Creates a new [`NativeIndex`] from a [`ColumnIndex`].
    ///
    /// Pages flagged in `null_pages` get `None` for both `min` and `max`; for every
    /// other page the bounds are decoded from their little-endian bytes. When the
    /// column index carries no null counts, every page gets a `None` null count.
    ///
    /// # Errors
    ///
    /// Returns an error when a min/max buffer cannot be converted to the byte
    /// representation of `T`, or when the boundary order cannot be converted.
    pub(crate) fn try_new(
        index: ColumnIndex,
        primitive_type: PrimitiveType,
    ) -> Result<Self, ParquetError> {
        let page_count = index.min_values.len();

        let null_counts = match index.null_counts {
            Some(counts) => counts.into_iter().map(Some).collect::<Vec<_>>(),
            None => vec![None; page_count],
        };

        let pages = index
            .min_values
            .iter()
            .zip(index.max_values)
            .zip(index.null_pages)
            .zip(null_counts);

        let mut indexes = Vec::with_capacity(page_count);
        for (((min, max), is_null), null_count) in pages {
            let (min, max) = if is_null {
                (None, None)
            } else {
                let min_bytes = min.as_slice().try_into()?;
                let max_bytes = max.as_slice().try_into()?;
                (
                    Some(T::from_le_bytes(min_bytes)),
                    Some(T::from_le_bytes(max_bytes)),
                )
            };
            indexes.push(PageIndex {
                min,
                max,
                null_count,
            });
        }

        let boundary_order = index.boundary_order.try_into()?;

        Ok(Self {
            primitive_type,
            indexes,
            boundary_order,
        })
    }
}

/// The index of a page, containing the min and max values of the page.
///
/// `T` is the in-memory representation of the values (e.g. a native type, `bool` or `Vec<u8>`).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageIndex<T> {
/// The minimum value in the page. It is None when all values are null
pub min: Option<T>,
/// The maximum value in the page. It is None when all values are null
pub max: Option<T>,
/// The number of null values in the page. It is None when the column index
/// does not contain null counts.
pub null_count: Option<i64>,
}

impl<T: NativeType> Index for NativeIndex<T> {
// Exposes `self` as `Any` so that it can be downcast back to `NativeIndex<T>`.
fn as_any(&self) -> &dyn Any {
self
}

// The physical type is the associated constant of the native type `T`.
fn physical_type(&self) -> &PhysicalType {
&T::TYPE
}
}

/// An index of a column of bytes physical type
///
/// The min/max of each page are kept as the raw bytes found in the [`ColumnIndex`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ByteIndex {
/// The [`PrimitiveType`].
pub primitive_type: PrimitiveType,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<Vec<u8>>>,
/// The boundary order, converted from the `boundary_order` of the [`ColumnIndex`].
pub boundary_order: BoundaryOrder,
}

impl ByteIndex {
    /// Creates a new [`ByteIndex`] from a [`ColumnIndex`].
    ///
    /// The min/max buffers are moved into the resulting [`PageIndex`]es unchanged;
    /// pages flagged in `null_pages` get `None` for both bounds. When the column
    /// index carries no null counts, every page gets a `None` null count.
    ///
    /// # Errors
    ///
    /// Returns an error when the boundary order cannot be converted.
    pub(crate) fn try_new(
        index: ColumnIndex,
        primitive_type: PrimitiveType,
    ) -> Result<Self, ParquetError> {
        let page_count = index.min_values.len();

        let null_counts = match index.null_counts {
            Some(counts) => counts.into_iter().map(Some).collect::<Vec<_>>(),
            None => vec![None; page_count],
        };

        // building a page index from raw bytes cannot fail, so no `Result` is needed here
        let indexes = index
            .min_values
            .into_iter()
            .zip(index.max_values)
            .zip(index.null_pages)
            .zip(null_counts)
            .map(|(((min, max), is_null), null_count)| {
                let (min, max) = match is_null {
                    true => (None, None),
                    false => (Some(min), Some(max)),
                };
                PageIndex {
                    min,
                    max,
                    null_count,
                }
            })
            .collect::<Vec<_>>();

        let boundary_order = index.boundary_order.try_into()?;

        Ok(Self {
            primitive_type,
            indexes,
            boundary_order,
        })
    }
}

impl Index for ByteIndex {
// Exposes `self` as `Any` so that it can be downcast back to `ByteIndex`.
fn as_any(&self) -> &dyn Any {
self
}

// A `ByteIndex` always reports the byte array physical type.
fn physical_type(&self) -> &PhysicalType {
&PhysicalType::ByteArray
}
}

/// An index of a column of fixed len byte physical type
///
/// The min/max of each page are kept as the raw bytes found in the [`ColumnIndex`].
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FixedLenByteIndex {
/// The [`PrimitiveType`].
pub primitive_type: PrimitiveType,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<Vec<u8>>>,
/// The boundary order, converted from the `boundary_order` of the [`ColumnIndex`].
pub boundary_order: BoundaryOrder,
}

impl FixedLenByteIndex {
pub(crate) fn try_new(
index: ColumnIndex,
primitive_type: PrimitiveType,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
primitive_type,
indexes,
boundary_order: index.boundary_order.try_into()?,
})
}
}

impl Index for FixedLenByteIndex {
// Exposes `self` as `Any` so that it can be downcast back to `FixedLenByteIndex`.
fn as_any(&self) -> &dyn Any {
self
}

// Read from the stored primitive type rather than a constant: the
// `FixedLenByteArray(_)` variant carries a payload (presumably the byte length).
fn physical_type(&self) -> &PhysicalType {
&self.primitive_type.physical_type
}
}

/// An index of a column of boolean physical type
///
/// Unlike the other indexes in this module, it does not store a `PrimitiveType`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BooleanIndex {
/// The indexes, one item per page
pub indexes: Vec<PageIndex<bool>>,
/// The boundary order, converted from the `boundary_order` of the [`ColumnIndex`].
pub boundary_order: BoundaryOrder,
}

impl BooleanIndex {
pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
let min = min[0] == 1;
let max = max[0] == 1;
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
indexes,
boundary_order: index.boundary_order.try_into()?,
})
}
}

impl Index for BooleanIndex {
// Exposes `self` as `Any` so that it can be downcast back to `BooleanIndex`.
fn as_any(&self) -> &dyn Any {
self
}

// A `BooleanIndex` always reports the boolean physical type.
fn physical_type(&self) -> &PhysicalType {
&PhysicalType::Boolean
}
}
Loading

0 comments on commit 7cac93e

Please sign in to comment.