parquet: parallelize over row groups (#3924)
ritchie46 committed Jul 7, 2022
1 parent cb7c219 commit 34f546e
Showing 15 changed files with 270 additions and 94 deletions.
22 changes: 22 additions & 0 deletions polars/polars-io/src/parquet/predicates.rs
@@ -1,3 +1,4 @@
use crate::predicates::PhysicalIoExpr;
use crate::ArrowResult;
use arrow::array::Array;
use arrow::compute::concatenate::concatenate;
@@ -86,3 +87,24 @@ pub(crate) fn collect_statistics(
Some(BatchStats { schema, stats })
})
}

pub(super) fn read_this_row_group(
predicate: Option<&Arc<dyn PhysicalIoExpr>>,
file_metadata: &arrow::io::parquet::read::FileMetaData,
schema: &ArrowSchema,
) -> Result<bool> {
if let Some(pred) = &predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
return Ok(false);
} else if !matches!(should_read, Err(PolarsError::NotFound(_))) {
let _ = should_read?;
}
}
}
}
Ok(true)
}
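For readers skimming the control flow above: only a definitive `Ok(false)` from the statistics evaluator skips a row group, a missing-statistics error is swallowed, and any other error is propagated. A simplified, self-contained mirror of that decision (hypothetical `SkipError` type standing in for `PolarsError`; not the crate's own code):

#[derive(Debug)]
enum SkipError {
    NotFound,       // stand-in for PolarsError::NotFound: a column has no statistics
    Other(String),  // any other failure while evaluating statistics
}

// Hypothetical mirror of the branches inside `read_this_row_group`.
fn decide(should_read: Result<bool, SkipError>) -> Result<bool, SkipError> {
    match should_read {
        Ok(false) => Ok(false),               // statistics prove the predicate cannot match: skip
        Ok(true) => Ok(true),                 // statistics allow matches: read the row group
        Err(SkipError::NotFound) => Ok(true), // no statistics for a column: read it anyway
        Err(e) => Err(e),                     // other errors are propagated to the caller
    }
}

fn main() {
    assert!(!decide(Ok(false)).unwrap());
    assert!(decide(Err(SkipError::NotFound)).unwrap());
    assert!(decide(Err(SkipError::Other("io".to_string()))).is_err());
}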
28 changes: 25 additions & 3 deletions polars/polars-io/src/parquet/read.rs
@@ -6,9 +6,31 @@ use crate::prelude::*;
use crate::RowCount;
use arrow::io::parquet::read;
use polars_core::prelude::*;
#[cfg(feature = "serde")]
use serde::{Deserialize, Serialize};
use std::io::{Read, Seek};
use std::sync::Arc;

#[derive(Copy, Clone, Debug)]
#[cfg_attr(feature = "serde", derive(Serialize, Deserialize))]
pub enum ParallelStrategy {
/// Don't parallelize
None,
/// Parallelize over the columns
Columns,
/// Parallelize over the row groups
RowGroups,
/// Automatically determine over which unit to parallelize
/// This will choose the most occurring unit.
Auto,
}

impl Default for ParallelStrategy {
fn default() -> Self {
ParallelStrategy::Auto
}
}

/// Read Apache parquet format into a DataFrame.
#[must_use]
pub struct ParquetReader<R: Read + Seek> {
@@ -17,7 +39,7 @@ pub struct ParquetReader<R: Read + Seek> {
n_rows: Option<usize>,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
parallel: bool,
parallel: ParallelStrategy,
row_count: Option<RowCount>,
}

@@ -55,7 +77,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
}

/// Set the [`ParallelStrategy`] used when reading the file (default: `Auto`). The single-threaded reader consumes less memory.
pub fn read_parallel(mut self, parallel: bool) -> Self {
pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
self.parallel = parallel;
self
}
@@ -103,7 +125,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
n_rows: None,
columns: None,
projection: None,
parallel: true,
parallel: Default::default(),
row_count: None,
}
}
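For context, a minimal sketch of how a caller selects the new strategy, assuming the `polars_io` parquet module re-exports `ParallelStrategy` alongside `ParquetReader`, and that a `data.parquet` file with several row groups exists locally:

use std::fs::File;

use polars_io::parquet::{ParallelStrategy, ParquetReader};
use polars_io::SerReader;

fn main() {
    // Hypothetical input file; any parquet file with several row groups will do.
    let file = File::open("data.parquet").expect("file should exist");

    // Force parallelism over row groups. The default, `ParallelStrategy::Auto`,
    // picks row groups when they outnumber the projected columns or the thread
    // pool, and falls back to per-column parallelism otherwise.
    let df = ParquetReader::new(file)
        .read_parallel(ParallelStrategy::RowGroups)
        .finish()
        .expect("parquet read should succeed");

    println!("{:?}", df.shape());
}

With a single projected column the reader drops back to `ParallelStrategy::None` even if `Columns` was requested, since there is nothing to fan out over.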
243 changes: 174 additions & 69 deletions polars/polars-io/src/parquet/read_impl.rs
@@ -1,14 +1,14 @@
use crate::aggregations::{apply_aggregations, ScanAggregation};
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::mmap;
use crate::parquet::mmap::mmap_columns;
use crate::parquet::predicates::collect_statistics;
use crate::parquet::predicates::read_this_row_group;
use crate::parquet::{mmap, ParallelStrategy};
use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr};
use crate::utils::apply_projection;
use crate::RowCount;
use arrow::array::new_empty_array;
use arrow::io::parquet::read;
use arrow::io::parquet::read::{ArrayIter, FileMetaData};
use arrow::io::parquet::read::{ArrayIter, FileMetaData, RowGroupMetaData};
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::POOL;
@@ -18,6 +18,25 @@ use std::convert::TryFrom;
use std::ops::Deref;
use std::sync::Arc;

fn column_idx_to_series(
column_i: usize,
md: &RowGroupMetaData,
remaining_rows: usize,
schema: &ArrowSchema,
bytes: &[u8],
chunk_size: usize,
) -> Result<Series> {
let field = &schema.fields[column_i];
let columns = mmap_columns(bytes, md.columns(), &field.name);
let iter = mmap::to_deserializer(columns, field.clone(), remaining_rows, Some(chunk_size))?;

if remaining_rows < md.num_rows() {
array_iter_to_series(iter, field, Some(remaining_rows))
} else {
array_iter_to_series(iter, field, None)
}
}

fn array_iter_to_series(
iter: ArrayIter,
field: &ArrowField,
@@ -51,103 +70,60 @@
}

#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
mut reader: R,
// might parallelize over columns
fn rg_to_dfs(
bytes: &[u8],
n_row_groups: usize,
limit: usize,
projection: Option<&[usize]>,
file_metadata: &FileMetaData,
schema: &ArrowSchema,
metadata: Option<FileMetaData>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
mut parallel: bool,
row_count: Option<RowCount>,
) -> Result<DataFrame> {
let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader))?;
let row_group_len = file_metadata.row_groups.len();

let projection = projection
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));

if projection.len() == 1 {
parallel = false;
}

let mut dfs = Vec::with_capacity(row_group_len);
parallel: ParallelStrategy,
projection: &[usize],
) -> Result<Vec<DataFrame>> {
let mut dfs = Vec::with_capacity(n_row_groups);

let mut remaining_rows = limit;

let reader = ReaderBytes::from(&reader);
let bytes = reader.deref();

let mut previous_row_count = 0;
for rg in 0..row_group_len {
for rg in 0..n_row_groups {
let md = &file_metadata.row_groups[rg];
let current_row_count = md.num_rows() as IdxSize;
if let Some(pred) = &predicate {
if let Some(pred) = pred.as_stats_evaluator() {
if let Some(stats) = collect_statistics(&file_metadata.row_groups, schema)? {
let should_read = pred.should_read(&stats);
// a parquet file may not have statistics of all columns
if matches!(should_read, Ok(false)) {
previous_row_count += current_row_count;
continue;
} else if !matches!(should_read, Err(PolarsError::NotFound(_))) {
let _ = should_read?;
}
}
}
}

if !read_this_row_group(predicate.as_ref(), file_metadata, schema)? {
previous_row_count += current_row_count;
continue;
}
// test we don't read the parquet file if this env var is set
#[cfg(debug_assertions)]
{
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let chunk_size = md.num_rows() as usize;
let columns = if parallel {
let columns = if let ParallelStrategy::Columns = parallel {
POOL.install(|| {
projection
.par_iter()
.map(|column_i| {
let field = &schema.fields[*column_i];
let columns = mmap_columns(bytes, md.columns(), &field.name);
let iter = mmap::to_deserializer(
columns,
field.clone(),
column_idx_to_series(
*column_i,
md,
remaining_rows,
Some(chunk_size),
)?;

if remaining_rows < md.num_rows() {
array_iter_to_series(iter, field, Some(remaining_rows))
} else {
array_iter_to_series(iter, field, None)
}
schema,
bytes,
chunk_size,
)
})
.collect::<Result<Vec<_>>>()
})?
} else {
projection
.iter()
.map(|column_i| {
let field = &schema.fields[*column_i];
let columns = mmap_columns(bytes, md.columns(), &field.name);
let iter = mmap::to_deserializer(
columns,
field.clone(),
remaining_rows,
Some(chunk_size),
)?;

if remaining_rows < md.num_rows() {
array_iter_to_series(iter, field, Some(remaining_rows))
} else {
array_iter_to_series(iter, field, None)
}
column_idx_to_series(*column_i, md, remaining_rows, schema, bytes, chunk_size)
})
.collect::<Result<Vec<_>>>()?
};
@@ -166,6 +142,135 @@ pub fn read_parquet<R: MmapBytesReader>(
previous_row_count += current_row_count;
dfs.push(df)
}
Ok(dfs)
}

#[allow(clippy::too_many_arguments)]
// parallelizes over row groups
fn rg_to_dfs_par(
bytes: &[u8],
limit: usize,
file_metadata: &FileMetaData,
schema: &ArrowSchema,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
row_count: Option<RowCount>,
projection: &[usize],
) -> Result<Vec<DataFrame>> {
let mut remaining_rows = limit;
let mut previous_row_count = 0;

// compute the limits per row group and the row count offsets
let row_groups = file_metadata
.row_groups
.iter()
.map(|rg_md| {
let row_count_start = previous_row_count;
let num_rows = rg_md.num_rows();
previous_row_count += num_rows;
let local_limit = remaining_rows;
remaining_rows = remaining_rows.saturating_sub(num_rows);

(rg_md, local_limit, row_count_start)
})
.collect::<Vec<_>>();

let dfs = row_groups
.into_par_iter()
.map(|(md, local_limit, row_count_start)| {
if !read_this_row_group(predicate.as_ref(), file_metadata, schema)? {
return Ok(None);
}
// test we don't read the parquet file if this env var is set
#[cfg(debug_assertions)]
{
assert!(std::env::var("POLARS_PANIC_IF_PARQUET_PARSED").is_err())
}

let chunk_size = md.num_rows() as usize;
let columns = projection
.iter()
.map(|column_i| {
column_idx_to_series(*column_i, md, local_limit, schema, bytes, chunk_size)
})
.collect::<Result<Vec<_>>>()?;

let mut df = DataFrame::new_no_checks(columns);

if let Some(rc) = &row_count {
df.with_row_count_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
}

apply_predicate(&mut df, predicate.as_deref())?;
apply_aggregations(&mut df, aggregate)?;

Ok(Some(df))
})
.collect::<Result<Vec<_>>>()?;
Ok(dfs.into_iter().flatten().collect())
}

#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
mut reader: R,
limit: usize,
projection: Option<&[usize]>,
schema: &ArrowSchema,
metadata: Option<FileMetaData>,
predicate: Option<Arc<dyn PhysicalIoExpr>>,
aggregate: Option<&[ScanAggregation]>,
mut parallel: ParallelStrategy,
row_count: Option<RowCount>,
) -> Result<DataFrame> {
let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader))?;
let row_group_len = file_metadata.row_groups.len();

let projection = projection
.map(Cow::Borrowed)
.unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));

if let ParallelStrategy::Auto = parallel {
if row_group_len > projection.len() || row_group_len > POOL.current_num_threads() {
parallel = ParallelStrategy::RowGroups;
} else {
parallel = ParallelStrategy::Columns;
}
}

if let (ParallelStrategy::Columns, true) = (parallel, projection.len() == 1) {
parallel = ParallelStrategy::None;
}

let reader = ReaderBytes::from(&reader);
let bytes = reader.deref();
let dfs = match parallel {
ParallelStrategy::Columns | ParallelStrategy::None => rg_to_dfs(
bytes,
row_group_len,
limit,
&file_metadata,
schema,
predicate,
aggregate,
row_count,
parallel,
&projection,
)?,
ParallelStrategy::RowGroups => rg_to_dfs_par(
bytes,
limit,
&file_metadata,
schema,
predicate,
aggregate,
row_count,
&projection,
)?,
// auto should already be replaced by Columns or RowGroups
ParallelStrategy::Auto => unimplemented!(),
};

if dfs.is_empty() {
let schema = if let Cow::Borrowed(_) = projection {
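For reference, a self-contained sketch (illustrative names, not the crate's code) of the limit bookkeeping `rg_to_dfs_par` performs before fanning the row groups out over the thread pool: each group receives the number of rows still allowed when it starts and the global offset of its first row, so groups can be decoded independently and truncated locally.

// `split_limit` is a hypothetical helper mirroring the per-row-group scan in `rg_to_dfs_par`.
fn split_limit(row_group_sizes: &[usize], limit: usize) -> Vec<(usize, usize)> {
    let mut remaining_rows = limit;
    let mut previous_row_count = 0;
    row_group_sizes
        .iter()
        .map(|&num_rows| {
            let row_count_start = previous_row_count;
            previous_row_count += num_rows;
            let local_limit = remaining_rows;
            remaining_rows = remaining_rows.saturating_sub(num_rows);
            // (rows this group may still produce, global offset of its first row)
            (local_limit, row_count_start)
        })
        .collect()
}

fn main() {
    // Row groups of 100, 250 and 300 rows with an overall limit of 400 rows:
    // group 0 may read up to 400 rows starting at offset 0,
    // group 1 up to 300 rows starting at offset 100,
    // group 2 up to 50 rows starting at offset 350.
    assert_eq!(
        split_limit(&[100, 250, 300], 400),
        vec![(400, 0), (300, 100), (50, 350)]
    );
}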
