Skip to content

Commit

Permalink
improve dynamic_groupby performance/memory usage (#2439)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jan 22, 2022
1 parent c1669ea commit 3e739cc
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 86 deletions.
106 changes: 73 additions & 33 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::frame::groupby::GroupsProxy;
use crate::prelude::*;
use crate::POOL;
use polars_time::groupby::ClosedWindow;
use polars_time::groupby::{ClosedWindow, GroupsIdx};
use polars_time::{Duration, Window};
use rayon::prelude::*;

Expand Down Expand Up @@ -86,6 +86,15 @@ impl DataFrame {
by: Vec<Series>,
options: &DynamicGroupOptions,
) -> Result<(Series, Vec<Series>, GroupsProxy)> {
if options.offset.parsed_int || options.every.parsed_int || options.period.parsed_int {
assert!(
(options.offset.parsed_int || options.offset.is_zero())
&& (options.every.parsed_int || options.every.is_zero())
&& (options.period.parsed_int || options.period.is_zero()),
"you cannot combine time durations like '2h' with integer durations like '3i'"
)
}

let time = self.column(&options.index_column)?;
let time_type = time.dtype();

Expand Down Expand Up @@ -167,8 +176,9 @@ impl DataFrame {
};

let groups = if by.is_empty() {
dt.downcast_iter()
.flat_map(|vals| {
let mut groups_slice = dt
.downcast_iter()
.map(|vals| {
let ts = vals.values().as_slice();
let (groups, lower, upper) = polars_time::groupby::groupby_windows(
w,
Expand All @@ -180,24 +190,32 @@ impl DataFrame {
update_bounds(lower, upper);
groups
})
.collect::<Vec<_>>()
.collect::<Vec<_>>();
// we don't flatmap because in case of a single chunk we don't need to reallocate the inner vec,
// just pop it.
if groups_slice.len() == 1 {
GroupsProxy::Slice(groups_slice.pop().unwrap())
} else {
GroupsProxy::Slice(groups_slice.into_iter().flatten().collect())
}
} else {
let mut groups = self.groupby_with_series(by.clone(), true)?.groups;
groups.sort();
let groups = groups.into_idx();

// include boundaries cannot be parallel (easily)
if options.include_boundaries {
groups
let groupsidx = groups
.iter()
// we just flat map, because iterate over groups so we almost always need to reallocate
.flat_map(|base_g| {
let dt = unsafe {
dt.take_unchecked((base_g.1.iter().map(|i| *i as usize)).into())
};

let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (mut sub_groups, lower, upper) = polars_time::groupby::groupby_windows(
let (sub_groups, lower, upper) = polars_time::groupby::groupby_windows(
w,
ts,
options.include_boundaries,
Expand All @@ -212,19 +230,12 @@ impl DataFrame {
.into_series();

update_bounds(lower, upper);

sub_groups.iter_mut().for_each(|g| {
g.0 = unsafe { *base_g.1.get_unchecked(g.0 as usize) };
for x in g.1.iter_mut() {
debug_assert!((*x as usize) < base_g.1.len());
unsafe { *x = *base_g.1.get_unchecked(*x as usize) }
}
});
sub_groups
update_subgroups(&sub_groups, base_g)
})
.collect::<Vec<_>>()
.collect::<Vec<_>>();
GroupsProxy::Idx(groupsidx)
} else {
POOL.install(|| {
let groupsidx = POOL.install(|| {
groups
.par_iter()
.flat_map(|base_g| {
Expand All @@ -233,33 +244,25 @@ impl DataFrame {
};
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (mut sub_groups, _, _) = polars_time::groupby::groupby_windows(
let (sub_groups, _, _) = polars_time::groupby::groupby_windows(
w,
ts,
options.include_boundaries,
options.closed_window,
tu.to_polars_time(),
);

sub_groups.iter_mut().for_each(|g| {
g.0 = unsafe { *base_g.1.get_unchecked(g.0 as usize) };
for x in g.1.iter_mut() {
debug_assert!((*x as usize) < base_g.1.len());
unsafe { *x = *base_g.1.get_unchecked(*x as usize) }
}
});
sub_groups
update_subgroups(&sub_groups, base_g)
})
.collect::<Vec<_>>()
})
});
GroupsProxy::Idx(groupsidx)
}
};

// Safety:
// within bounds
let mut dt = unsafe { dt.take_unchecked(groups.iter().map(|g| g.0 as usize).into()) };
let dt = dt.clone().into_series().agg_first(&groups);
let mut dt = dt.datetime().unwrap().as_ref().clone();
for key in by.iter_mut() {
*key = unsafe { key.take_iter_unchecked(&mut groups.iter().map(|g| g.0 as usize)) };
*key = key.agg_first(&groups)
}

if options.truncate {
Expand Down Expand Up @@ -287,7 +290,7 @@ impl DataFrame {
dt.into_datetime(tu, None)
.into_series()
.cast(time_type)
.map(|s| (s, by, GroupsProxy::Idx(groups)))
.map(|s| (s, by, groups))
}

fn impl_groupby_rolling(
Expand Down Expand Up @@ -318,6 +321,25 @@ impl DataFrame {
}
}

fn update_subgroups(sub_groups: &[[u32; 2]], base_g: &(u32, Vec<u32>)) -> GroupsIdx {
sub_groups
.iter()
.map(|&[first, len]| {
let new_first = unsafe { *base_g.1.get_unchecked(first as usize) };

let first = first as usize;
let len = len as usize;
let idx = (first..first + len)
.map(|i| {
debug_assert!(i < base_g.1.len());
unsafe { *base_g.1.get_unchecked(i) }
})
.collect_trusted::<Vec<_>>();
(new_first, idx)
})
.collect_trusted::<Vec<_>>()
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -450,4 +472,22 @@ mod test {
]);
assert_eq!(expected, groups);
}

#[test]
#[should_panic]
fn test_panic_integer_temporal_combine() {
let df = DataFrame::new_no_checks(vec![]);
let _ = df.groupby_dynamic(
vec![],
&DynamicGroupOptions {
index_column: "date".into(),
every: Duration::parse("1h"),
period: Duration::parse("1i"),
offset: Duration::parse("0h"),
truncate: true,
include_boundaries: true,
closed_window: ClosedWindow::Both,
},
);
}
}
11 changes: 10 additions & 1 deletion polars/polars-time/src/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ pub struct Duration {
nsecs: i64,
// indicates if the duration is negative
negative: bool,
// indicates if an integer string was passed. e.g. "2i"
pub parsed_int: bool,
}

impl Duration {
Expand Down Expand Up @@ -45,6 +47,7 @@ impl Duration {
}

let mut start = 0;
let mut parsed_int = false;

let mut unit = String::with_capacity(2);
while let Some((i, mut ch)) = iter.next() {
Expand Down Expand Up @@ -83,7 +86,10 @@ impl Duration {
"mo" => months += n,
"y" => months += n * 12,
// we will read indexes as nanoseconds
"i" => nsecs += n,
"i" => {
nsecs += n;
parsed_int = true;
}
unit => panic!("unit: '{}' not supported", unit),
}
unit.clear();
Expand All @@ -93,6 +99,7 @@ impl Duration {
nsecs: nsecs.abs(),
months: months.abs(),
negative,
parsed_int,
}
}

Expand Down Expand Up @@ -140,6 +147,7 @@ impl Duration {
months: 0,
nsecs,
negative,
parsed_int: false,
}
}

Expand All @@ -150,6 +158,7 @@ impl Duration {
months,
nsecs: 0,
negative,
parsed_int: false,
}
}

Expand Down
24 changes: 9 additions & 15 deletions polars/polars-time/src/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub fn groupby_windows(
include_boundaries: bool,
closed_window: ClosedWindow,
tu: TimeUnit,
) -> (GroupsIdx, Vec<i64>, Vec<i64>) {
) -> (GroupsSlice, Vec<i64>, Vec<i64>) {
let start = time[0];
let boundary = if time.len() > 1 {
// +1 because left or closed boundary could match the next window if it is on the boundary
Expand Down Expand Up @@ -68,8 +68,6 @@ pub fn groupby_windows(
let mut latest_start = 0;

for bi in window.get_overlapping_bounds_iter(boundary, tu) {
let mut group = vec![];

let mut skip_window = false;
// find starting point of window
while latest_start < time.len() {
Expand All @@ -88,32 +86,28 @@ pub fn groupby_windows(
continue;
}

// subtract 1 because the next window could also start from the same point
latest_start = latest_start.saturating_sub(1);

// find members of this window
let mut i = latest_start;
if i >= time.len() {
break;
}

let first = latest_start as u32;

while i < time.len() {
let t = time[i];
if bi.is_member(t, closed_window) {
group.push(i as u32);
} else if bi.is_future(t) {
if !bi.is_member(t, closed_window) {
break;
}
i += 1
}
let len = (i as u32) - first;

if !group.is_empty() {
if include_boundaries {
lower_bound.push(bi.start);
upper_bound.push(bi.stop);
}
groups.push((group[0], group))
if include_boundaries {
lower_bound.push(bi.start);
upper_bound.push(bi.stop);
}
groups.push([first, len])
}
(groups, lower_bound, upper_bound)
}
Expand Down

0 comments on commit 3e739cc

Please sign in to comment.