Skip to content

Commit

Permalink
make dynamic_groupby correct when combined with a normal groupby
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 18, 2021
1 parent 6de63df commit 02e4e24
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 26 deletions.
124 changes: 113 additions & 11 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,12 @@ impl DataFrame {
if options.include_boundaries {
groups
.iter()
.map(|g| {
let offset = g.0;
let dt =
unsafe { dt.take_unchecked((g.1.iter().map(|i| *i as usize)).into()) };
.map(|base_g| {
let offset = base_g.0;
let dt = unsafe {
dt.take_unchecked((base_g.1.iter().map(|i| *i as usize)).into())
};

let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (mut sub_groups, lower, upper) = polars_time::groupby::groupby(
Expand All @@ -85,6 +87,12 @@ impl DataFrame {
options.include_boundaries,
options.closed_window,
);
let _lower = Int64Chunked::new_vec("lower", lower.clone())
.into_date()
.into_series();
let _higher = Int64Chunked::new_vec("upper", upper.clone())
.into_date()
.into_series();

match (&mut lower_bound, &mut upper_bound) {
(None, None) => {
Expand All @@ -99,9 +107,10 @@ impl DataFrame {
}

sub_groups.iter_mut().for_each(|g| {
g.0 += offset;
g.0 = offset;
for x in g.1.iter_mut() {
*x += offset
debug_assert!((*x as usize) < base_g.1.len());
unsafe { *x = *base_g.1.get_unchecked(*x as usize) }
}
});
sub_groups
Expand All @@ -112,10 +121,10 @@ impl DataFrame {
POOL.install(|| {
groups
.par_iter()
.map(|g| {
let offset = g.0;
.map(|base_g| {
let offset = base_g.0;
let dt = unsafe {
dt.take_unchecked((g.1.iter().map(|i| *i as usize)).into())
dt.take_unchecked((base_g.1.iter().map(|i| *i as usize)).into())
};
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
Expand All @@ -127,9 +136,10 @@ impl DataFrame {
);

sub_groups.iter_mut().for_each(|g| {
g.0 += offset;
g.0 = offset;
for x in g.1.iter_mut() {
*x += offset
debug_assert!((*x as usize) < base_g.1.len());
unsafe { *x = *base_g.1.get_unchecked(*x as usize) }
}
});
sub_groups
Expand Down Expand Up @@ -170,3 +180,95 @@ impl DataFrame {
.map(|s| (s, by, groups))
}
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::time::date_range;
    use polars_time::export::chrono::prelude::*;

    /// Nanosecond timestamp for 2021-12-16 at `hour`:00:00 — the fixed date
    /// every range in this test is built on.
    fn ts_hour(hour: u32) -> i64 {
        NaiveDate::from_ymd(2021, 12, 16)
            .and_hms(hour, 0, 0)
            .timestamp_nanos()
    }

    #[test]
    fn test_dynamic_groupby_window() {
        // Seven half-hourly timestamps from 00:00 to 03:00 (both ends included).
        let time = date_range(
            ts_hour(0),
            ts_hour(3),
            Duration::parse("30m"),
            ClosedWindow::Both,
            "date",
        )
        .into_series();

        let groups = Series::new("groups", ["a", "a", "a", "b", "b", "a", "a"]);
        let df = DataFrame::new(vec![time, groups.clone()]).unwrap();

        // Dynamic (1h window, 1h stride) groupby combined with a normal
        // groupby on the "groups" key.
        let (time_key, mut keys, groups) = df
            .groupby_dynamic(
                vec![groups],
                &DynamicGroupOptions {
                    time_column: "date".into(),
                    every: Duration::parse("1h"),
                    period: Duration::parse("1h"),
                    offset: Duration::parse("0h"),
                    truncate: true,
                    include_boundaries: true,
                    closed_window: ClosedWindow::Both,
                },
            )
            .unwrap();

        keys.push(time_key);
        let out = DataFrame::new(keys).unwrap();

        // Three windows fall in group "a", one in group "b".
        let key_groups = out
            .column("groups")
            .unwrap()
            .utf8()
            .unwrap()
            .into_no_null_iter()
            .collect::<Vec<_>>();
        assert_eq!(key_groups, &["a", "a", "a", "b"]);

        // Upper boundaries of the three "a" windows: 01:00, 02:00, 03:00.
        let upper = out.column("_upper_boundary").unwrap().slice(0, 3);
        let expected = date_range(
            ts_hour(1),
            ts_hour(3),
            Duration::parse("1h"),
            ClosedWindow::Both,
            "_upper_boundary",
        )
        .into_series();
        assert_eq!(&upper, &expected);

        // Lower boundaries of the three "a" windows: 00:00, 01:00, 02:00.
        let lower = out.column("_lower_boundary").unwrap().slice(0, 3);
        let expected = date_range(
            ts_hour(0),
            ts_hour(2),
            Duration::parse("1h"),
            ClosedWindow::Both,
            "_lower_boundary",
        )
        .into_series();
        assert_eq!(&lower, &expected);

        // Sub-group indices must refer back to the ORIGINAL frame rows
        // (the point of this commit), not to positions within the key group.
        let expected_groups = vec![
            (0u32, vec![0u32, 1, 2]),
            (0u32, vec![2]),
            (0u32, vec![5, 6]),
            (3u32, vec![3, 4]),
        ];
        assert_eq!(expected_groups, groups);
    }
}
92 changes: 77 additions & 15 deletions py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2265,25 +2265,27 @@ def groupby_dynamic(
be seen as a rolling window, with a window size determined by dates/times instead of slots in the DataFrame.
A window is defined by:
- every: interval of the window
- period: length of the window
- offset: offset of the window
The `every`, `period` and `offset` arguments are created with
the following string language:
1ns # 1 nanosecond
1us # 1 microsecond
1ms # 1 millisecond
1s # 1 second
1m # 1 minute
1h # 1 hour
1d # 1 day
1w # 1 week
1mo # 1 calendar month
1y # 1 calendar year
- 1ns (1 nanosecond)
- 1us (1 microsecond)
- 1ms (1 millisecond)
- 1s (1 second)
- 1m (1 minute)
- 1h (1 hour)
- 1d (1 day)
- 1w (1 week)
- 1mo (1 calendar month)
- 1y (1 calendar year)
3d12h4m25s # 3 days, 12 hours, 4 minutes, and 25 seconds
Or combine them:
"3d12h4m25s" # 3 days, 12 hours, 4 minutes, and 25 seconds
.. warning::
This API is experimental and may change without it being considered a breaking change.
Expand Down Expand Up @@ -2346,7 +2348,8 @@ def groupby_dynamic(
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌┤
│ 2021-12-16 03:00:00 ┆ 6 │
└─────────────────────┴─────┘
>>> # group by windows of 1 hour starting at 2021-12-16 00:00:00
Group by windows of 1 hour starting at 2021-12-16 00:00:00.
>>> (
... df.groupby_dynamic("time", every="1h").agg(
... [pl.col("time").min(), pl.col("time").max()]
Expand All @@ -2364,7 +2367,8 @@ def groupby_dynamic(
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2021-12-16 02:00:00 ┆ 2021-12-16 02:30:00 ┆ 2021-12-16 03:00:00 │
└─────────────────────┴─────────────────────┴─────────────────────┘
>>> # the window boundaries can also be added to the aggregation result
The window boundaries can also be added to the aggregation result
>>> (
... df.groupby_dynamic("time", every="1h", include_boundaries=True).agg(
... [pl.col("time").count()]
Expand All @@ -2382,7 +2386,8 @@ def groupby_dynamic(
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2021-12-16 02:00:00 ┆ 2021-12-16 03:00:00 ┆ 2021-12-16 02:00:00 ┆ 2 │
└─────────────────────┴─────────────────────┴─────────────────────┴────────────┘
>>> # when closed="left", should not include right end of interval [lower_bound, upper_bound)
When closed="left", should not include right end of interval [lower_bound, upper_bound)
>>> (
... df.groupby_dynamic("time", every="1h", closed="left").agg(
... [pl.col("time").count(), pl.col("time").list()]
Expand All @@ -2400,7 +2405,8 @@ def groupby_dynamic(
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
│ 2021-12-16 02:00:00 ┆ 2 ┆ [2021-12-16 02:00:00, 2021-12-16... │
└─────────────────────┴────────────┴─────────────────────────────────────┘
>>> # when closed="both" the time values at the window boundaries belong to 2 groups
When closed="both" the time values at the window boundaries belong to 2 groups.
>>> (
... df.groupby_dynamic("time", every="1h", closed="both").agg(
... [pl.col("time").count()]
Expand All @@ -2419,6 +2425,62 @@ def groupby_dynamic(
│ 2021-12-16 02:00:00 ┆ 3 │
└─────────────────────┴────────────┘
Dynamic groupbys can also be combined with grouping on normal keys
>>> df = pl.DataFrame(
... {
... "time": pl.date_range(
... low=datetime(2021, 12, 16),
... high=datetime(2021, 12, 16, 3),
... interval="30m",
... ),
... "groups": ["a", "a", "a", "b", "b", "a", "a"],
... }
... )
>>> df
shape: (7, 2)
┌─────────────────────┬────────┐
│ time ┆ groups │
│ --- ┆ --- │
│ datetime ┆ str │
╞═════════════════════╪════════╡
│ 2021-12-16 00:00:00 ┆ a │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2021-12-16 00:30:00 ┆ a │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2021-12-16 01:00:00 ┆ a │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2021-12-16 01:30:00 ┆ b │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2021-12-16 02:00:00 ┆ b │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2021-12-16 02:30:00 ┆ a │
├╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌┤
│ 2021-12-16 03:00:00 ┆ a │
└─────────────────────┴────────┘
>>> (
... df.groupby_dynamic(
... "time",
... every="1h",
... closed="both",
... by="groups",
... include_boundaries=True,
... ).agg([pl.col("time").count()])
... )
shape: (4, 5)
┌────────┬─────────────────────┬─────────────────────┬─────────────────────┬────────────┐
│ groups ┆ _lower_boundary ┆ _upper_boundary ┆ time ┆ time_count │
│ --- ┆ --- ┆ --- ┆ --- ┆ --- │
│ str ┆ datetime ┆ datetime ┆ datetime ┆ u32 │
╞════════╪═════════════════════╪═════════════════════╪═════════════════════╪════════════╡
│ a ┆ 2021-12-16 00:00:00 ┆ 2021-12-16 01:00:00 ┆ 2021-12-16 00:00:00 ┆ 3 │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ a ┆ 2021-12-16 01:00:00 ┆ 2021-12-16 02:00:00 ┆ 2021-12-16 00:00:00 ┆ 1 │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ a ┆ 2021-12-16 02:00:00 ┆ 2021-12-16 03:00:00 ┆ 2021-12-16 00:00:00 ┆ 2 │
├╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┤
│ b ┆ 2021-12-16 01:00:00 ┆ 2021-12-16 02:00:00 ┆ 2021-12-16 01:00:00 ┆ 2 │
└────────┴─────────────────────┴─────────────────────┴─────────────────────┴────────────┘
"""

return DynamicGroupBy(
Expand Down

0 comments on commit 02e4e24

Please sign in to comment.