Skip to content

Commit

Permalink
fix lower bound of dynamic_groupby of larger intervals (#2711)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 20, 2022
1 parent f741284 commit 71c256a
Show file tree
Hide file tree
Showing 13 changed files with 151 additions and 44 deletions.
2 changes: 1 addition & 1 deletion polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ test = [
]

# don't use this
private = ["polars-lazy/private"]
private = ["polars-lazy/private", "polars-core/private", "polars-time/private"]

# all opt-in datatypes
dtype-full = [
Expand Down
2 changes: 1 addition & 1 deletion polars/polars-core/src/export.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
pub use arrow;
#[cfg(all(feature = "private", feature = "temporal"))]
#[cfg(feature = "temporal")]
pub use chrono;

#[cfg(feature = "private")]
Expand Down
9 changes: 9 additions & 0 deletions polars/polars-core/src/named_from.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,15 @@ impl<T: PolarsNumericType> ChunkedArray<T> {
}
}

/// Blanket constructor: anything that implements `IntoSeries`
/// (for example a [`ChunkedArray`] or a [`Series`]) can be turned into a named [`Series`].
impl<T: IntoSeries> NamedFrom<T, T> for Series {
    fn new(name: &str, t: T) -> Self {
        // Convert first, then overwrite whatever name the value carried with `name`.
        let mut named = t.into_series();
        named.rename(name);
        named
    }
}

#[cfg(test)]
mod test {
use super::*;
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-time/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ polars-core = { version = "0.19.0", path = "../polars-core", features = ["tempor
[features]
dtype-date = []
dtype-datetime = []
private = []

default = ["private"]
24 changes: 23 additions & 1 deletion polars/polars-time/src/date_range.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ pub fn in_nanoseconds_window(ndt: &NaiveDateTime) -> bool {
!(ndt.year() > 2554 || ndt.year() < 1386)
}

pub fn date_range(
#[cfg(feature = "private")]
#[doc(hidden)]
pub fn date_range_impl(
name: &str,
start: i64,
stop: i64,
Expand All @@ -18,3 +20,23 @@ pub fn date_range(
Int64Chunked::new_vec(name, date_range_vec(start, stop, every, closed, tu))
.into_datetime(tu, None)
}

/// Create a [`DatetimeChunked`] from a given `start` and `stop` date and a given `every` interval.
///
/// `start` and `stop` are converted to integer timestamps expressed in the time unit `tu`
/// and then handed to `date_range_impl` together with `name`, `every` and `closed`.
pub fn date_range(
    name: &str,
    start: NaiveDateTime,
    stop: NaiveDateTime,
    every: Duration,
    closed: ClosedWindow,
    tu: TimeUnit,
) -> DatetimeChunked {
    // Whole seconds scaled to microseconds plus the sub-second microsecond part.
    // `timestamp()` is in seconds, so it must be multiplied before the sub-second
    // component is added, and both bounds must use the same (micro) resolution.
    let to_micros =
        |dt: &NaiveDateTime| dt.timestamp() * 1_000_000 + i64::from(dt.timestamp_subsec_micros());

    let (start, stop) = match tu {
        TimeUnit::Nanoseconds => (start.timestamp_nanos(), stop.timestamp_nanos()),
        TimeUnit::Microseconds => (to_micros(&start), to_micros(&stop)),
        TimeUnit::Milliseconds => (start.timestamp_millis(), stop.timestamp_millis()),
    };
    date_range_impl(name, start, stop, every, closed, tu)
}
36 changes: 10 additions & 26 deletions polars/polars-time/src/groupby/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ impl Wrap<&DataFrame> {
)
}

let time = self.0.column(&options.index_column)?;
let time = self.0.column(&options.index_column)?.rechunk();
let time_type = time.dtype();

if time.null_count() > 0 {
Expand Down Expand Up @@ -203,28 +203,12 @@ impl Wrap<&DataFrame> {
};

let groups = if by.is_empty() {
let mut groups_slice = dt
.downcast_iter()
.map(|vals| {
let ts = vals.values().as_slice();
let (groups, lower, upper) = groupby_windows(
w,
ts,
options.include_boundaries,
options.closed_window,
tu,
);
update_bounds(lower, upper);
groups
})
.collect::<Vec<_>>();
// we don't flatmap because in case of a single chunk we don't need to reallocate the inner vec,
// just pop it.
if groups_slice.len() == 1 {
GroupsProxy::Slice(groups_slice.pop().unwrap())
} else {
GroupsProxy::Slice(groups_slice.into_iter().flatten().collect())
}
let vals = dt.downcast_iter().next().unwrap();
let ts = vals.values().as_slice();
let (groups, lower, upper) =
groupby_windows(w, ts, options.include_boundaries, options.closed_window, tu);
update_bounds(lower, upper);
GroupsProxy::Slice(groups)
} else {
let groups = self
.0
Expand Down Expand Up @@ -432,7 +416,7 @@ mod test {
let stop = NaiveDate::from_ymd(2021, 12, 16)
.and_hms(3, 0, 0)
.timestamp_millis();
let range = date_range(
let range = date_range_impl(
"date",
start,
stop,
Expand Down Expand Up @@ -478,7 +462,7 @@ mod test {
let stop = NaiveDate::from_ymd(2021, 12, 16)
.and_hms(3, 0, 0)
.timestamp_millis();
let range = date_range(
let range = date_range_impl(
"_upper_boundary",
start,
stop,
Expand All @@ -496,7 +480,7 @@ mod test {
let stop = NaiveDate::from_ymd(2021, 12, 16)
.and_hms(2, 0, 0)
.timestamp_millis();
let range = date_range(
let range = date_range_impl(
"_lower_boundary",
start,
stop,
Expand Down
14 changes: 10 additions & 4 deletions polars/polars-time/src/upsample.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,16 @@ fn upsample_single_impl(
TimeUnit::Microseconds => offset.add_us(first),
TimeUnit::Milliseconds => offset.add_ms(first),
};
let range =
date_range(index_col_name, first, last, every, ClosedWindow::Both, *tu)
.into_series()
.into_frame();
let range = date_range_impl(
index_col_name,
first,
last,
every,
ClosedWindow::Both,
*tu,
)
.into_series()
.into_frame();
range.join(
source,
&[index_col_name],
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-time/src/windows/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl Duration {
self.months == 0 && self.nsecs == 0
}

fn months_only(&self) -> bool {
pub(crate) fn months_only(&self) -> bool {
self.months != 0 && self.nsecs == 0
}

Expand Down Expand Up @@ -212,7 +212,7 @@ impl Duration {
{
match (self.months, self.nsecs) {
(0, 0) => panic!("duration may not be zero"),
// truncate by milliseconds
// truncate by ns/us/ms
(0, _) => {
let duration = nsecs_to_unit(self.nsecs);
let mut remainder = t % duration;
Expand Down
47 changes: 40 additions & 7 deletions polars/polars-time/src/windows/window.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::prelude::*;
use polars_arrow::export::arrow::temporal_conversions::{MILLISECONDS, NANOSECONDS};
use polars_core::export::arrow::temporal_conversions::MICROSECONDS;
use polars_core::prelude::*;
use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY;

/// Represents a window in time
#[derive(Copy, Clone)]
Expand All @@ -21,11 +24,13 @@ impl Window {
}
}

/// Truncate the given ns timestamp by the window boundary.
pub fn truncate_ns(&self, t: i64) -> i64 {
let t = self.every.truncate_ns(t);
self.offset.add_ns(t)
}

/// Truncate the given us timestamp by the window boundary.
pub fn truncate_us(&self, t: i64) -> i64 {
let t = self.every.truncate_us(t);
self.offset.add_us(t)
Expand All @@ -52,26 +57,54 @@ impl Window {
/// returns the bounds for the earliest window bounds
/// that contains the given time t. For underlapping windows that
/// do not contain time t, the window directly after time t will be returned.
///
/// For `every` larger than `1day` we just take the given timestamp `t` as start, as truncation
/// does not seem intuitive.
/// Below 1 day, it makes sense to truncate to:
/// - days
/// - hours
/// - 15 minutes
/// - etc.
///
/// But for 2w3d, it does not make sense to start it on a different lower bound, so we start at `t`.
pub fn get_earliest_bounds_ns(&self, t: i64) -> Bounds {
// original code translates offset here
// we don't. Seems unintuitive to me.
let start = self.truncate_ns(t);
// When `every` is not months-only and is longer than `NANOSECONDS * SECONDS_IN_DAY`
// (presumably one day expressed in nanoseconds -- confirm against the arrow constants),
// the window starts at `t` itself; otherwise `t` is truncated to the window boundary.
let start = if !self.every.months_only()
&& self.every.duration_ns() > NANOSECONDS * SECONDS_IN_DAY
{
t
} else {
// original code translates offset here
// we don't. Seems unintuitive to me.
self.truncate_ns(t)
};

// The upper bound is `period` added to the chosen start.
let stop = self.period.add_ns(start);

Bounds::new_checked(start, stop)
}

/// Microsecond counterpart of `get_earliest_bounds_ns`: computes the `Bounds` of the
/// earliest window for the us timestamp `t`.
pub fn get_earliest_bounds_us(&self, t: i64) -> Bounds {
// original code translates offset here
// we don't. Seems unintuitive to me.
let start = self.truncate_us(t);
// When `every` is not months-only and is longer than `MICROSECONDS * SECONDS_IN_DAY`
// (presumably one day expressed in microseconds -- confirm against the arrow constants),
// the window starts at `t` itself; otherwise `t` is truncated to the window boundary.
let start = if !self.every.months_only()
&& self.every.duration_us() > MICROSECONDS * SECONDS_IN_DAY
{
t
} else {
self.truncate_us(t)
};
// The upper bound is `period` added to the chosen start.
let stop = self.period.add_us(start);

Bounds::new_checked(start, stop)
}

pub fn get_earliest_bounds_ms(&self, t: i64) -> Bounds {
let start = self.truncate_ms(t);
let start = if !self.every.months_only()
&& self.every.duration_ms() > MILLISECONDS * SECONDS_IN_DAY
{
t
} else {
self.truncate_ms(t)
};

let stop = self.period.add_ms(start);

Bounds::new_checked(start, stop)
Expand Down
2 changes: 1 addition & 1 deletion polars/src/export.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
/// re-exports for utility
pub use polars_core::export::arrow;
pub use polars_core::export::*;
49 changes: 49 additions & 0 deletions polars/tests/it/lazy/groupby_dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use super::*;
use polars::export::chrono::prelude::*;

// Checks the window bounds of `groupby_dynamic` when `every`/`period` is one week:
// the first weekly window is expected to hold 7 daily rows and the second one 6.
#[test]
#[cfg(all(
feature = "temporal",
feature = "dtype-date",
feature = "dynamic_groupby"
))]
fn test_groupby_dynamic_week_bounds() -> Result<()> {
// Daily timestamps between 2022-02-01 and 2022-02-14 in milliseconds, left-closed
// (presumably 13 rows with the stop excluded; the asserts below expect 7 + 6).
let start = NaiveDate::from_ymd(2022, 2, 1).and_hms(0, 0, 0);
let stop = NaiveDate::from_ymd(2022, 2, 14).and_hms(0, 0, 0);
let range = date_range(
"dt",
start,
stop,
Duration::parse("1d"),
ClosedWindow::Left,
TimeUnit::Milliseconds,
)
.into_series();

// Every row carries the value 1, so a group's sum equals its row count.
let a = Int32Chunked::full("a", 1, range.len());
let df = df![
"dt" => range,
"a" => a
]?;

// No `by` columns: group purely on the "dt" index column with 1-week windows and no offset.
let out = df
.lazy()
.groupby_dynamic(
[],
DynamicGroupOptions {
index_column: "dt".into(),
every: Duration::parse("1w"),
period: Duration::parse("1w"),
offset: Duration::parse("0w"),
closed_window: ClosedWindow::Left,
truncate: false,
include_boundaries: true,
},
)
.agg([col("a").sum()])
.collect()?;
// Row counts per window: 7 in the first week, 6 in the second.
let a = out.column("a")?;
assert_eq!(a.get(0), AnyValue::Int32(7));
assert_eq!(a.get(1), AnyValue::Int32(6));
Ok(())
}
1 change: 1 addition & 0 deletions polars/tests/it/lazy/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod explodes;
mod groupby_dynamic;
mod projection_queries;
mod window_expressions;

Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ fn py_date_range(
"ms" => TimeUnit::Milliseconds,
_ => panic!("{}", "expected one of {'ns', 'ms'}"),
};
polars::time::date_range(name, start, stop, Duration::parse(every), closed.0, tu)
polars::time::date_range_impl(name, start, stop, Duration::parse(every), closed.0, tu)
.into_series()
.into()
}
Expand Down

0 comments on commit 71c256a

Please sign in to comment.