This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for parquet sidecar to FileReader (#1215)

jorgecarleitao committed Aug 9, 2022
1 parent 56189bd commit 838deca

Showing 11 changed files with 138 additions and 173 deletions.

34 changes: 20 additions & 14 deletions benches/read_parquet.rs
@@ -32,10 +32,16 @@ fn to_buffer(
     buffer
 }
 
-fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
-    let file = Cursor::new(buffer);
+fn read_chunk(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+    let mut reader = Cursor::new(buffer);
 
-    let reader = read::FileReader::try_new(file, Some(&[column]), None, None, None)?;
+    let metadata = read::read_metadata(&mut reader)?;
+
+    let schema = read::infer_schema(&metadata)?;
+
+    let schema = schema.filter(|index, _| index == column);
+
+    let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);
 
     for maybe_chunk in reader {
         let columns = maybe_chunk?;
@@ -49,43 +55,43 @@ fn add_benchmark(c: &mut Criterion) {
         let size = 2usize.pow(i);
         let buffer = to_buffer(size, true, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 6).unwrap()));
 
         let a = format!("read utf8 emoji 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 12).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 12).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 3).unwrap()));
 
         let buffer = to_buffer(size, true, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, true, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, false, false, false, false);
         let a = format!("read required utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_chunk(&buffer, size, 2).unwrap()));
     });
 }

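The bench change above is the commit's migration pattern in miniature: instead of passing a projection to `FileReader::try_new`, the caller reads the metadata, infers and filters the schema, and hands both to `FileReader::new`. A minimal standalone sketch of that flow, assuming only the API as changed in this commit (`read_one_column` and its arguments are illustrative):

    use std::io::Cursor;

    use arrow2::error::Result;
    use arrow2::io::parquet::read;

    // Illustrative: read a single projected column from an in-memory parquet buffer.
    fn read_one_column(buffer: &[u8], column: usize) -> Result<()> {
        let mut reader = Cursor::new(buffer);

        // IO-bound: parse the footer metadata once.
        let metadata = read::read_metadata(&mut reader)?;
        // Derive the Arrow schema from the parquet metadata...
        let schema = read::infer_schema(&metadata)?;
        // ...and project it down to the one column we want.
        let schema = schema.filter(|index, _field| index == column);

        // The reader now receives the row groups and the schema explicitly.
        let reader = read::FileReader::new(reader, metadata.row_groups, schema, None, None);

        for maybe_chunk in reader {
            let chunk = maybe_chunk?;
            assert!(!chunk.is_empty());
        }
        Ok(())
    }
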
36 changes: 25 additions & 11 deletions examples/parquet_read.rs
@@ -1,30 +1,44 @@
 use std::fs::File;
 use std::time::SystemTime;
 
-use arrow2::error::Result;
+use arrow2::error::Error;
 use arrow2::io::parquet::read;
 
-fn main() -> Result<()> {
+fn main() -> Result<(), Error> {
     // say we have a file
     use std::env;
     let args: Vec<String> = env::args().collect();
 
     let file_path = &args[1];
+    let mut reader = File::open(file_path)?;
 
-    let reader = File::open(file_path)?;
-    let reader = read::FileReader::try_new(reader, None, Some(1024 * 8 * 8), None, None)?;
+    // we can read its metadata:
+    let metadata = read::read_metadata(&mut reader)?;
 
-    println!("{:#?}", reader.schema());
+    // and infer a [`Schema`] from the `metadata`.
+    let schema = read::infer_schema(&metadata)?;
 
-    // say we want to evaluate if the we can skip some row groups based on a field's value
-    let field = &reader.schema().fields[0];
+    // we can filter the columns we need (here we select all)
+    let schema = schema.filter(|_index, _field| true);
 
-    // we can deserialize the parquet statistics from this field
-    let statistics = read::statistics::deserialize(field, &reader.metadata().row_groups)?;
+    // we can read the statistics of all parquet's row groups (here for the first field)
+    let statistics = read::statistics::deserialize(&schema.fields[0], &metadata.row_groups)?;
 
     println!("{:#?}", statistics);
 
+    // say we found that we only need to read the first two row groups, "0" and "1"
+    let row_groups = metadata
+        .row_groups
+        .into_iter()
+        .enumerate()
+        .filter(|(index, _)| *index == 0 || *index == 1)
+        .map(|(_, row_group)| row_group)
+        .collect();
+
+    // we can then read the row groups into chunks
+    let chunks = read::FileReader::new(reader, row_groups, schema, Some(1024 * 8 * 8), None);
+
     let start = SystemTime::now();
-    for maybe_chunk in reader {
+    for maybe_chunk in chunks {
         let chunk = maybe_chunk?;
         assert!(!chunk.is_empty());
     }
2 changes: 1 addition & 1 deletion examples/parquet_read_async.rs
@@ -43,7 +43,7 @@ async fn main() -> Result<()> {
     // the runtime.
     // Furthermore, this operation is trivially paralellizable e.g. via rayon, as each iterator
     // can be advanced in parallel (parallel decompression and deserialization).
-    let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows() as usize, None);
+    let chunks = RowGroupDeserializer::new(column_chunks, row_group.num_rows(), None);
     for maybe_chunk in chunks {
         let chunk = maybe_chunk?;
         println!("{}", chunk.len());
2 changes: 1 addition & 1 deletion examples/s3/src/main.rs
@@ -71,7 +71,7 @@ async fn main() -> Result<()> {
 
     // this is CPU-bounded and should be sent to a separate thread-pool.
     // We do it here for simplicity
-    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows() as usize, None);
+    let chunks = read::RowGroupDeserializer::new(column_chunks, group.num_rows(), None);
     let chunks = chunks.collect::<Result<Vec<_>>>()?;
 
     // this is a single chunk because chunk_size is `None`
22 changes: 22 additions & 0 deletions src/datatypes/schema.rs
@@ -26,6 +26,28 @@ impl Schema {
             metadata,
         }
     }
+
+    /// Returns a new [`Schema`] with a subset of all fields whose `predicate`
+    /// evaluates to true.
+    pub fn filter<F: Fn(usize, &Field) -> bool>(self, predicate: F) -> Self {
+        let fields = self
+            .fields
+            .into_iter()
+            .enumerate()
+            .filter_map(|(index, f)| {
+                if (predicate)(index, &f) {
+                    Some(f)
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        Schema {
+            fields,
+            metadata: self.metadata,
+        }
+    }
 }
 
 impl From<Vec<Field>> for Schema {
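A usage sketch of the new `Schema::filter`; the schema and field names here are invented for illustration:

    use arrow2::datatypes::{DataType, Field, Schema};

    fn main() {
        // An invented three-field schema.
        let schema = Schema::from(vec![
            Field::new("a", DataType::Int64, false),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Boolean, false),
        ]);

        // The predicate receives the field's position and the field itself,
        // so one can select by index...
        let first_only = schema.clone().filter(|index, _field| index == 0);
        assert_eq!(first_only.fields.len(), 1);

        // ...or by any property of the field, such as its name.
        let without_b = schema.filter(|_index, field| field.name != "b");
        assert_eq!(without_b.fields.len(), 2);
    }
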
120 changes: 15 additions & 105 deletions src/io/parquet/read/file.rs
@@ -1,115 +1,42 @@
 use std::io::{Read, Seek};
-use std::sync::Arc;
 
 use crate::array::Array;
 use crate::chunk::Chunk;
 use crate::datatypes::Schema;
+use crate::error::Result;
 use crate::io::parquet::read::read_columns_many;
-use crate::{
-    datatypes::Field,
-    error::{Error, Result},
-};
 
-use super::{infer_schema, read_metadata, FileMetaData, RowGroupDeserializer, RowGroupMetaData};
-
-type GroupFilter = Arc<dyn Fn(usize, &RowGroupMetaData) -> bool + Send + Sync>;
+use super::{RowGroupDeserializer, RowGroupMetaData};
 
 /// An iterator of [`Chunk`]s coming from row groups of a parquet file.
 ///
+/// This can be thought of a flatten chain of [`Iterator<Item=Chunk>`] - each row group is sequentially
+/// mapped to an [`Iterator<Item=Chunk>`] and each iterator is iterated upon until either the limit
+/// or the last iterator ends.
 /// # Implementation
-/// This iterator mixes IO-bounded and CPU-bounded operations.
+/// This iterator is single threaded on both IO-bounded and CPU-bounded tasks, and mixes them.
 pub struct FileReader<R: Read + Seek> {
     row_groups: RowGroupReader<R>,
-    metadata: FileMetaData,
     remaining_rows: usize,
     current_row_group: Option<RowGroupDeserializer>,
 }
 
 impl<R: Read + Seek> FileReader<R> {
-    /// Creates a new [`FileReader`] by reading the metadata from `reader` and constructing
-    /// Arrow's schema from it.
-    ///
-    /// # Error
-    /// This function errors iff:
-    /// * reading the metadata from the reader fails
-    /// * it is not possible to derive an arrow schema from the parquet file
-    /// * the projection contains columns that do not exist
-    pub fn try_new(
-        mut reader: R,
-        projection: Option<&[usize]>,
+    /// Returns a new [`FileReader`].
+    pub fn new(
+        reader: R,
+        row_groups: Vec<RowGroupMetaData>,
+        schema: Schema,
         chunk_size: Option<usize>,
         limit: Option<usize>,
-        groups_filter: Option<GroupFilter>,
-    ) -> Result<Self> {
-        let metadata = read_metadata(&mut reader)?;
-
-        let schema = infer_schema(&metadata)?;
-
-        let schema_metadata = schema.metadata;
-        let fields: Vec<Field> = if let Some(projection) = &projection {
-            schema
-                .fields
-                .into_iter()
-                .enumerate()
-                .filter_map(|(index, f)| {
-                    if projection.iter().any(|&i| i == index) {
-                        Some(f)
-                    } else {
-                        None
-                    }
-                })
-                .collect()
-        } else {
-            schema.fields.into_iter().collect()
-        };
-
-        if let Some(projection) = &projection {
-            if fields.len() != projection.len() {
-                return Err(Error::InvalidArgumentError(
-                    "While reading parquet, some columns in the projection do not exist in the file"
-                        .to_string(),
-                ));
-            }
-        }
-
-        let schema = Schema {
-            fields,
-            metadata: schema_metadata,
-        };
-
-        let row_groups = RowGroupReader::new(
-            reader,
-            schema,
-            groups_filter,
-            metadata.row_groups.clone(),
-            chunk_size,
-            limit,
-        );
+    ) -> Self {
+        let row_groups = RowGroupReader::new(reader, schema, row_groups, chunk_size, limit);
 
-        Ok(Self {
+        Self {
             row_groups,
-            metadata,
             remaining_rows: limit.unwrap_or(usize::MAX),
             current_row_group: None,
-        })
+        }
     }
-
-    /// Returns the derived arrow [`Schema`] of the file
-    pub fn schema(&self) -> &Schema {
-        &self.row_groups.schema
-    }
-
-    /// Returns parquet's [`FileMetaData`].
-    pub fn metadata(&self) -> &FileMetaData {
-        &self.metadata
-    }
-
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.row_groups.set_groups_filter(groups_filter);
-    }
 
     fn next_row_group(&mut self) -> Result<Option<RowGroupDeserializer>> {
@@ -178,7 +105,6 @@ impl<R: Read + Seek> Iterator for FileReader<R> {
 pub struct RowGroupReader<R: Read + Seek> {
     reader: R,
     schema: Schema,
-    groups_filter: Option<GroupFilter>,
     row_groups: std::iter::Enumerate<std::vec::IntoIter<RowGroupMetaData>>,
     chunk_size: Option<usize>,
     remaining_rows: usize,
@@ -189,26 +115,19 @@ impl<R: Read + Seek> RowGroupReader<R> {
     pub fn new(
         reader: R,
         schema: Schema,
-        groups_filter: Option<GroupFilter>,
         row_groups: Vec<RowGroupMetaData>,
         chunk_size: Option<usize>,
         limit: Option<usize>,
     ) -> Self {
         Self {
             reader,
             schema,
-            groups_filter,
             row_groups: row_groups.into_iter().enumerate(),
             chunk_size,
             remaining_rows: limit.unwrap_or(usize::MAX),
         }
     }
 
-    /// Sets the groups filter
-    pub fn set_groups_filter(&mut self, groups_filter: GroupFilter) {
-        self.groups_filter = Some(groups_filter);
-    }
-
     #[inline]
     fn _next(&mut self) -> Result<Option<RowGroupDeserializer>> {
         if self.schema.fields.is_empty() {
@@ -219,14 +138,7 @@
             return Ok(None);
         }
 
-        let row_group = if let Some(groups_filter) = self.groups_filter.as_ref() {
-            self.row_groups
-                .by_ref()
-                .find(|(index, row_group)| (groups_filter)(*index, row_group))
-        } else {
-            self.row_groups.next()
-        };
-        let row_group = if let Some((_, row_group)) = row_group {
+        let row_group = if let Some((_, row_group)) = self.row_groups.next() {
             row_group
         } else {
             return Ok(None);
@@ … @@
 
         let result = RowGroupDeserializer::new(
             column_chunks,
-            row_group.num_rows() as usize,
+            row_group.num_rows(),
             Some(self.remaining_rows),
         );
-        self.remaining_rows = self
-            .remaining_rows
-            .saturating_sub(row_group.num_rows() as usize);
+        self.remaining_rows = self.remaining_rows.saturating_sub(row_group.num_rows());
         Ok(Some(result))
     }
 }
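With `GroupFilter` and `set_groups_filter` removed, a caller that previously filtered row groups can get the same effect by filtering `metadata.row_groups` before constructing the reader. A minimal sketch under assumed names (the `data.parquet` path and the predicate are illustrative):

    use std::fs::File;

    use arrow2::error::Result;
    use arrow2::io::parquet::read;

    fn main() -> Result<()> {
        // Illustrative path.
        let mut file = File::open("data.parquet")?;

        let metadata = read::read_metadata(&mut file)?;
        let schema = read::infer_schema(&metadata)?;

        // The old `GroupFilter` received `(index, &RowGroupMetaData)`; the same
        // predicate now runs eagerly over the metadata's row groups.
        let row_groups: Vec<_> = metadata
            .row_groups
            .into_iter()
            .enumerate()
            .filter(|(index, row_group)| *index % 2 == 0 && row_group.num_rows() > 0)
            .map(|(_, row_group)| row_group)
            .collect();

        let chunks = read::FileReader::new(file, row_groups, schema, None, None);
        for maybe_chunk in chunks {
            let chunk = maybe_chunk?;
            println!("{} rows", chunk.len());
        }
        Ok(())
    }
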
4 changes: 1 addition & 3 deletions src/io/parquet/read/row_group.rs
@@ -260,8 +260,6 @@ pub async fn read_columns_many_async<
     field_columns
         .into_iter()
         .zip(fields.into_iter())
-        .map(|(columns, field)| {
-            to_deserializer(columns, field, row_group.num_rows() as usize, chunk_size)
-        })
+        .map(|(columns, field)| to_deserializer(columns, field, row_group.num_rows(), chunk_size))
         .collect()
 }