Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for page-level filter pushdown (column and offset indexes) #107

Merged
merged 16 commits into from
Mar 25, 2022
28 changes: 12 additions & 16 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
use parquet2::bloom_filter;
use parquet2::error::Result;
use parquet2::indexes;

// ANCHOR: deserialize
use parquet2::encoding::Encoding;
Expand Down Expand Up @@ -52,24 +51,21 @@ fn main() -> Result<()> {
// ANCHOR: column_metadata
let row_group = 0;
let column = 0;
let column_metadata = metadata.row_groups[row_group].column(column);
let columns = metadata.row_groups[row_group].columns();
let column_metadata = &columns[column];
// ANCHOR_END: column_metadata

// ANCHOR: column_index
// read the column index
let index = indexes::read_column(&mut reader, column_metadata.column_chunk())?;
if let Some(index) = index {
// these are the minimum and maximum within each page, which can be used
// to skip pages.
println!("{index:?}");
}

// read the offset index containing page locations
let maybe_pages = indexes::read_page_locations(&mut reader, column_metadata.column_chunk())?;
if let Some(pages) = maybe_pages {
// there are page locations in the file
println!("{pages:?}");
}
// read the column indexes of every column
use parquet2::read;
let index = read::read_columns_indexes(&mut reader, columns)?;
// these are the minimum and maximum within each page, which can be used
// to skip pages.
println!("{index:?}");

// read the offset indexes containing page locations of every column
let pages = read::read_pages_locations(&mut reader, columns)?;
println!("{pages:?}");
// ANCHOR_END: column_index

// ANCHOR: statistics
Expand Down
5 changes: 1 addition & 4 deletions src/bloom_filter/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@ pub fn read<R: Read + Seek>(
mut reader: &mut R,
bitset: &mut Vec<u8>,
) -> Result<(), ParquetError> {
let offset = column_metadata
.metadata()
.ok_or_else(|| ParquetError::OutOfSpec("Column metadata is required".to_string()))?
.bloom_filter_offset;
let offset = column_metadata.metadata().bloom_filter_offset;

let offset = if let Some(offset) = offset {
offset as u64
Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,12 @@ impl From<std::num::TryFromIntError> for ParquetError {
}
}

impl From<std::array::TryFromSliceError> for ParquetError {
fn from(e: std::array::TryFromSliceError) -> ParquetError {
ParquetError::OutOfSpec(format!("Can't deserialize to parquet native type: {}", e))
}
}

/// A specialized `Result` for Parquet errors.
pub type Result<T> = std::result::Result<T, ParquetError>;

Expand Down
321 changes: 321 additions & 0 deletions src/indexes/index.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
use std::any::Any;

use parquet_format_async_temp::ColumnIndex;

use crate::parquet_bridge::BoundaryOrder;
use crate::schema::types::PrimitiveType;
use crate::{error::ParquetError, schema::types::PhysicalType, types::NativeType};

/// Trait object representing a [`ColumnIndex`] in Rust's native format.
///
/// See [`NativeIndex`], [`ByteIndex`] and [`FixedLenByteIndex`] for concrete implementations.
pub trait Index: Send + Sync + std::fmt::Debug {
fn as_any(&self) -> &dyn Any;

fn physical_type(&self) -> &PhysicalType;
}

impl PartialEq for dyn Index + '_ {
fn eq(&self, that: &dyn Index) -> bool {
equal(self, that)
}
}

impl Eq for dyn Index + '_ {}

fn equal(lhs: &dyn Index, rhs: &dyn Index) -> bool {
if lhs.physical_type() != rhs.physical_type() {
return false;
}

match lhs.physical_type() {
PhysicalType::Boolean => {
lhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
== rhs.as_any().downcast_ref::<BooleanIndex>().unwrap()
}
PhysicalType::Int32 => {
lhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<i32>>().unwrap()
}
PhysicalType::Int64 => {
lhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<i64>>().unwrap()
}
PhysicalType::Int96 => {
lhs.as_any()
.downcast_ref::<NativeIndex<[u32; 3]>>()
.unwrap()
== rhs
.as_any()
.downcast_ref::<NativeIndex<[u32; 3]>>()
.unwrap()
}
PhysicalType::Float => {
lhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<f32>>().unwrap()
}
PhysicalType::Double => {
lhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
== rhs.as_any().downcast_ref::<NativeIndex<f64>>().unwrap()
}
PhysicalType::ByteArray => {
lhs.as_any().downcast_ref::<ByteIndex>().unwrap()
== rhs.as_any().downcast_ref::<ByteIndex>().unwrap()
}
PhysicalType::FixedLenByteArray(_) => {
lhs.as_any().downcast_ref::<FixedLenByteIndex>().unwrap()
== rhs.as_any().downcast_ref::<FixedLenByteIndex>().unwrap()
}
}
}

/// An index of a column of [`NativeType`] physical representation
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NativeIndex<T: NativeType> {
/// The primitive type
pub primitive_type: PrimitiveType,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<T>>,
/// the order
pub boundary_order: BoundaryOrder,
}

impl<T: NativeType> NativeIndex<T> {
/// Creates a new [`NativeIndex`]
pub(crate) fn try_new(
index: ColumnIndex,
primitive_type: PrimitiveType,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
let min = min.as_slice().try_into()?;
let max = max.as_slice().try_into()?;
(Some(T::from_le_bytes(min)), Some(T::from_le_bytes(max)))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
primitive_type,
indexes,
boundary_order: index.boundary_order.try_into()?,
})
}
}

/// The index of a page, containing the min and max values of the page.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PageIndex<T> {
/// The minimum value in the page. It is None when all values are null
pub min: Option<T>,
/// The maximum value in the page. It is None when all values are null
pub max: Option<T>,
/// The number of null values in the page
pub null_count: Option<i64>,
}

impl<T: NativeType> Index for NativeIndex<T> {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &PhysicalType {
&T::TYPE
}
}

/// An index of a column of bytes physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ByteIndex {
/// The [`PrimitiveType`].
pub primitive_type: PrimitiveType,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<Vec<u8>>>,
pub boundary_order: BoundaryOrder,
}

impl ByteIndex {
pub(crate) fn try_new(
index: ColumnIndex,
primitive_type: PrimitiveType,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
primitive_type,
indexes,
boundary_order: index.boundary_order.try_into()?,
})
}
}

impl Index for ByteIndex {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &PhysicalType {
&PhysicalType::ByteArray
}
}

/// An index of a column of fixed len byte physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct FixedLenByteIndex {
/// The [`PrimitiveType`].
pub primitive_type: PrimitiveType,
/// The indexes, one item per page
pub indexes: Vec<PageIndex<Vec<u8>>>,
pub boundary_order: BoundaryOrder,
}

impl FixedLenByteIndex {
pub(crate) fn try_new(
index: ColumnIndex,
primitive_type: PrimitiveType,
) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
primitive_type,
indexes,
boundary_order: index.boundary_order.try_into()?,
})
}
}

impl Index for FixedLenByteIndex {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &PhysicalType {
&self.primitive_type.physical_type
}
}

/// An index of a column of boolean physical type
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BooleanIndex {
/// The indexes, one item per page
pub indexes: Vec<PageIndex<bool>>,
pub boundary_order: BoundaryOrder,
}

impl BooleanIndex {
pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
let len = index.min_values.len();

let null_counts = index
.null_counts
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);

let indexes = index
.min_values
.into_iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
.map(|(((min, max), is_null), null_count)| {
let (min, max) = if is_null {
(None, None)
} else {
let min = min[0] == 1;
let max = max[0] == 1;
(Some(min), Some(max))
};
Ok(PageIndex {
min,
max,
null_count,
})
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok(Self {
indexes,
boundary_order: index.boundary_order.try_into()?,
})
}
}

impl Index for BooleanIndex {
fn as_any(&self) -> &dyn Any {
self
}

fn physical_type(&self) -> &PhysicalType {
&PhysicalType::Boolean
}
}
Loading