Skip to content

Commit

Permalink
improve windows and allow combination with groupby on normal keys
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2021
1 parent 46feaf5 commit adf6ef3
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 180 deletions.
61 changes: 47 additions & 14 deletions polars/polars-core/src/frame/groupby/dynamic.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::frame::groupby::GroupTuples;
use crate::prelude::*;
use polars_time::{Duration, Window};
use rayon::prelude::*;

#[derive(Clone, Debug)]
pub struct DynamicGroupOptions {
Expand All @@ -16,34 +17,66 @@ pub struct DynamicGroupOptions {
}

impl DataFrame {
pub fn groupby_dynamic(&self, options: &DynamicGroupOptions) -> Result<(Series, GroupTuples)> {
/// Returns: time_keys, keys, grouptuples
pub fn groupby_dynamic(
&self,
mut by: Vec<Series>,
options: &DynamicGroupOptions,
) -> Result<(Series, Vec<Series>, GroupTuples)> {
let w = Window::new(options.every, options.period, options.offset);

let time = self.column(&options.time_column)?;
let time_type = time.dtype();
if time.null_count() > 0 {
panic!("null values in dynamic groupby not yet supported, fill nulls.")
}

let dt = time.cast(&DataType::Datetime)?;
let dt = dt.datetime().unwrap();

let gt = dt
.downcast_iter()
.map(|vals| {
let ts = vals.values().as_slice();
polars_time::groupby::groupby(w, ts)
})
.flatten()
.collect::<Vec<_>>();
let groups = if by.is_empty() {
dt.downcast_iter()
.map(|vals| {
let ts = vals.values().as_slice();
polars_time::groupby::groupby(w, ts)
})
.flatten()
.collect::<Vec<_>>()
} else {
let mut groups = self.groupby_with_series(by.clone(), true)?.groups;
groups.sort_unstable_by_key(|g| g.0);
groups
.par_iter()
.map(|g| {
let offset = g.0;
let dt = unsafe { dt.take_unchecked((g.1.iter().map(|i| *i as usize)).into()) };
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let mut sub_groups = polars_time::groupby::groupby(w, ts);

sub_groups.iter_mut().for_each(|g| {
g.0 += offset;
for x in g.1.iter_mut() {
*x += offset
}
});
sub_groups
})
.flatten()
.collect::<Vec<_>>()
};

// Safety:
// within bounds
let mut dt = unsafe { dt.take_unchecked(gt.iter().map(|g| g.0 as usize).into()) };
let mut dt = unsafe { dt.take_unchecked(groups.iter().map(|g| g.0 as usize).into()) };
for key in by.iter_mut() {
*key = unsafe { key.take_iter_unchecked(&mut groups.iter().map(|g| g.0 as usize)) };
}

if options.truncate {
dt = dt.apply(|v| w.truncate(v));
}

Ok((dt.into_date().into_series(), gt))
dt.into_date()
.into_series()
.cast(time_type)
.map(|s| (s, by, groups))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,13 @@ impl Executor for GroupByDynamicExec {
#[cfg(feature = "dynamic_groupby")]
{
let df = self.input.execute(state)?;
let keys = self
.keys
.iter()
.map(|e| e.evaluate(&df, state))
.collect::<Result<Vec<_>>>()?;

let (key, groups) = df.groupby_dynamic(&self.options)?;
let (time_key, keys, groups) = df.groupby_dynamic(keys, &self.options)?;

let agg_columns = POOL.install(|| {
self.aggs
Expand All @@ -40,8 +45,9 @@ impl Executor for GroupByDynamicExec {
.collect::<Result<Vec<_>>>()
})?;

let mut columns = Vec::with_capacity(agg_columns.len() + 1);
columns.push(key);
let mut columns = Vec::with_capacity(agg_columns.len() + 1 + keys.len());
columns.extend(keys);
columns.push(time_key);
columns.extend(agg_columns.into_iter().flatten());

DataFrame::new(columns)
Expand Down
6 changes: 5 additions & 1 deletion polars/polars-time/src/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl Bounds {

// check if nanoseconds is within bounds
pub fn is_member(&self, t: i64) -> bool {
t >= self.start && t < self.stop
t >= self.start && t <= self.stop
}

pub fn is_future(&self, t: i64) -> bool {
t > self.stop
}
}
44 changes: 41 additions & 3 deletions polars/polars-time/src/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ impl Duration {
}
}
}
if unit.is_empty() {
panic!("expected a unit in the duration string")
}

match &*unit {
"ns" => nsecs += n,
Expand All @@ -82,8 +85,8 @@ impl Duration {
}
}
Duration {
nsecs,
months,
nsecs: nsecs.abs(),
months: months.abs(),
negative,
}
}
Expand All @@ -96,6 +99,35 @@ impl Duration {
}
}

/// Normalize the duration within the interval.
/// It will ensure that the output duration is the smallest positive
/// duration that is the equivalent of the current duration.
#[allow(dead_code)]
pub(crate) fn normalize(&self, interval: &Duration) -> Self {
if self.months_only() && interval.months_only() {
let mut months = self.months() % interval.months();

match (self.negative, interval.negative) {
(true, true) | (true, false) => months = -months + interval.months(),
_ => {}
}
Duration::from_months(months)
} else {
let mut offset = self.duration();
if offset == 0 {
return *self;
}
let every = interval.duration();

if offset < 0 {
offset += every * ((offset / -every) + 1)
} else {
offset -= every * (offset / every)
}
Duration::from_nsecs(offset)
}
}

/// Creates a [`Duration`] that represents a fixed number of nanoseconds.
pub fn from_nsecs(v: i64) -> Self {
let (negative, nsecs) = Self::to_positive(v);
Expand Down Expand Up @@ -129,6 +161,10 @@ impl Duration {
self.months == 0 && self.nsecs == 0
}

fn months_only(&self) -> bool {
self.months != 0 && self.nsecs == 0
}

pub fn months(&self) -> i64 {
self.months
}
Expand All @@ -138,7 +174,7 @@ impl Duration {
}

/// Estimated duration of the window duration. Not a very good one if months != 0.
pub fn duration(&self) -> TimeNanoseconds {
pub const fn duration(&self) -> TimeNanoseconds {
self.months * 30 * 24 * 3600 * NS_SECOND + self.nsecs
}

Expand Down Expand Up @@ -292,5 +328,7 @@ mod test {
assert_eq!(out.nsecs, 40 * NS_MILLISECOND + 123);
let out = Duration::parse("123ns40ms1w");
assert_eq!(out.nsecs, 40 * NS_MILLISECOND + 123 + NS_WEEK);
let out = Duration::parse("-123ns40ms1w");
assert!(out.negative);
}
}
37 changes: 24 additions & 13 deletions polars/polars-time/src/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,43 @@ pub fn groupby(window: Window, time: &[i64]) -> GroupTuples {
for bi in window.get_overlapping_bounds_iter(boundary) {
let mut group = vec![];

let mut skip_window = false;
// find starting point of window
loop {
latest_start += 1;

match time.get(latest_start - 1) {
Some(ts) => {
if bi.is_member(*ts) {
break;
}
}
None => break,
while latest_start < time.len() {
let t = time[latest_start];
if bi.is_future(t) {
skip_window = true;
break;
}
if bi.is_member(t) {
break;
}
latest_start += 1;
}
if skip_window {
latest_start = latest_start.saturating_sub(1);
continue;
}

// subtract 1 because the next window could also start from the same point
latest_start = latest_start.saturating_sub(1);

// find members of this window
let mut i = latest_start;
loop {
group.push(i as u32);
if i >= time.len() || !bi.is_member(time[i]) {
if i >= time.len() {
break;
}

while i < time.len() {
let t = time[i];
if bi.is_member(t) {
group.push(i as u32);
} else if bi.is_future(t) {
break;
}
i += 1
}

if !group.is_empty() {
group_tuples.push((group[0], group))
}
Expand Down
61 changes: 34 additions & 27 deletions polars/polars-time/src/test.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::bounds::Bounds;
use crate::calendar::timestamp_ns_to_datetime;
use crate::duration::Duration;
use crate::groupby::groupby;
use crate::window::Window;
use chrono::prelude::*;

Expand Down Expand Up @@ -29,35 +30,41 @@ fn print_ns(ts: &[i64]) {
}

#[test]
fn test_window_boundaries() {
let range_ns = date_range(10);
fn test_groups_large_interval() {
let dates = &[
NaiveDate::from_ymd(2020, 1, 1),
NaiveDate::from_ymd(2020, 1, 11),
NaiveDate::from_ymd(2020, 1, 12),
NaiveDate::from_ymd(2020, 1, 13),
];
let ts = dates
.iter()
.map(|d| d.and_hms(0, 0, 0).timestamp_nanos())
.collect::<Vec<_>>();

let dur = Duration::parse("2d");
let w = Window::new(Duration::parse("2d"), dur.clone(), Duration::from_nsecs(0));
let groups = groupby(w, &ts);
assert_eq!(groups.len(), 3);
assert_eq!(groups[0], (0, vec![0]));
assert_eq!(groups[1], (1, vec![1]));
assert_eq!(groups[2], (1, vec![1, 2, 3]));
}

#[test]
fn test_offset() {
let t = NaiveDate::from_ymd(2020, 1, 2)
.and_hms(0, 0, 0)
.timestamp_nanos();
let w = Window::new(
Duration::from_minutes(20),
Duration::from_minutes(40),
Duration::from_seconds(0),
Duration::parse("5m"),
Duration::parse("5m"),
Duration::parse("-2m"),
);
// wrapping_boundary (
let boundary = Bounds::from(&range_ns);
let overlapping_bounds = w.get_overlapping_bounds(boundary);

let hm_start = overlapping_bounds
.iter()
.map(|b| {
let dt = timestamp_ns_to_datetime(b.start);
(dt.hour(), dt.minute())
})
.collect::<Vec<_>>();
let expected = &[(0, 0), (0, 20), (0, 40), (1, 0), (1, 20), (1, 40)];
assert_eq!(hm_start, expected);

let hm_stop = overlapping_bounds
.iter()
.map(|b| {
let dt = timestamp_ns_to_datetime(b.stop);
(dt.hour(), dt.minute())
})
.collect::<Vec<_>>();
let expected = &[(0, 40), (1, 0), (1, 20), (1, 40), (2, 0), (2, 20)];
assert_eq!(hm_stop, expected);
let b = w.get_earliest_bounds(t);
let start = NaiveDate::from_ymd(2020, 1, 1)
.and_hms(23, 58, 0)
.timestamp_nanos();
assert_eq!(b.start, start);
}
9 changes: 4 additions & 5 deletions polars/polars-time/src/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ impl Window {
}

pub fn truncate(&self, t: TimeNanoseconds) -> TimeNanoseconds {
self.every.truncate_nanoseconds(t)
self.every.truncate_nanoseconds(t) + self.offset
}

/// returns the bounds for the earliest window bounds
/// that contains the given time t. For underlapping windows that
/// do not contain time t, the window directly after time t will be returned.
pub fn get_earliest_bounds(&self, t: TimeNanoseconds) -> Bounds {
// translate offset
let t = t + (self.offset * -1);
//
let stop = self.truncate(t) + self.every + self.offset;
// original code translates offset here
// we don't. Seems unintuitive to me.
let stop = self.truncate(t) + self.every;
let start = stop + self.period * -1;

Bounds::new(start, stop)
Expand Down

0 comments on commit adf6ef3

Please sign in to comment.