Skip to content

Commit

Permalink
add grouptuples to time-window
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Dec 17, 2021
1 parent 37c74b4 commit 9ac3d8e
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 6 deletions.
9 changes: 7 additions & 2 deletions polars/polars-time/src/bounds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ pub struct Bounds {
/// Human-readable rendering of a `Bounds` value.
impl Display for Bounds {
    /// Writes `Bounds: <start> -> <stop>`, where both ends are first converted
    /// with `timestamp_ns_to_datetime`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let start = timestamp_ns_to_datetime(*self.start);
        // Named `stop` so the local matches the field it is derived from.
        let stop = timestamp_ns_to_datetime(*self.stop);
        write!(f, "Bounds: {} -> {}", start, stop)
    }
}

Expand Down Expand Up @@ -40,4 +40,9 @@ impl Bounds {
/// Returns `true` when `start` and `stop` hold the same value.
pub fn is_empty(&self) -> bool {
    let first = *self.start;
    let last = *self.stop;
    last == first
}

/// Returns `true` when `t` lies in the half-open interval `[start, stop)`:
/// `start` itself is a member, `stop` is not.
pub fn is_member(&self, t: i64) -> bool {
    (*self.start..*self.stop).contains(&t)
}
}
73 changes: 73 additions & 0 deletions polars/polars-time/src/groupby.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use crate::bounds::Bounds;
use crate::calendar::timestamp_ns_to_datetime;
use crate::duration::Duration;
use crate::window::Window;

/// Groups the indices of `time` by the windows that overlap its range.
///
/// A boundary is built from `time` with `Bounds::from`, and one group is
/// produced for every bounds value yielded by
/// `window.get_overlapping_bounds_iter(boundary)`, in iteration order.
///
/// Each group holds the index of the first timestamp for which
/// `Bounds::is_member` is true, the indices of the members that directly
/// follow it, and — when one exists — the index right after the last member.
/// A window without any member yields an empty group.
///
/// Every returned index is a valid position in `time`.
///
/// The search only moves forward through `time`, so this assumes `time` is
/// sorted in ascending order — TODO confirm against callers.
pub fn groupby(window: Window, time: &[i64]) -> Vec<Vec<u32>> {
    let boundary = Bounds::from(time);

    let mut group_tuples = Vec::with_capacity(window.estimate_overlapping_bounds(boundary));
    // Index of the first member of the most recent non-empty window. It only
    // advances when a member is found, so it never exceeds `time.len()`.
    let mut latest_start = 0;

    for bi in window.get_overlapping_bounds_iter(boundary) {
        let mut group = vec![];

        // Look for the first member at or after the previous window's start.
        // When nothing matches, the cursor is left where it was so that later
        // windows can still find their timestamps.
        let first_member = time[latest_start..]
            .iter()
            .position(|&ts| bi.is_member(ts));

        if let Some(offset) = first_member {
            latest_start += offset;

            let members = time[latest_start..]
                .iter()
                .take_while(|&&ts| bi.is_member(ts))
                .count();

            // NOTE(review): the index following the last member is included as
            // well; the unit test in this file expects `[0, 1, 2]`, `[2, 3, 4]`,
            // `[4, 5, 6]`, so that is kept. It is clamped to `time.len()` so an
            // index past the end of the slice is never emitted.
            let end = (latest_start + members + 1).min(time.len());

            // Indices are stored as `u32` as required by the return type.
            group.extend((latest_start..end).map(|i| i as u32));
        }

        group_tuples.push(group);
    }
    group_tuples
}

#[cfg(test)]
mod test {
    use super::*;
    use chrono::{NaiveDate, NaiveDateTime, NaiveTime};

    /// Groups seven timestamps spaced 15 seconds apart with a window built
    /// from 30 second, 30 second and 0 second durations, and checks the
    /// resulting index groups.
    #[test]
    fn test_group_tuples() {
        // Nanosecond timestamp for 2001-01-01 01:<minute>:<second>.
        let at = |minute: u32, second: u32| {
            NaiveDateTime::new(
                NaiveDate::from_ymd(2001, 1, 1),
                NaiveTime::from_hms(1, minute, second),
            )
            .timestamp_nanos()
        };

        let ts = vec![
            at(0, 0),
            at(0, 15),
            at(0, 30),
            at(0, 45),
            at(1, 0),
            at(1, 15),
            at(1, 30),
        ];

        let window = Window::new(
            Duration::from_seconds(30),
            Duration::from_seconds(30),
            Duration::from_seconds(0),
        );
        let gt = groupby(window, &ts);

        let expected = &[[0, 1, 2], [2, 3, 4], [4, 5, 6]];
        assert_eq!(gt, expected);
    }
}
3 changes: 3 additions & 0 deletions polars/polars-time/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// Ported and adapted from influxdb.
// Credits to their work.
// https://github.com/influxdata/influxdb_iox/blob/main/query/src/func/window/internal.rs
// https://github.com/influxdata/flux/blob/3d6c47d9113fe0d919ddd3d4eef242dfc38ab2fb/interval/window.go
// https://github.com/influxdata/flux/blob/1e9bfd49f21c0e679b42acf6fc515ce05c6dec2b/values/time.go#L40
Expand All @@ -9,3 +11,4 @@ mod duration;
mod test;
mod unit;
mod window;
mod groupby;
16 changes: 12 additions & 4 deletions polars/polars-time/src/window.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use crate::bounds::Bounds;
use crate::calendar::timestamp_ns_to_datetime;
use crate::duration::Duration;
use crate::unit::TimeNanoseconds;

Expand Down Expand Up @@ -26,24 +27,30 @@ impl Window {
self.every.truncate_nanoseconds(*t).into()
}

/// Returns the bounds of the earliest window that contains the given time `t`.
/// For underlapping windows that do not contain time `t`, the window directly
/// after time `t` will be returned.
pub fn get_earliest_bounds(&self, t: TimeNanoseconds) -> Bounds {
    // Shift `t` back by the window offset before truncating.
    let shifted = t + (self.offset * -1);
    // Truncate, step one `every` forward and re-apply the offset.
    let window_stop = self.truncate(shifted) + self.every + self.offset;
    // The window starts one `period` before its stop.
    let negated_period = self.period * -1;
    let window_start = window_stop + negated_period;

    Bounds::new(window_start, window_stop)
}

/// Estimates how many window bounds overlap `boundary`: the boundary duration
/// divided by `every`, plus `period` divided by `every`. Used as a capacity
/// hint when collecting bounds.
pub(crate) fn estimate_overlapping_bounds(&self, boundary: Bounds) -> usize {
    let across_boundary = *boundary.duration() / *self.every.duration();
    let per_period = *self.period.duration() / *self.every.duration();
    (across_boundary + per_period) as usize
}

pub fn get_overlapping_bounds(&self, boundary: Bounds) -> Vec<Bounds> {
if boundary.is_empty() {
return vec![];
} else {
// estimate size
let size = (*boundary.duration() / *self.every.duration()
+ *self.period.duration() / *self.every.duration()) as usize;
let size = self.estimate_overlapping_bounds(boundary);
let mut out_bounds = Vec::with_capacity(size);

for bi in self.get_overlapping_bounds_iter(boundary) {
Expand Down Expand Up @@ -81,9 +88,10 @@ impl Iterator for BoundsIter {

fn next(&mut self) -> Option<Self::Item> {
if *self.bi.start < *self.boundary.stop {
let out = self.bi;
self.bi.start = self.bi.start + self.window.every;
self.bi.stop = self.bi.stop + self.window.every;
Some(self.bi)
Some(out)
} else {
None
}
Expand Down

0 comments on commit 9ac3d8e

Please sign in to comment.