parallel parquet reader
ritchie46 committed Dec 22, 2021
1 parent fc0ff9d commit af67445
Showing 7 changed files with 167 additions and 23 deletions.
13 changes: 13 additions & 0 deletions polars/polars-io/src/mmap.rs
@@ -55,3 +55,16 @@ impl std::ops::Deref for ReaderBytes<'_> {
}
}
}

impl<'a, T: 'a + MmapBytesReader> From<&'a T> for ReaderBytes<'a> {
fn from(m: &'a T) -> Self {
match m.to_bytes() {
Some(s) => ReaderBytes::Borrowed(s),
None => {
let f = m.to_file().unwrap();
let mmap = unsafe { memmap::Mmap::map(f).unwrap() };
ReaderBytes::Mapped(mmap)
}
}
}
}
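
The new `From` impl gives the reader a single path to a contiguous byte slice: readers that already hold their bytes in memory hand back a borrowed slice, while file-backed readers are memory-mapped. A minimal sketch of how the conversion might be used, assuming `std::fs::File` implements `MmapBytesReader` and that the module is reachable as `polars_io::mmap` (neither is shown in this diff; the file path is illustrative):

use polars_io::mmap::{MmapBytesReader, ReaderBytes};
use std::ops::Deref;

/// Borrow the raw bytes behind any `MmapBytesReader`, memory-mapping
/// the underlying file only when the bytes are not already in memory.
fn byte_len<T: MmapBytesReader>(reader: &T) -> usize {
    let bytes = ReaderBytes::from(reader);
    bytes.deref().len()
}

fn main() -> std::io::Result<()> {
    let file = std::fs::File::open("data.parquet")?; // hypothetical path
    println!("{} bytes", byte_len(&file));
    Ok(())
}
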
1 change: 1 addition & 0 deletions polars/polars-io/src/parquet/mod.rs
@@ -15,6 +15,7 @@
//! ```
//!
mod read;
mod read_par;
mod write;

use super::*;
67 changes: 48 additions & 19 deletions polars/polars-io/src/parquet/read.rs
@@ -1,4 +1,6 @@
use super::{finish_reader, ArrowReader, ArrowResult, RecordBatch};
use crate::mmap::MmapBytesReader;
use crate::parquet::read_par::parallel_read;
use crate::prelude::*;
use crate::{PhysicalIoExpr, ScanAggregation};
use arrow::io::parquet::read;
@@ -16,12 +18,10 @@ pub struct ParquetReader<R: Read + Seek> {
n_rows: Option<usize>,
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
parallel: bool,
}

impl<R> ParquetReader<R>
where
R: Read + Seek,
{
impl<R: MmapBytesReader> ParquetReader<R> {
#[cfg(feature = "lazy")]
// todo! hoist to lazy crate
pub fn finish_with_scan_ops(
@@ -30,17 +30,27 @@ where
aggregate: Option<&[ScanAggregation]>,
projection: Option<&[usize]>,
) -> Result<DataFrame> {
let rechunk = self.rechunk;

let reader = read::RecordReader::try_new(
&mut self.reader,
projection.map(|x| x.to_vec()),
self.n_rows,
None,
None,
)?;
if aggregate.is_none() {
self.finish()
} else {
let rechunk = self.rechunk;

let reader = read::RecordReader::try_new(
&mut self.reader,
projection.map(|x| x.to_vec()),
self.n_rows,
None,
None,
)?;

finish_reader(reader, rechunk, self.n_rows, predicate, aggregate)
}
}

finish_reader(reader, rechunk, self.n_rows, predicate, aggregate)
/// Read the parquet file in parallel (default). The single threaded reader consumes less memory.
pub fn read_parallel(mut self, parallel: bool) -> Self {
self.parallel = parallel;
self
}

/// Stop parsing when `n` rows are parsed. By setting this parameter the parquet file will be parsed
@@ -81,17 +91,15 @@ impl<R: Read + Seek> ArrowReader for read::RecordReader<R> {
}
}

impl<R> SerReader<R> for ParquetReader<R>
where
R: Read + Seek,
{
impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
fn new(reader: R) -> Self {
ParquetReader {
reader,
rechunk: false,
n_rows: None,
columns: None,
projection: None,
parallel: true,
}
}

@@ -114,18 +122,39 @@ where
self.projection = Some(prj);
}

if self.parallel {
let rechunk = self.rechunk;
return parallel_read(
self.reader,
self.n_rows.unwrap_or(usize::MAX),
self.projection.as_deref(),
&schema,
Some(metadata),
)
.map(|mut df| {
if rechunk {
df.rechunk();
};
df
});
}

let chunks = read_parquet(
&mut self.reader,
self.n_rows.unwrap_or(usize::MAX),
self.projection.as_deref(),
&schema,
Some(metadata),
)?;
let projection = self.projection.take();
let mut df = accumulate_dataframes_vertical(chunks.into_iter().map(|cols| {
DataFrame::new_no_checks(
cols.into_iter()
.enumerate()
.map(|(i, arr)| {
.map(|(mut i, arr)| {
if let Some(projection) = &projection {
i = projection[i]
}
Series::try_from((schema.field(i).name().as_str(), arr)).unwrap()
})
.collect(),
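
Taken together, the changes to `read.rs` make parallel reading the default: `finish` forwards to `parallel_read` unless `read_parallel(false)` was called, in which case the original single-threaded `read_parquet` path (lower peak memory) is used. A hedged usage sketch of the builder API as it appears in this diff, assuming `ParquetReader`, `SerReader`, `DataFrame`, and `Result` are all re-exported through `polars_io::prelude` and that the file path exists:

use polars_io::prelude::*;
use std::fs::File;

fn read_in_parallel(path: &str) -> Result<DataFrame> {
    let file = File::open(path).expect("parquet file should exist");

    // Parallel is the default; `read_parallel(false)` would select the
    // single-threaded reader, which consumes less memory.
    ParquetReader::new(file)
        .with_n_rows(Some(10_000)) // stop after 10k rows
        .read_parallel(true)
        .finish()
}
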
94 changes: 94 additions & 0 deletions polars/polars-io/src/parquet/read_par.rs
@@ -0,0 +1,94 @@
use crate::mmap::{MmapBytesReader, ReaderBytes};
use arrow::array::ArrayRef;
use arrow::io::parquet::read;
use arrow::io::parquet::read::{FileMetaData, MutStreamingIterator};
use polars_core::prelude::*;
use polars_core::utils::accumulate_dataframes_vertical;
use polars_core::POOL;
use rayon::prelude::*;
use std::borrow::Cow;
use std::convert::TryFrom;
use std::io::Cursor;
use std::ops::Deref;
use std::sync::Arc;

pub(crate) fn parallel_read<R: MmapBytesReader>(
reader: R,
limit: usize,
projection: Option<&[usize]>,
arrow_schema: &ArrowSchema,
metadata: Option<FileMetaData>,
) -> Result<DataFrame> {
let reader = ReaderBytes::from(&reader);
let bytes = reader.deref();
let mut reader = Cursor::new(bytes);

let file_metadata = metadata
.map(Ok)
.unwrap_or_else(|| read::read_metadata(&mut reader))?;

let parq_fields = if let Some(projection) = projection {
let parq_fields = file_metadata.schema().fields();
Cow::Owned(
projection
.iter()
.map(|i| parq_fields[*i].clone())
.collect::<Vec<_>>(),
)
} else {
Cow::Borrowed(file_metadata.schema().fields())
};

let n_groups = file_metadata.row_groups.len();
let mut dfs = Vec::with_capacity(n_groups);

for row_group in 0..n_groups {
let columns = POOL.install(|| {
parq_fields
.par_iter()
.enumerate()
.map(|(mut field_i, field)| {
if let Some(projection) = projection {
field_i = projection[field_i];
}

// <IO bounded>

// create a new reader
let reader = Cursor::new(bytes);

// get compressed column pages
let mut columns = read::get_column_iterator(
reader,
&file_metadata,
row_group,
field_i,
None,
Vec::with_capacity(64),
);

let mut column_chunks = Vec::with_capacity(64);
while let read::State::Some(mut new_iter) = columns.advance().unwrap() {
if let Some((pages, metadata)) = new_iter.get() {
let pages = pages.collect::<Vec<_>>();

column_chunks.push((pages, metadata.clone()));
}
columns = new_iter;
}

// <CPU bounded>
let columns = read::ReadColumnIterator::new(field.clone(), column_chunks);
let field = &arrow_schema.fields()[field_i];

let (arr, _b1, _b2_) =
read::column_iter_to_array(columns, field, Vec::with_capacity(64))?;
Series::try_from((field.name().as_str(), Arc::from(arr) as ArrayRef))
})
.collect::<Result<Vec<_>>>()
})?;
dfs.push(DataFrame::new_no_checks(columns))
}

accumulate_dataframes_vertical(dfs.into_iter()).map(|df| df.slice(0, limit))
}
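
The strategy in `read_par.rs`: row groups are processed one after another, but within each row group every projected column is decoded by its own rayon task on the global `POOL`. Each task first gathers the compressed pages for its column from the shared memory-mapped byte slice (the IO-bounded part) and then decompresses and decodes them into an arrow array (the CPU-bounded part). The sketch below mirrors that per-column fan-out in isolation; `load_pages` and `decode_column` are hypothetical stand-ins for `read::get_column_iterator` and `read::column_iter_to_array`, not real arrow2 calls:

use rayon::prelude::*;

// Hypothetical placeholders for the per-column IO-bounded page collection
// and the CPU-bounded decoding done inside `parallel_read`.
fn load_pages(bytes: &[u8], column: usize) -> Vec<u8> {
    let chunk = bytes.len() / 4;
    bytes[column * chunk..(column + 1) * chunk].to_vec()
}

fn decode_column(pages: Vec<u8>) -> Result<u64, String> {
    Ok(pages.iter().map(|b| *b as u64).sum())
}

/// Decode all columns of one row group in parallel, mirroring the
/// `par_iter` fan-out that `parallel_read` runs via `POOL.install`.
fn decode_row_group(bytes: &[u8]) -> Result<Vec<u64>, String> {
    (0..4usize)
        .into_par_iter()
        .map(|col| {
            let pages = load_pages(bytes, col); // IO-bounded
            decode_column(pages)                // CPU-bounded
        })
        .collect()
}

fn main() {
    let bytes = vec![1u8; 64];
    println!("{:?}", decode_row_group(&bytes));
}
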
5 changes: 4 additions & 1 deletion py-polars/polars/internals/frame.py
@@ -517,6 +517,7 @@ def read_parquet(
file: Union[str, BinaryIO],
columns: Optional[Union[List[int], List[str]]] = None,
n_rows: Optional[int] = None,
parallel: bool = True,
) -> "DataFrame":
"""
Read into a DataFrame from a parquet file.
@@ -529,6 +530,8 @@ def read_parquet(
Columns to select. Accepts a list of column indices (starting at zero) or a list of column names.
n_rows
Stop reading from parquet file after reading ``n_rows``.
parallel
Read the parquet file in parallel. The single threaded reader consumes less memory.
"""
projection: Optional[Sequence[int]] = None
if columns:
@@ -541,7 +544,7 @@
)

self = DataFrame.__new__(DataFrame)
self._df = PyDataFrame.read_parquet(file, columns, projection, n_rows)
self._df = PyDataFrame.read_parquet(file, columns, projection, n_rows, parallel)
return self

@staticmethod
7 changes: 4 additions & 3 deletions py-polars/polars/io.py
@@ -702,6 +702,7 @@ def read_parquet(
use_pyarrow: bool = _PYARROW_AVAILABLE,
memory_map: bool = True,
storage_options: Optional[Dict] = None,
parallel: bool = True,
**kwargs: Any,
) -> DataFrame:
"""
@@ -725,6 +726,8 @@
Only used when ``use_pyarrow=True``.
storage_options
Extra options that make sense for ``fsspec.open()`` or a particular storage connection, e.g. host, port, username, password, etc.
parallel
Read the parquet file in parallel. The single threaded reader consumes less memory.
**kwargs
kwargs for [pyarrow.parquet.read_table](https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html)
@@ -761,9 +764,7 @@
)

return DataFrame.read_parquet(
source_prep,
columns=columns,
n_rows=n_rows,
source_prep, columns=columns, n_rows=n_rows, parallel=parallel
)


3 changes: 3 additions & 0 deletions py-polars/src/dataframe.rs
@@ -178,6 +178,7 @@ impl PyDataFrame {
columns: Option<Vec<String>>,
projection: Option<Vec<usize>>,
n_rows: Option<usize>,
parallel: bool,
) -> PyResult<Self> {
use EitherRustPythonFile::*;

@@ -187,12 +188,14 @@
ParquetReader::new(buf)
.with_projection(projection)
.with_columns(columns)
.read_parallel(parallel)
.with_n_rows(n_rows)
.finish()
}
Rust(f) => ParquetReader::new(f)
.with_projection(projection)
.with_columns(columns)
.read_parallel(parallel)
.with_n_rows(n_rows)
.finish(),
};
