Skip to content

Commit

Permalink
Added basic support to read indexes (#100)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao committed Mar 17, 2022
1 parent e499570 commit d6425e2
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 3 deletions.
20 changes: 19 additions & 1 deletion examples/read_metadata.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use parquet2::bloom_filter;
use parquet2::error::Result;
use parquet2::indexes;

// ANCHOR: deserialize
use parquet2::bloom_filter;
use parquet2::encoding::Encoding;
use parquet2::metadata::ColumnDescriptor;
use parquet2::page::{split_buffer, DataPage};
Expand Down Expand Up @@ -60,6 +61,23 @@ fn main() -> Result<()> {
let column_metadata = metadata.row_groups[row_group].column(column);
// ANCHOR_END: column_metadata

// ANCHOR: column_index
// read the column index
let index = indexes::read_column(&mut reader, column_metadata.column_chunk())?;
if let Some(index) = index {
// these are the minimum and maximum within each page, which can be used
// to skip pages.
println!("{index:?}");
}

// read the offset index containing page locations
let maybe_pages = indexes::read_page_locations(&mut reader, column_metadata.column_chunk())?;
if let Some(pages) = maybe_pages {
// there are page locations in the file
println!("{pages:?}");
}
// ANCHOR_END: column_index

// ANCHOR: statistics
if let Some(maybe_stats) = column_metadata.statistics() {
let stats = maybe_stats?;
Expand Down
15 changes: 13 additions & 2 deletions guide/src/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,22 @@ which can be downcasted via its `Statistics::physical_type()`:

## Bloom filters

The metadata of columns can contain bloom filter bitsets that
can be used to pushdown filter operations.
The column metadata may contain bloom filter bitsets that can be used to push down
filter operations to row groups.

This crate offers the necessary functionality to check whether an item is not in a column chunk:

```rust,no_run,noplayground
{{#include ../../examples/read_metadata.rs:bloom_filter}}
```

## Column and page indexes

The column metadata may contain column and page indexes that can be used to push down filters
when reading (IO) pages.

This crate offers the necessary functionality to read the column index (the minimum and maximum within each page) and the page locations of a column chunk:

```rust,no_run,noplayground
{{#include ../../examples/read_metadata.rs:column_index}}
```
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,12 @@ impl From<std::io::Error> for ParquetError {
}
}

/// Maps a failed integer conversion to [`ParquetError::OutOfSpec`], so that `?` can be
/// used on `try_into()` calls in functions returning a `ParquetError`.
impl From<std::num::TryFromIntError> for ParquetError {
    fn from(error: std::num::TryFromIntError) -> Self {
        let message = format!("Number must be zero or positive: {}", error);
        ParquetError::OutOfSpec(message)
    }
}

/// A specialized `Result` for Parquet errors.
pub type Result<T> = std::result::Result<T, ParquetError>;

Expand Down
60 changes: 60 additions & 0 deletions src/indexes/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::convert::TryInto;
use std::io::{Cursor, Read, Seek, SeekFrom};

use parquet_format_async_temp::{
thrift::protocol::TCompactInputProtocol, ColumnChunk, ColumnIndex, OffsetIndex, PageLocation,
};

use crate::error::ParquetError;

/// Reads the [`ColumnIndex`] referenced by a [`ColumnChunk`], if the chunk has one.
///
/// Returns `Ok(None)` when `chunk.column_index_offset` is not set. Otherwise `reader` is
/// positioned at that offset, `chunk.column_index_length` bytes are read, and those bytes
/// are decoded with the thrift compact protocol.
///
/// # Errors
///
/// * [`ParquetError::OutOfSpec`] when the offset is set but the length is not.
/// * An error when the offset does not fit in a `u64` or the length does not fit in a `usize`.
/// * Any error raised while seeking, reading or decoding.
pub fn read_column<R: Read + Seek>(
    reader: &mut R,
    chunk: &ColumnChunk,
) -> Result<Option<ColumnIndex>, ParquetError> {
    // No offset means the chunk carries no column index at all.
    let start = match chunk.column_index_offset {
        Some(start) => start,
        None => return Ok(None),
    };
    let size = chunk.column_index_length.ok_or_else(|| {
        ParquetError::OutOfSpec("The column length must exist if column offset exists".to_string())
    })?;
    let start: u64 = start.try_into()?;
    let size: usize = size.try_into()?;

    // Load exactly the serialized index into memory before decoding it.
    let mut buffer = vec![0u8; size];
    reader.seek(SeekFrom::Start(start))?;
    reader.read_exact(&mut buffer)?;

    let mut cursor = Cursor::new(&buffer);
    let mut protocol = TCompactInputProtocol::new(&mut cursor);
    let index = ColumnIndex::read_from_in_protocol(&mut protocol)?;
    Ok(Some(index))
}

/// Read [`PageLocation`]s from the [`ColumnChunk`], if available.
///
/// Returns `Ok(None)` when `chunk.offset_index_offset` is not set. Otherwise `reader` is
/// positioned at that offset, `chunk.offset_index_length` bytes are read and decoded as a
/// thrift compact-protocol [`OffsetIndex`], whose `page_locations` are returned.
///
/// # Errors
///
/// * [`ParquetError::OutOfSpec`] when the offset is set but the length is not.
/// * An error when the offset does not fit in a `u64` or the length does not fit in a `usize`.
/// * Any error raised while seeking, reading or decoding.
pub fn read_page_locations<R: Read + Seek>(
    reader: &mut R,
    chunk: &ColumnChunk,
) -> Result<Option<Vec<PageLocation>>, ParquetError> {
    let (offset, length): (u64, usize) = if let Some(offset) = chunk.offset_index_offset {
        // The message names the offset index fields: this function does not look at
        // the column index fields at all.
        let length = chunk.offset_index_length.ok_or_else(|| {
            ParquetError::OutOfSpec(
                "The offset index length must exist if the offset index offset exists"
                    .to_string(),
            )
        })?;
        (offset.try_into()?, length.try_into()?)
    } else {
        // No offset means the chunk carries no offset index at all.
        return Ok(None);
    };

    reader.seek(SeekFrom::Start(offset))?;
    let mut data = vec![0; length];
    reader.read_exact(&mut data)?;

    let mut d = Cursor::new(&data);
    let mut prot = TCompactInputProtocol::new(&mut d);
    // Named `index` so that it does not shadow the byte `offset` used above.
    let index = OffsetIndex::read_from_in_protocol(&mut prot)?;

    Ok(Some(index.page_locations))
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pub mod error;
pub mod bloom_filter;
pub mod compression;
pub mod encoding;
pub mod indexes;
pub mod metadata;
pub mod page;
mod parquet_bridge;
Expand Down
5 changes: 5 additions & 0 deletions src/metadata/column_chunk_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ impl ColumnChunkMetaData {
self.column_chunk.file_offset
}

/// Returns this column's [`ColumnChunk`]
pub fn column_chunk(&self) -> &ColumnChunk {
&self.column_chunk
}

// The column's metadata
fn column_metadata(&self) -> &ColumnMetaData {
self.column_chunk.meta_data.as_ref().unwrap()
Expand Down

0 comments on commit d6425e2

Please sign in to comment.