parquet: low memory arg (#4050)
ritchie46 committed Jul 17, 2022
1 parent 35304d5 commit 8406450
Showing 15 changed files with 85 additions and 33 deletions.
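In short: the commit threads a new `low_memory` flag from the Python `read_parquet`/`scan_parquet` entry points through `ScanArgsParquet`, `ParquetOptions`, and the logical plan builder down to the `polars-io` reader. When the flag is set, the final limit slice is taken with the new `DataFrame::_slice_and_realloc`, which shrinks each column's buffers to fit instead of using the zero-copy parallel `slice_par`, trading some speed for a smaller resident footprint.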
25 changes: 18 additions & 7 deletions polars/polars-core/src/frame/mod.rs
@@ -159,6 +159,11 @@ impl DataFrame {
         self.columns.iter().map(|s| s.estimated_size()).sum()
     }
 
+    // reduce monomorphization
+    fn apply_columns(&self, func: &(dyn Fn(&Series) -> Series)) -> Vec<Series> {
+        self.columns.iter().map(|s| func(s)).collect()
+    }
+
     // reduce monomorphization
     fn apply_columns_par(&self, func: &(dyn Fn(&Series) -> Series + Send + Sync)) -> Vec<Series> {
         POOL.install(|| self.columns.par_iter().map(|s| func(s)).collect())
@@ -2186,13 +2191,19 @@ impl DataFrame {
         if offset == 0 && length == self.height() {
             return self.clone();
         }
-        let col = POOL.install(|| {
-            self.columns
-                .par_iter()
-                .map(|s| s.slice(offset, length))
-                .collect::<Vec<_>>()
-        });
-        DataFrame::new_no_checks(col)
+        DataFrame::new_no_checks(self.apply_columns_par(&|s| s.slice(offset, length)))
     }
 
+    #[must_use]
+    pub fn _slice_and_realloc(&self, offset: i64, length: usize) -> Self {
+        if offset == 0 && length == self.height() {
+            return self.clone();
+        }
+        DataFrame::new_no_checks(self.apply_columns(&|s| {
+            let mut out = s.slice(offset, length);
+            out.shrink_to_fit();
+            out
+        }))
+    }

/// Get the head of the `DataFrame`.
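For context on the new `_slice_and_realloc`: a plain `slice` is a zero-copy view, so a small slice can pin a large parent allocation, while `shrink_to_fit` copies the sliced data into right-sized buffers. A minimal sketch of the same pattern against the public API as it stands in this commit (the helper name and the row count are illustrative):

```rust
use polars::prelude::*;

// Illustrative only: mirrors what the private `_slice_and_realloc` does above.
fn head_and_free(df: &DataFrame) -> Result<DataFrame> {
    let columns = df
        .get_columns()
        .iter()
        .map(|s| {
            let mut s = s.slice(0, 10); // zero-copy view of the first 10 rows
            s.shrink_to_fit();          // reallocate into right-sized buffers
            s
        })
        .collect::<Vec<_>>();
    // the large parent buffers can now be dropped with the original frame
    DataFrame::new(columns)
}
```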
6 changes: 6 additions & 0 deletions polars/polars-core/src/series/implementations/struct_.rs
@@ -315,6 +315,12 @@ impl SeriesTrait for SeriesWrap<StructChunked> {
         is_not_null.reduce(|lhs, rhs| lhs.bitand(rhs)).unwrap()
     }
 
+    fn shrink_to_fit(&mut self) {
+        self.0.fields_mut().iter_mut().for_each(|s| {
+            s.shrink_to_fit();
+        });
+    }
+
     fn reverse(&self) -> Series {
         self.0.apply_fields(|s| s.reverse()).into_series()
     }
11 changes: 11 additions & 0 deletions polars/polars-io/src/parquet/read.rs
@@ -41,6 +41,7 @@ pub struct ParquetReader<R: Read + Seek> {
     projection: Option<Vec<usize>>,
     parallel: ParallelStrategy,
     row_count: Option<RowCount>,
+    low_memory: bool,
 }

impl<R: MmapBytesReader> ParquetReader<R> {
@@ -67,6 +68,7 @@ impl<R: MmapBytesReader> ParquetReader<R> {
             aggregate,
             self.parallel,
             self.row_count,
+            self.low_memory,
         )
         .map(|mut df| {
             if rechunk {
@@ -76,6 +78,13 @@ impl<R: MmapBytesReader> ParquetReader<R> {
         })
     }
 
+    /// Try to reduce memory pressure at the expense of performance. If setting this does not reduce
+    /// memory usage enough, turn off parallelization.
+    pub fn set_low_memory(mut self, low_memory: bool) -> Self {
+        self.low_memory = low_memory;
+        self
+    }
+
     /// Read the parquet file in parallel (default). The single-threaded reader consumes less memory.
     pub fn read_parallel(mut self, parallel: ParallelStrategy) -> Self {
         self.parallel = parallel;
@@ -127,6 +136,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
             projection: None,
             parallel: Default::default(),
             row_count: None,
+            low_memory: false,
         }
     }

@@ -153,6 +163,7 @@
             None,
             self.parallel,
             self.row_count,
+            self.low_memory,
         )
         .map(|mut df| {
             if self.rechunk {
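A hedged sketch of how the new toggle combines with the existing builder methods on the eager reader (the file path is illustrative; per the doc comment above, disabling parallelism is the next lever if `low_memory` alone is not enough):

```rust
use std::fs::File;

use polars_core::prelude::*;
use polars_io::parquet::{ParallelStrategy, ParquetReader};
use polars_io::SerReader;

fn read_frugally() -> Result<DataFrame> {
    let file = File::open("big.parquet").expect("illustrative path");
    ParquetReader::new(file)
        .set_low_memory(true)                  // new in this commit: realloc the final slice
        .read_parallel(ParallelStrategy::None) // optional: the single-threaded reader uses less memory
        .finish()
}
```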
7 changes: 6 additions & 1 deletion polars/polars-io/src/parquet/read_impl.rs
@@ -226,6 +226,7 @@ pub fn read_parquet<R: MmapBytesReader>(
     aggregate: Option<&[ScanAggregation]>,
     mut parallel: ParallelStrategy,
     row_count: Option<RowCount>,
+    low_memory: bool,
 ) -> Result<DataFrame> {
     let file_metadata = metadata
         .map(Ok)
@@ -287,6 +288,10 @@
     } else {
         let mut df = accumulate_dataframes_vertical(dfs.into_iter())?;
         apply_aggregations(&mut df, aggregate)?;
-        Ok(df.slice_par(0, limit))
+        Ok(if low_memory {
+            df._slice_and_realloc(0, limit)
+        } else {
+            df.slice_par(0, limit)
+        })
     }
 }
16 changes: 12 additions & 4 deletions polars/polars-lazy/src/frame/parquet.rs
@@ -10,6 +10,7 @@ pub struct ScanArgsParquet {
     pub parallel: ParallelStrategy,
     pub rechunk: bool,
     pub row_count: Option<RowCount>,
+    pub low_memory: bool,
 }

impl Default for ScanArgsParquet {
@@ -20,6 +21,7 @@ impl Default for ScanArgsParquet {
             parallel: Default::default(),
             rechunk: true,
             row_count: None,
+            low_memory: false,
         }
     }
 }
@@ -32,11 +34,13 @@ impl LazyFrame {
         parallel: ParallelStrategy,
         row_count: Option<RowCount>,
         rechunk: bool,
+        low_memory: bool,
     ) -> Result<Self> {
-        let mut lf: LazyFrame =
-            LogicalPlanBuilder::scan_parquet(path, n_rows, cache, parallel, row_count, rechunk)?
-                .build()
-                .into();
+        let mut lf: LazyFrame = LogicalPlanBuilder::scan_parquet(
+            path, n_rows, cache, parallel, row_count, rechunk, low_memory,
+        )?
+        .build()
+        .into();
         lf.opt_state.file_caching = true;
         Ok(lf)
     }
@@ -55,6 +59,7 @@ impl LazyFrame {

     /// Create a LazyFrame directly from a parquet scan.
     #[cfg_attr(docsrs, doc(cfg(feature = "parquet")))]
+    #[deprecated(note = "please use `concat_lf` instead")]
     pub fn scan_parquet_files(paths: Vec<String>, args: ScanArgsParquet) -> Result<Self> {
         let lfs = paths
             .iter()
@@ -66,6 +71,7 @@
                     args.parallel,
                     None,
                     args.rechunk,
+                    args.low_memory,
                 )
             })
             .collect::<Result<Vec<_>>>()?;
@@ -90,6 +96,7 @@
                     ParallelStrategy::None,
                     None,
                     args.rechunk,
+                    args.low_memory,
                 )
             })
             .collect::<Result<Vec<_>>>()?;
@@ -103,6 +110,7 @@
             args.parallel,
             args.row_count,
             args.rechunk,
+            args.low_memory,
         )
     }
 }
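And the lazy entry point, leaning on the `Default` impl extended above (path illustrative, and assuming the usual `polars_lazy::prelude` re-exports):

```rust
use polars_core::prelude::*;
use polars_lazy::prelude::*;

fn scan_frugally() -> Result<DataFrame> {
    let args = ScanArgsParquet {
        low_memory: true,    // the new field
        ..Default::default() // n_rows, cache, parallel, rechunk, row_count keep their defaults
    };
    LazyFrame::scan_parquet("big.parquet".to_string(), args)?.collect()
}
```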
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/builder.rs
@@ -91,6 +91,7 @@ impl LogicalPlanBuilder {
         parallel: polars_io::parquet::ParallelStrategy,
         row_count: Option<RowCount>,
         rechunk: bool,
+        low_memory: bool,
     ) -> Result<Self> {
         use polars_io::SerReader as _;
 
@@ -111,6 +112,7 @@
                 row_count,
                 rechunk,
                 file_counter: Default::default(),
+                low_memory,
             },
         }
         .into())
1 change: 1 addition & 0 deletions polars/polars-lazy/src/logical_plan/options.rs
@@ -38,6 +38,7 @@ pub struct ParquetOptions {
     pub(crate) rechunk: bool,
     pub(crate) row_count: Option<RowCount>,
     pub(crate) file_counter: FileCount,
+    pub(crate) low_memory: bool,
 }

#[derive(Clone, Debug)]
1 change: 1 addition & 0 deletions
@@ -41,6 +41,7 @@ impl ParquetExec {
             .read_parallel(self.options.parallel)
             .with_row_count(std::mem::take(&mut self.options.row_count))
             .set_rechunk(self.options.rechunk)
+            .set_low_memory(self.options.low_memory)
             ._finish_with_scan_ops(
                 predicate,
                 aggregate,
3 changes: 1 addition & 2 deletions polars/polars-lazy/src/tests/io.rs
@@ -154,8 +154,7 @@ fn test_parquet_globbing() -> Result<()> {
             n_rows: None,
             cache: true,
             parallel: Default::default(),
-            rechunk: false,
-            row_count: None,
+            ..Default::default()
         },
     )?
     .collect()?;
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/tests/mod.rs
@@ -101,7 +101,7 @@ fn scan_foods_parquet(parallel: bool) -> LazyFrame {
         cache: false,
         parallel,
         rechunk: true,
-        row_count: None,
+        ..Default::default()
     };
     LazyFrame::scan_parquet(out_path, args).unwrap()
 }
17 changes: 4 additions & 13 deletions py-polars/polars/internals/frame.py
@@ -649,21 +649,10 @@ def _read_parquet(
         parallel: str = "auto",
         row_count_name: str | None = None,
         row_count_offset: int = 0,
+        low_memory: bool = False,
     ) -> DF:
         """
         Read into a DataFrame from a parquet file.
 
-        Parameters
-        ----------
-        file
-            Path to a file or a file-like object. Any valid filepath can be used.
-        columns
-            Columns to select. Accepts a list of column indices (starting at zero) or a list of column names.
-        n_rows
-            Stop reading from parquet file after reading ``n_rows``.
-        parallel
-            Any of { 'auto', 'columns', 'row_groups', 'none' }
-            This determines the direction of parallelism. 'auto' will try to determine the optimal direction.
+        See Also: `pl.read_csv`
         """
         if isinstance(file, (str, Path)):
             file = format_path(file)
@@ -678,6 +667,7 @@ def _read_parquet(
             parallel=parallel,
             row_count_name=row_count_name,
             row_count_offset=row_count_offset,
+            low_memory=low_memory,
         )
 
         if columns is None:
@@ -698,6 +688,7 @@
             n_rows,
             parallel,
             _prepare_row_count_args(row_count_name, row_count_offset),
+            low_memory=low_memory,
         )
         return self
 
15 changes: 10 additions & 5 deletions py-polars/polars/internals/lazy_frame.py
@@ -180,6 +180,7 @@ def scan_parquet(
         row_count_name: str | None = None,
         row_count_offset: int = 0,
         storage_options: dict | None = None,
+        low_memory: bool = False,
     ) -> LDF:
         """
         See Also
@@ -204,6 +205,7 @@
             parallel,
             rechunk,
             _prepare_row_count_args(row_count_name, row_count_offset),
+            low_memory,
         )
         return self
 
@@ -1804,8 +1806,10 @@ def limit(self: LDF, n: int = 5) -> LDF:
"""
Limit the LazyFrame to the first `n` rows.
Note if you don't want the rows to be scanned, use the :func:`fetch` operation
instead.
.. note::
Consider using the :func:`fetch` operation when you only want to test your query.
The :func:`fetch` operation will load the first `n` rows at the scan level, whereas
the :func:`head`/:func:`limit` are applied at the end.
Parameters
----------
@@ -1818,9 +1822,10 @@ def head(self: LDF, n: int = 5) -> LDF:
"""
Gets the first `n` rows of the DataFrame.
You probably don't want to use this!
Consider using the :func:`fetch` operation instead. The :func:`fetch` operation will truly
load the first `n` rows lazily.
.. note::
Consider using the :func:`fetch` operation when you only want to test your query.
The :func:`fetch` operation will load the first `n` rows at the scan level, whereas
the :func:`head`/:func:`limit` are applied at the end.
This operation instead loads all the rows and only applies the ``head`` at the end.
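The distinction the reworded notes draw, in a small sketch against the Rust counterparts of these methods (`fetch` caps the rows at the scan itself, `limit`/`head` cap the result after the plan has run; path illustrative):

```rust
use polars_core::prelude::*;
use polars_lazy::prelude::*;

fn preview(args: ScanArgsParquet) -> Result<()> {
    let lf = LazyFrame::scan_parquet("big.parquet".to_string(), args)?;

    // Scan-level cap: only ~5 rows are ever read, so downstream filters may
    // leave fewer than 5 rows in the output. A cheap way to test a query.
    let _quick = lf.clone().fetch(5)?;

    // Plan-level cap: the scan reads everything the query needs; only the
    // final result is truncated to 5 rows.
    let _exact = lf.limit(5).collect()?;

    Ok(())
}
```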
8 changes: 8 additions & 0 deletions py-polars/polars/io.py
@@ -619,6 +619,7 @@ def scan_parquet(
     row_count_name: str | None = None,
     row_count_offset: int = 0,
     storage_options: dict | None = None,
+    low_memory: bool = False,
     **kwargs: Any,
 ) -> LazyFrame:
     """
@@ -648,6 +649,8 @@
         Extra options that make sense for ``fsspec.open()`` or a
         particular storage connection.
         e.g. host, port, username, password, etc.
+    low_memory: bool
+        Reduce memory pressure at the expense of performance.
     """
 
     # Map legacy arguments to current ones and remove them from kwargs.
@@ -665,6 +668,7 @@
         row_count_name=row_count_name,
         row_count_offset=row_count_offset,
         storage_options=storage_options,
+        low_memory=low_memory,
     )
 
 
@@ -787,6 +791,7 @@ def read_parquet(
     parallel: str = "auto",
     row_count_name: str | None = None,
     row_count_offset: int = 0,
+    low_memory: bool = False,
     **kwargs: Any,
 ) -> DataFrame:
     """
@@ -817,6 +822,8 @@
         If not None, this will insert a row count column with the given name into the DataFrame.
     row_count_offset
         Offset to start the row_count column (only use if the name is set).
+    low_memory: bool
+        Reduce memory pressure at the expense of performance.
     **kwargs
         kwargs for `pyarrow.parquet.read_table <https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html>`_.
@@ -859,6 +866,7 @@
         parallel=parallel,
         row_count_name=row_count_name,
         row_count_offset=row_count_offset,
+        low_memory=low_memory,
     )
 
 
2 changes: 2 additions & 0 deletions py-polars/src/dataframe.rs
@@ -210,6 +210,7 @@ impl PyDataFrame {
         n_rows: Option<usize>,
         parallel: Wrap<ParallelStrategy>,
         row_count: Option<(String, IdxSize)>,
+        low_memory: bool,
     ) -> PyResult<Self> {
         use EitherRustPythonFile::*;
 
@@ -223,6 +224,7 @@
                 .read_parallel(parallel.0)
                 .with_n_rows(n_rows)
                 .with_row_count(row_count)
+                .set_low_memory(low_memory)
                 .finish()
             }
             Rust(f) => ParquetReader::new(f.into_inner())
2 changes: 2 additions & 0 deletions py-polars/src/lazy/dataframe.rs
@@ -227,6 +227,7 @@ impl PyLazyFrame {
         parallel: Wrap<ParallelStrategy>,
         rechunk: bool,
         row_count: Option<(String, IdxSize)>,
+        low_memory: bool,
     ) -> PyResult<Self> {
         let row_count = row_count.map(|(name, offset)| RowCount { name, offset });
         let args = ScanArgsParquet {
@@ -235,6 +236,7 @@
             parallel: parallel.0,
             rechunk,
             row_count,
+            low_memory,
         };
         let lf = LazyFrame::scan_parquet(path, args).map_err(PyPolarsErr::from)?;
         Ok(lf.into())
