Improve parquet reading performance ~35-40% (#3821)
* prevent double copy due to mmap and stop early with n_rows

* improve parquet reading performance
ritchie46 committed Jun 27, 2022
1 parent ca28e70 commit 83161a1
Showing 4 changed files with 124 additions and 19 deletions.
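The commit pairs two ideas: the parquet file bytes are memory mapped and column chunks are read as slices of that mapping instead of being copied into fresh buffers, and decoding stops as soon as the requested number of rows (the `n_rows` limit) has been produced. The early-stop control flow is sketched below with plain `Vec<i64>` chunks rather than the arrow2 array types; `collect_chunks`, `n_rows`, and `main` are hypothetical names used only for illustration, not part of the commit.

// Minimal sketch of the early-stop behaviour, assuming chunks come from an
// iterator whose remaining items are only decoded when pulled.
fn collect_chunks<I>(iter: I, n_rows: Option<usize>) -> Vec<Vec<i64>>
where
    I: IntoIterator<Item = Vec<i64>>,
{
    let mut total = 0;
    let mut out = Vec::new();
    for chunk in iter {
        let len = chunk.len();
        out.push(chunk);
        total += len;
        if let Some(n) = n_rows {
            if total >= n {
                // Enough rows gathered; the remaining chunks are never decoded.
                break;
            }
        }
    }
    out
}

fn main() {
    // Three chunks of four rows each; with a limit of 5 only the first two are pulled.
    let chunks = vec![vec![1, 2, 3, 4], vec![5, 6, 7, 8], vec![9, 10, 11, 12]];
    assert_eq!(collect_chunks(chunks.clone(), Some(5)).len(), 2);
    assert_eq!(collect_chunks(chunks, None).len(), 3);
}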
66 changes: 66 additions & 0 deletions polars/polars-io/src/parquet/mmap.rs
@@ -0,0 +1,66 @@
use super::*;
use arrow::datatypes::Field;
use arrow::io::parquet::read::{
    column_iter_to_arrays, ArrayIter, BasicDecompressor, ColumnChunkMetaData, PageReader,
};

// TODO! make public in arrow2?
pub(super) fn get_field_columns<'a>(
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Vec<&'a ColumnChunkMetaData> {
    columns
        .iter()
        .filter(|x| x.descriptor().path_in_schema[0] == field_name)
        .collect()
}

/// memory maps all columns that are part of the parquet field `field_name`
pub(super) fn mmap_columns<'a>(
    file: &'a [u8],
    columns: &'a [ColumnChunkMetaData],
    field_name: &str,
) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
    get_field_columns(columns, field_name)
        .into_iter()
        .map(|meta| _mmap_single_column(file, meta))
        .collect()
}

fn _mmap_single_column<'a>(
    file: &'a [u8],
    meta: &'a ColumnChunkMetaData,
) -> (&'a ColumnChunkMetaData, &'a [u8]) {
    let (start, len) = meta.byte_range();
    let chunk = &file[start as usize..(start + len) as usize];
    (meta, chunk)
}

// Similar to the arrow2 serializer, except this accepts a slice instead of a vec.
// This allows us to memory map.
pub(super) fn to_deserializer<'a>(
    columns: Vec<(&ColumnChunkMetaData, &'a [u8])>,
    field: Field,
    num_rows: usize,
    chunk_size: Option<usize>,
) -> ArrowResult<ArrayIter<'a>> {
    let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows);

    let (columns, types): (Vec<_>, Vec<_>) = columns
        .into_iter()
        .map(|(column_meta, chunk)| {
            let pages = PageReader::new(
                std::io::Cursor::new(chunk),
                column_meta,
                std::sync::Arc::new(|_, _| true),
                vec![],
            );
            (
                BasicDecompressor::new(pages, vec![]),
                &column_meta.descriptor().descriptor.primitive_type,
            )
        })
        .unzip();

    column_iter_to_arrays(columns, types, field, Some(chunk_size))
}
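
The function _mmap_single_column above is the zero-copy half of the change: a column chunk is just a byte-range slice into the already memory-mapped file, so no intermediate buffer is allocated per chunk. A rough stand-alone sketch of that pattern, with a hypothetical ChunkMeta standing in for ColumnChunkMetaData:

// Hypothetical stand-in for the parquet column-chunk metadata.
struct ChunkMeta {
    start: u64,
    len: u64,
}

// Borrow the chunk's bytes directly from the mapped file; no copy is made and
// the returned slice lives as long as the mapping itself.
fn slice_column<'a>(file: &'a [u8], meta: &ChunkMeta) -> &'a [u8] {
    &file[meta.start as usize..(meta.start + meta.len) as usize]
}

fn main() {
    // Pretend this Vec is the memory-mapped parquet file.
    let file: Vec<u8> = (0u8..32).collect();
    let meta = ChunkMeta { start: 8, len: 4 };
    assert_eq!(slice_column(&file, &meta), &[8u8, 9, 10, 11][..]);
}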
1 change: 1 addition & 0 deletions polars/polars-io/src/parquet/mod.rs
@@ -14,6 +14,7 @@
//! }
//! ```
//!
pub(super) mod mmap;
pub mod predicates;
mod read;
mod read_impl;
1 change: 1 addition & 0 deletions polars/polars-io/src/parquet/read.rs
@@ -95,6 +95,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
}

impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
    /// Create a new [`ParquetReader`] from an existing `Reader`.
    fn new(reader: R) -> Self {
        ParquetReader {
            reader,
75 changes: 56 additions & 19 deletions polars/polars-io/src/parquet/read_impl.rs
@@ -1,24 +1,47 @@
use crate::aggregations::{apply_aggregations, ScanAggregation};
use crate::mmap::{MmapBytesReader, ReaderBytes};
use crate::parquet::mmap;
use crate::parquet::mmap::mmap_columns;
use crate::parquet::predicates::collect_statistics;
use crate::predicates::{apply_predicate, arrow_schema_to_empty_df, PhysicalIoExpr};
use crate::utils::apply_projection;
use crate::RowCount;
use arrow::array::new_empty_array;
use arrow::io::parquet::read;
use arrow::io::parquet::read::{to_deserializer, ArrayIter, FileMetaData};
use arrow::io::parquet::read::{ArrayIter, FileMetaData};
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::POOL;
use rayon::prelude::*;
use std::borrow::Cow;
use std::convert::TryFrom;
use std::io::Cursor;
use std::ops::Deref;
use std::sync::Arc;

fn array_iter_to_series(iter: ArrayIter, field: &ArrowField) -> Result<Series> {
    let chunks = iter.collect::<arrow::error::Result<Vec<_>>>()?;
fn array_iter_to_series(
    iter: ArrayIter,
    field: &ArrowField,
    num_rows: Option<usize>,
) -> Result<Series> {
    let mut total_count = 0;
    let chunks = match num_rows {
        None => iter.collect::<arrow::error::Result<Vec<_>>>()?,
        Some(n) => {
            let mut out = Vec::with_capacity(2);

            for arr in iter {
                let arr = arr?;
                let len = arr.len();
                out.push(arr);

                total_count += len;
                if total_count >= n {
                    break;
                }
            }
            out
        }
    };
    if chunks.is_empty() {
        let arr = new_empty_array(field.data_type.clone());
        Series::try_from((field.name.as_str(), arr))
@@ -29,20 +52,16 @@ fn array_iter_to_series(iter: ArrayIter, field: &ArrowField) -> Result<Series> {

#[allow(clippy::too_many_arguments)]
pub fn read_parquet<R: MmapBytesReader>(
    reader: R,
    mut reader: R,
    limit: usize,
    projection: Option<&[usize]>,
    schema: &ArrowSchema,
    metadata: Option<FileMetaData>,
    predicate: Option<Arc<dyn PhysicalIoExpr>>,
    aggregate: Option<&[ScanAggregation]>,
    parallel: bool,
    mut parallel: bool,
    row_count: Option<RowCount>,
) -> Result<DataFrame> {
    let reader = ReaderBytes::from(&reader);
    let bytes = reader.deref();
    let mut reader = Cursor::new(bytes);

    let file_metadata = metadata
        .map(Ok)
        .unwrap_or_else(|| read::read_metadata(&mut reader))?;
@@ -52,10 +71,17 @@ pub fn read_parquet<R: MmapBytesReader>(
        .map(Cow::Borrowed)
        .unwrap_or_else(|| Cow::Owned((0usize..schema.fields.len()).collect::<Vec<_>>()));

    if projection.len() == 1 {
        parallel = false;
    }

    let mut dfs = Vec::with_capacity(row_group_len);

    let mut remaining_rows = limit;

    let reader = ReaderBytes::from(&reader);
    let bytes = reader.deref();

    let mut previous_row_count = 0;
    for rg in 0..row_group_len {
        let md = &file_metadata.row_groups[rg];
@@ -87,17 +113,20 @@
                projection
                    .par_iter()
                    .map(|column_i| {
                        let mut reader = Cursor::new(bytes);
                        let field = &schema.fields[*column_i];
                        let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
                        let iter = to_deserializer(
                        let columns = mmap_columns(bytes, md.columns(), &field.name);
                        let iter = mmap::to_deserializer(
                            columns,
                            field.clone(),
                            remaining_rows,
                            Some(chunk_size),
                        )?;

                        array_iter_to_series(iter, field)
                        if remaining_rows < md.num_rows() {
                            array_iter_to_series(iter, field, Some(remaining_rows))
                        } else {
                            array_iter_to_series(iter, field, None)
                        }
                    })
                    .collect::<Result<Vec<_>>>()
            })?
@@ -106,11 +135,19 @@
                .iter()
                .map(|column_i| {
                    let field = &schema.fields[*column_i];
                    let columns = read::read_columns(&mut reader, md.columns(), &field.name)?;
                    let iter =
                        to_deserializer(columns, field.clone(), remaining_rows, Some(chunk_size))?;

                    array_iter_to_series(iter, field)
                    let columns = mmap_columns(bytes, md.columns(), &field.name);
                    let iter = mmap::to_deserializer(
                        columns,
                        field.clone(),
                        remaining_rows,
                        Some(chunk_size),
                    )?;

                    if remaining_rows < md.num_rows() {
                        array_iter_to_series(iter, field, Some(remaining_rows))
                    } else {
                        array_iter_to_series(iter, field, None)
                    }
                })
                .collect::<Result<Vec<_>>>()?
        };
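
A smaller but related change visible above: when only one column is projected, parallel is set to false, since fanning a single decode task out to the thread pool only adds overhead. A rough sketch of that dispatch, assuming the rayon crate and a hypothetical decode_column standing in for the mmap-and-deserialize work done per column:

use rayon::prelude::*;

// Hypothetical stand-in for the per-column mmap + deserialize step.
fn decode_column(i: usize) -> Vec<i64> {
    vec![i as i64; 4]
}

fn decode_projection(projection: &[usize], mut parallel: bool) -> Vec<Vec<i64>> {
    // Parallelism over a single item buys nothing, so fall back to the serial path.
    if projection.len() == 1 {
        parallel = false;
    }
    if parallel {
        projection.par_iter().map(|&i| decode_column(i)).collect()
    } else {
        projection.iter().map(|&i| decode_column(i)).collect()
    }
}

fn main() {
    // One projected column takes the serial path; three take the parallel one.
    assert_eq!(decode_projection(&[0], true).len(), 1);
    assert_eq!(decode_projection(&[0, 1, 2], true).len(), 3);
}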
