Skip to content

Commit

Permalink
add boundaries and window closed options
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2021
1 parent adf6ef3 commit 74220df
Show file tree
Hide file tree
Showing 10 changed files with 144 additions and 73 deletions.
2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ moment = []
diagonal_concat = []
abs = []

dynamic_groupby = ["polars-time"]
dynamic_groupby = ["polars-time", "dtype-datetime", "dtype-date"]

# opt-in datatypes for Series
dtype-date = ["temporal"]
Expand Down
63 changes: 59 additions & 4 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::frame::groupby::GroupTuples;
use crate::prelude::*;
use polars_time::groupby::ClosedWindow;
use polars_time::{Duration, Window};
use rayon::prelude::*;

#[derive(Clone, Debug)]
pub struct DynamicGroupOptions {
Expand All @@ -14,6 +14,9 @@ pub struct DynamicGroupOptions {
pub offset: Duration,
/// truncate the time column values to the window
pub truncate: bool,
// add the boundaries to the dataframe
pub include_boundaries: bool,
pub closed_window: ClosedWindow,
}

impl DataFrame {
Expand All @@ -32,25 +35,63 @@ impl DataFrame {
let dt = time.cast(&DataType::Datetime)?;
let dt = dt.datetime().unwrap();

let mut lower_bound = None;
let mut upper_bound = None;

let groups = if by.is_empty() {
dt.downcast_iter()
.map(|vals| {
let ts = vals.values().as_slice();
polars_time::groupby::groupby(w, ts)
let (groups, lower, upper) = polars_time::groupby::groupby(
w,
ts,
options.include_boundaries,
options.closed_window,
);
match (&mut lower_bound, &mut upper_bound) {
(None, None) => {
lower_bound = Some(lower);
upper_bound = Some(upper);
}
(Some(lower_bound), Some(upper_bound)) => {
lower_bound.extend_from_slice(&lower);
upper_bound.extend_from_slice(&upper);
}
_ => unreachable!(),
}
groups
})
.flatten()
.collect::<Vec<_>>()
} else {
let mut groups = self.groupby_with_series(by.clone(), true)?.groups;
groups.sort_unstable_by_key(|g| g.0);

groups
.par_iter()
.iter()
.map(|g| {
let offset = g.0;
let dt = unsafe { dt.take_unchecked((g.1.iter().map(|i| *i as usize)).into()) };
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let mut sub_groups = polars_time::groupby::groupby(w, ts);
let (mut sub_groups, lower, upper) = polars_time::groupby::groupby(
w,
ts,
options.include_boundaries,
options.closed_window,
);

match (&mut lower_bound, &mut upper_bound) {
(None, None) => {
lower_bound = Some(lower);
upper_bound = Some(upper);
}
(Some(lower_bound), Some(upper_bound)) => {
lower_bound.extend_from_slice(&lower);
upper_bound.extend_from_slice(&upper);
}
_ => unreachable!(),
}

sub_groups.iter_mut().for_each(|g| {
g.0 += offset;
Expand All @@ -74,6 +115,20 @@ impl DataFrame {
if options.truncate {
dt = dt.apply(|v| w.truncate(v));
}

if let (true, Some(lower), Some(higher)) =
(options.include_boundaries, lower_bound, upper_bound)
{
let s = Int64Chunked::new_vec("_lower_boundary", lower)
.into_date()
.into_series();
by.push(s);
let s = Int64Chunked::new_vec("_upper_boundary", higher)
.into_date()
.into_series();
by.push(s);
}

dt.into_date()
.into_series()
.cast(time_type)
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,4 @@ pub use crate::chunked_array::ops::unique::rank::{RankMethod, RankOptions};
pub use crate::chunked_array::ops::rolling_window::RollingOptions;

#[cfg(feature = "dynamic_groupby")]
pub use polars_time::Duration;
pub use polars_time::{groupby::ClosedWindow, Duration};
9 changes: 7 additions & 2 deletions polars/polars-time/src/bounds.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::calendar::timestamp_ns_to_datetime;
use crate::groupby::ClosedWindow;
use crate::unit::TimeNanoseconds;
use std::fmt::{Display, Formatter};

Expand Down Expand Up @@ -42,8 +43,12 @@ impl Bounds {
}

// check if nanoseconds is within bounds
pub fn is_member(&self, t: i64) -> bool {
t >= self.start && t <= self.stop
/// Reports whether the timestamp `t` (nanoseconds) falls inside this window,
/// honouring which side of the window is closed.
///
/// * `ClosedWindow::Left`  - `start` is inclusive, `stop` is exclusive: `[start, stop)`.
/// * `ClosedWindow::Right` - `start` is exclusive, `stop` is inclusive: `(start, stop]`.
/// * `ClosedWindow::None`  - both ends are inclusive: `[start, stop]`.
pub fn is_member(&self, t: i64, closed: ClosedWindow) -> bool {
    match closed {
        // NOTE(review): `None` keeps the two-sided inclusive check so the
        // default grouping behaviour is unchanged; confirm this is the
        // intended meaning of "none".
        ClosedWindow::None => t >= self.start && t <= self.stop,
        // A left-closed window owns its start and hands its stop to the next window.
        ClosedWindow::Left => t >= self.start && t < self.stop,
        // A right-closed window owns its stop and leaves its start to the previous window.
        ClosedWindow::Right => t > self.start && t <= self.stop,
    }
}

pub fn is_future(&self, t: i64) -> bool {
Expand Down
10 changes: 9 additions & 1 deletion polars/polars-time/src/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::calendar::{
};
use crate::unit::TimeNanoseconds;
use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use std::ops::{Add, Mul};
use std::ops::{Add, Mul, Sub};

#[derive(Copy, Clone, Debug)]
pub struct Duration {
Expand Down Expand Up @@ -291,6 +291,14 @@ impl Add<Duration> for TimeNanoseconds {
}
}

/// Subtraction of a [`Duration`] from a nanosecond timestamp.
///
/// Implemented in terms of the existing `Add<Duration>` impl: the duration is
/// multiplied by `-1` and the result is added to the timestamp.
impl Sub<Duration> for TimeNanoseconds {
    type Output = Self;

    fn sub(self, rhs: Duration) -> Self::Output {
        // Negate first, then reuse addition so both directions share one code path.
        let negated = rhs * -1;
        self + negated
    }
}

fn nsecs_timestamp_to_datetime(ts: i64) -> NaiveDateTime {
let secs = ts / 1_000_000_000;
let nsec = ts % 1_000_000_000;
Expand Down
86 changes: 28 additions & 58 deletions polars/polars-time/src/groupby.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
use crate::bounds::Bounds;
use crate::unit::TimeNanoseconds;
use crate::window::Window;

pub type GroupTuples = Vec<(u32, Vec<u32>)>;

pub fn groupby(window: Window, time: &[i64]) -> GroupTuples {
/// Selects which side of a time window is treated as closed when timestamps
/// are tested for window membership.
///
/// The exact comparisons applied for each variant live in `Bounds::is_member`.
#[derive(Clone, Copy, Debug)]
pub enum ClosedWindow {
/// The left (start) side of the window is the closed one.
Left,
/// The right (stop) side of the window is the closed one.
Right,
/// No single side is selected.
/// NOTE(review): `Bounds::is_member` treats this variant as inclusive on
/// both ends, not as an open interval - confirm that is intended.
None,
}

pub fn groupby(
window: Window,
time: &[i64],
include_boundaries: bool,
closed_window: ClosedWindow,
) -> (GroupTuples, Vec<TimeNanoseconds>, Vec<TimeNanoseconds>) {
let boundary = Bounds::from(time);
let size = if include_boundaries {
window.estimate_overlapping_bounds(boundary)
} else {
0
};
let mut lower_bound = Vec::with_capacity(size);
let mut upper_bound = Vec::with_capacity(size);

let mut group_tuples = Vec::with_capacity(window.estimate_overlapping_bounds(boundary));
let mut latest_start = 0;
Expand All @@ -20,7 +40,7 @@ pub fn groupby(window: Window, time: &[i64]) -> GroupTuples {
skip_window = true;
break;
}
if bi.is_member(t) {
if bi.is_member(t, closed_window) {
break;
}
latest_start += 1;
Expand All @@ -41,7 +61,7 @@ pub fn groupby(window: Window, time: &[i64]) -> GroupTuples {

while i < time.len() {
let t = time[i];
if bi.is_member(t) {
if bi.is_member(t, closed_window) {
group.push(i as u32);
} else if bi.is_future(t) {
break;
Expand All @@ -50,62 +70,12 @@ pub fn groupby(window: Window, time: &[i64]) -> GroupTuples {
}

if !group.is_empty() {
if include_boundaries {
lower_bound.push(bi.start);
upper_bound.push(bi.stop);
}
group_tuples.push((group[0], group))
}
}
group_tuples
}

#[cfg(test)]
mod test {
use super::*;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime};

#[test]
fn test_group_tuples() {
let dt = &[
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 0, 0),
),
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 0, 15),
),
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 0, 30),
),
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 0, 45),
),
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 1, 0),
),
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 1, 15),
),
NaiveDateTime::new(
NaiveDate::from_ymd(2001, 1, 1),
NaiveTime::from_hms(1, 1, 30),
),
];

let ts = dt.iter().map(|dt| dt.timestamp_nanos()).collect::<Vec<_>>();
let window = Window::new(
Duration::from_seconds(30),
Duration::from_seconds(30),
Duration::from_seconds(0),
);
let gt = groupby(window, &ts)
.into_iter()
.map(|g| g.1)
.collect::<Vec<_>>();

let expected = &[[0, 1, 2], [2, 3, 4], [4, 5, 6]];
assert_eq!(gt, expected);
}
(group_tuples, lower_bound, upper_bound)
}
4 changes: 2 additions & 2 deletions polars/polars-time/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::bounds::Bounds;
use crate::calendar::timestamp_ns_to_datetime;
use crate::duration::Duration;
use crate::groupby::groupby;
use crate::groupby::{groupby, ClosedWindow};
use crate::window::Window;
use chrono::prelude::*;

Expand Down Expand Up @@ -44,7 +44,7 @@ fn test_groups_large_interval() {

let dur = Duration::parse("2d");
let w = Window::new(Duration::parse("2d"), dur.clone(), Duration::from_nsecs(0));
let groups = groupby(w, &ts);
let (groups, _, _) = groupby(w, &ts, false, ClosedWindow::None);
assert_eq!(groups.len(), 3);
assert_eq!(groups[0], (0, vec![0]));
assert_eq!(groups[1], (1, vec![1]));
Expand Down
20 changes: 19 additions & 1 deletion py-polars/polars/internals/frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -2255,9 +2255,21 @@ def groupby_dynamic(
period: Optional[str] = None,
offset: Optional[str] = None,
truncate: bool = True,
include_boundaries: bool = True,
closed: str = "none",
by: Optional[Union[str, tp.List[str], "pli.Expr", tp.List["pli.Expr"]]] = None,
) -> "DynamicGroupBy":
return DynamicGroupBy(self, time_column, every, period, offset, truncate, by)
return DynamicGroupBy(
self,
time_column,
every,
period,
offset,
truncate,
include_boundaries,
closed,
by,
)

def upsample(self, by: str, interval: timedelta) -> "DataFrame":
"""
Expand Down Expand Up @@ -3612,6 +3624,8 @@ def __init__(
period: Optional[str],
offset: Optional[str],
truncate: bool = True,
include_boundaries: bool = True,
closed: str = "none",
by: Optional[Union[str, tp.List[str], "pli.Expr", tp.List["pli.Expr"]]] = None,
):
self.df = df
Expand All @@ -3620,6 +3634,8 @@ def __init__(
self.period = period
self.offset = offset
self.truncate = truncate
self.include_boundaries = include_boundaries
self.closed = closed
self.by = by

def agg(
Expand All @@ -3639,6 +3655,8 @@ def agg(
self.period,
self.offset,
self.truncate,
self.include_boundaries,
self.closed,
self.by,
)
.agg(column_to_agg) # type: ignore[arg-type]
Expand Down
4 changes: 3 additions & 1 deletion py-polars/polars/internals/lazy_frame.py
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,8 @@ def groupby_dynamic(
period: Optional[str] = None,
offset: Optional[str] = None,
truncate: bool = True,
include_boundaries: bool = True,
closed: str = "none",
by: Optional[Union[str, tp.List[str], "pli.Expr", tp.List["pli.Expr"]]] = None,
) -> "LazyGroupBy":
if period is None:
Expand All @@ -513,7 +515,7 @@ def groupby_dynamic(
offset = "0ns"
by = _prepare_groupby_inputs(by)
lgb = self._ldf.groupby_dynamic(
time_column, every, period, offset, truncate, by
time_column, every, period, offset, truncate, include_boundaries, closed, by
)
return LazyGroupBy(lgb)

Expand Down

0 comments on commit 74220df

Please sign in to comment.