Skip to content

Commit

Permalink
reduce peak memory of reading parquet by row groups (#4006)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jul 13, 2022
1 parent 6a64d6f commit f794a07
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 3 deletions.
15 changes: 15 additions & 0 deletions polars/polars-core/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,13 +164,22 @@ impl DataFrame {
POOL.install(|| self.columns.par_iter().map(|s| func(s)).collect())
}

/// Applies a fallible `func` to every column, running the calls in parallel on `POOL`.
///
/// `func` is taken as a trait object rather than a generic parameter to reduce
/// monomorphization. Returns the produced columns, or an `Err` if any call fails.
fn try_apply_columns_par(
    &self,
    func: &(dyn Fn(&Series) -> Result<Series> + Send + Sync),
) -> Result<Vec<Series>> {
    let run_all = || {
        self.columns
            .par_iter()
            .map(|series| func(series))
            .collect::<Result<Vec<Series>>>()
    };
    POOL.install(run_all)
}

/// Applies a fallible `func` to every column, one after another on the calling thread.
///
/// `func` is taken as a trait object rather than a generic parameter to reduce
/// monomorphization. Stops at the first column for which `func` returns an `Err`
/// and propagates that error.
fn try_apply_columns(
    &self,
    func: &(dyn Fn(&Series) -> Result<Series> + Send + Sync),
) -> Result<Vec<Series>> {
    let mut applied = Vec::with_capacity(self.columns.len());
    for series in self.columns.iter() {
        applied.push(func(series)?);
    }
    Ok(applied)
}

/// Get the index of the column.
fn check_name_to_idx(&self, name: &str) -> Result<usize> {
self.find_idx_by_name(name)
Expand Down Expand Up @@ -1473,6 +1482,12 @@ impl DataFrame {
Ok(DataFrame::new_no_checks(new_col))
}

/// Sequential counterpart of `filter`: applies `mask` to the columns one by one
/// instead of parallelizing over them.
///
/// Returns a new `DataFrame` built from the filtered columns, or the first error
/// raised while filtering a column.
pub fn _filter_seq(&self, mask: &BooleanChunked) -> Result<Self> {
    self.try_apply_columns(&|column| column.filter(mask))
        .map(DataFrame::new_no_checks)
}

/// Take `DataFrame` value by indexes from an iterator.
///
/// # Example
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-io/src/parquet/read_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ fn rg_to_dfs(
df.with_row_count_mut(&rc.name, Some(previous_row_count + rc.offset));
}

apply_predicate(&mut df, predicate.as_deref())?;
apply_predicate(&mut df, predicate.as_deref(), true)?;
apply_aggregations(&mut df, aggregate)?;

previous_row_count += current_row_count;
Expand Down Expand Up @@ -201,7 +201,7 @@ fn rg_to_dfs_par(
df.with_row_count_mut(&rc.name, Some(row_count_start as IdxSize + rc.offset));
}

apply_predicate(&mut df, predicate.as_deref())?;
apply_predicate(&mut df, predicate.as_deref(), false)?;
apply_aggregations(&mut df, aggregate)?;

Ok(Some(df))
Expand Down
8 changes: 7 additions & 1 deletion polars/polars-io/src/predicates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,17 @@ pub(crate) fn arrow_schema_to_empty_df(schema: &ArrowSchema) -> DataFrame {
/// Filters `df` in place with the rows selected by `predicate`.
///
/// Nothing happens when `predicate` is `None` or when `df` is empty.
///
/// * `df` - frame to filter; it is replaced by the filtered frame.
/// * `predicate` - optional expression that is evaluated against `df` to obtain
///   the boolean mask.
/// * `parallel` - when `true` the mask is applied with `DataFrame::filter`
///   (parallel over columns); when `false` with `DataFrame::_filter_seq`, which
///   filters the columns sequentially.
///
/// # Errors
///
/// Propagates any error from evaluating the predicate or from filtering.
///
/// # Panics
///
/// Panics if the evaluated predicate is not of boolean type.
pub(crate) fn apply_predicate(
    df: &mut DataFrame,
    predicate: Option<&dyn PhysicalIoExpr>,
    parallel: bool,
) -> Result<()> {
    if let (Some(predicate), false) = (&predicate, df.is_empty()) {
        let s = predicate.evaluate(df)?;
        let mask = s.bool().expect("filter predicates was not of type boolean");

        // Filter exactly once. The mask was computed for the unfiltered frame, so
        // applying it a second time to an already filtered frame would be wrong,
        // and an unconditional parallel filter would defeat `parallel == false`.
        *df = if parallel {
            df.filter(mask)?
        } else {
            df._filter_seq(mask)?
        };
    }
    Ok(())
}

0 comments on commit f794a07

Please sign in to comment.