Skip to content

Commit

Permalink
refactor[rust]: ensure we don't explode data in Expr::apply if groups…
Browse files Browse the repository at this point in the history
… overlap. This reduces memory pressure (#4672)
  • Loading branch information
ritchie46 committed Sep 1, 2022
1 parent 0b44280 commit 6b2f9bd
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 22 deletions.
44 changes: 24 additions & 20 deletions polars/polars-lazy/src/physical_plan/expressions/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,14 @@ impl PhysicalExpr for ApplyExpr {
if self.inputs.len() == 1 {
let mut ac = self.inputs[0].evaluate_on_groups(df, groups, state)?;

match self.collect_groups {
ApplyOptions::ApplyGroups => {
match (state.overlapping_groups(), self.collect_groups) {
(_, ApplyOptions::ApplyList) => {
let s = self.function.call_udf(&mut [ac.aggregated()])?;
ac.with_series(s, true);
Ok(ac)
}
// overlapping groups always take this branch as explode/flat_naive bloats data size
(_, ApplyOptions::ApplyGroups) | (true, _) => {
let s = ac.series();

// collection of empty list leads to a null dtype
Expand Down Expand Up @@ -152,7 +158,7 @@ impl PhysicalExpr for ApplyExpr {
ca.rename(&name);
Ok(self.finish_apply_groups(ac, ca))
}
ApplyOptions::ApplyFlat => {
(_, ApplyOptions::ApplyFlat) => {
// make sure the groups are updated because we are about to throw away
// the series' length information
let set_update_groups = match ac.update_groups {
Expand All @@ -167,6 +173,7 @@ impl PhysicalExpr for ApplyExpr {
if let UpdateGroups::WithSeriesLen = ac.update_groups {
ac.groups();
}

let input = ac.flat_naive().into_owned();
let input_len = input.len();
let s = self.function.call_udf(&mut [input])?;
Expand All @@ -182,17 +189,23 @@ impl PhysicalExpr for ApplyExpr {
}
Ok(ac)
}
ApplyOptions::ApplyList => {
let s = self.function.call_udf(&mut [ac.aggregated()])?;
ac.with_series(s, true);
Ok(ac)
}
}
} else {
let mut acs = self.prepare_multiple_inputs(df, groups, state)?;

match self.collect_groups {
ApplyOptions::ApplyGroups => {
match (state.overlapping_groups(), self.collect_groups) {
(_, ApplyOptions::ApplyList) => {
let mut s = acs.iter_mut().map(|ac| ac.aggregated()).collect::<Vec<_>>();
let s = self.function.call_udf(&mut s)?;
// take the first aggregation context, as that is the input series
let mut ac = acs.swap_remove(0);
ac.with_update_groups(UpdateGroups::WithGroupsLen);
ac.with_series(s, true);
Ok(ac)
}

// overlapping groups always take this branch as explode bloats data size
(_, ApplyOptions::ApplyGroups) | (true, _) => {
let mut container = vec![Default::default(); acs.len()];
let name = acs[0].series().name().to_string();

Expand Down Expand Up @@ -226,7 +239,7 @@ impl PhysicalExpr for ApplyExpr {
let ac = self.finish_apply_groups(ac, ca);
Ok(ac)
}
ApplyOptions::ApplyFlat => {
(_, ApplyOptions::ApplyFlat) => {
let mut s = acs
.iter_mut()
.map(|ac| {
Expand All @@ -249,15 +262,6 @@ impl PhysicalExpr for ApplyExpr {
ac.with_series(s, false);
Ok(ac)
}
ApplyOptions::ApplyList => {
let mut s = acs.iter_mut().map(|ac| ac.aggregated()).collect::<Vec<_>>();
let s = self.function.call_udf(&mut s)?;
// take the first aggregation context, as that is the input series
let mut ac = acs.swap_remove(0);
ac.with_update_groups(UpdateGroups::WithGroupsLen);
ac.with_series(s, true);
Ok(ac)
}
}
}
}
Expand Down
13 changes: 12 additions & 1 deletion polars/polars-lazy/src/physical_plan/expressions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,18 @@ impl<'a> AggregationContext<'a> {
pub(crate) fn flat_naive(&self) -> Cow<'_, Series> {
match &self.state {
AggState::NotAggregated(s) => Cow::Borrowed(s),
AggState::AggregatedList(s) => Cow::Owned(s.explode().unwrap()),
AggState::AggregatedList(s) => {
#[cfg(debug_assertions)]
{
// panic so we find cases where we accidentally explode overlapping groups;
// we don't want this, as it can create a lot of data
if let GroupsProxy::Slice { rolling: true, .. } = self.groups.as_ref() {
panic!("implementation error, polars should not hit this branch for overlapping groups")
}
}

Cow::Owned(s.explode().unwrap())
}
AggState::AggregatedFlat(s) => Cow::Borrowed(s),
AggState::Literal(s) => Cow::Borrowed(s),
}
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/physical_plan/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ bitflags! {
/// Indicates that the groups of a groupby operation may overlap.
/// If this is the case, an `explode` will yield more values than there are rows in the original `df`,
/// which breaks some assumptions.
const OVERLAPPING_GROUPS = 0x03;
const OVERLAPPING_GROUPS = 0x04;
}
}

Expand Down

0 comments on commit 6b2f9bd

Please sign in to comment.