Skip to content

Commit

Permalink
feat(rust): truncate by calendar weeks (#5759)
Browse files Browse the repository at this point in the history
  • Loading branch information
cannero committed Dec 27, 2022
1 parent 1ca1307 commit 7e1ddb9
Show file tree
Hide file tree
Showing 6 changed files with 214 additions and 31 deletions.
130 changes: 109 additions & 21 deletions polars/polars-time/src/windows/duration.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::cmp::Ordering;
use std::ops::Mul;

use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Weekday};
use polars_arrow::export::arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime, MILLISECONDS,
};
Expand All @@ -23,6 +23,8 @@ use super::calendar::{
pub struct Duration {
// the number of months for the duration
months: i64,
// the number of weeks for the duration
weeks: i64,
// the number of nanoseconds for the duration
nsecs: i64,
// indicates if the duration is negative
Expand All @@ -48,6 +50,7 @@ impl Duration {
pub fn new(fixed_slots: i64) -> Self {
Duration {
months: 0,
weeks: 0,
nsecs: fixed_slots.abs(),
negative: fixed_slots < 0,
parsed_int: true,
Expand Down Expand Up @@ -79,6 +82,7 @@ impl Duration {
}

let mut nsecs = 0;
let mut weeks = 0;
let mut months = 0;
let mut iter = duration.char_indices();
let negative = duration.starts_with('-');
Expand Down Expand Up @@ -125,7 +129,7 @@ impl Duration {
"m" => nsecs += n * NS_MINUTE,
"h" => nsecs += n * NS_HOUR,
"d" => nsecs += n * NS_DAY,
"w" => nsecs += n * NS_WEEK,
"w" => weeks += n,
"mo" => months += n,
"y" => months += n * 12,
// we will read indexes as nanoseconds
Expand All @@ -140,6 +144,7 @@ impl Duration {
}
Duration {
nsecs: nsecs.abs(),
weeks: weeks.abs(),
months: months.abs(),
negative,
parsed_int,
Expand Down Expand Up @@ -167,6 +172,14 @@ impl Duration {
_ => {}
}
Duration::from_months(months)
} else if self.weeks_only() && interval.weeks_only() {
let mut weeks = self.weeks() % interval.weeks();

match (self.negative, interval.negative) {
(true, true) | (true, false) => weeks = -weeks + interval.weeks(),
_ => {}
}
Duration::from_weeks(weeks)
} else {
let mut offset = self.duration_ns();
if offset == 0 {
Expand All @@ -188,6 +201,7 @@ impl Duration {
let (negative, nsecs) = Self::to_positive(v);
Self {
months: 0,
weeks: 0,
nsecs,
negative,
parsed_int: false,
Expand All @@ -199,6 +213,19 @@ impl Duration {
let (negative, months) = Self::to_positive(v);
Self {
months,
weeks: 0,
nsecs: 0,
negative,
parsed_int: false,
}
}

/// Creates a [`Duration`] that represents a fixed number of weeks.
pub(crate) fn from_weeks(v: i64) -> Self {
let (negative, weeks) = Self::to_positive(v);
Self {
months: 0,
weeks,
nsecs: 0,
negative,
parsed_int: false,
Expand All @@ -207,17 +234,26 @@ impl Duration {

/// `true` if zero duration.
pub fn is_zero(&self) -> bool {
self.months == 0 && self.nsecs == 0
self.months == 0 && self.weeks == 0 && self.nsecs == 0
}

pub fn months_only(&self) -> bool {
self.months != 0 && self.nsecs == 0
self.months != 0 && self.weeks == 0 && self.nsecs == 0
}

pub fn months(&self) -> i64 {
self.months
}

pub fn weeks_only(&self) -> bool {
self.months == 0 && self.weeks != 0 && self.nsecs == 0
}

pub fn weeks(&self) -> i64 {
self.weeks
}

/// Returns the nanoseconds from the `Duration` without the weeks or months part.
pub fn nanoseconds(&self) -> i64 {
self.nsecs
}
Expand All @@ -226,19 +262,20 @@ impl Duration {
#[cfg(feature = "private")]
#[doc(hidden)]
pub const fn duration_ns(&self) -> i64 {
self.months * 28 * 24 * 3600 * NANOSECONDS + self.nsecs
self.months * 28 * 24 * 3600 * NANOSECONDS + self.weeks * NS_WEEK + self.nsecs
}

#[cfg(feature = "private")]
#[doc(hidden)]
pub const fn duration_us(&self) -> i64 {
self.months * 28 * 24 * 3600 * MICROSECONDS + self.nsecs / 1000
self.months * 28 * 24 * 3600 * MICROSECONDS + (self.weeks * NS_WEEK + self.nsecs) / 1000
}

#[cfg(feature = "private")]
#[doc(hidden)]
pub const fn duration_ms(&self) -> i64 {
self.months * 28 * 24 * 3600 * MILLISECONDS + self.nsecs / 1_000_000
self.months * 28 * 24 * 3600 * MILLISECONDS
+ (self.weeks * NS_WEEK + self.nsecs) / 1_000_000
}

#[inline]
Expand All @@ -254,19 +291,28 @@ impl Duration {
G: Fn(i64) -> NaiveDateTime,
J: Fn(NaiveDateTime) -> i64,
{
match (self.months, self.nsecs) {
(0, 0) => panic!("duration may not be zero"),
match (self.months, self.weeks, self.nsecs) {
(0, 0, 0) => panic!("duration may not be zero"),
// truncate by ns/us/ms
(0, _) => {
(0, 0, _) => {
let duration = nsecs_to_unit(self.nsecs);
let mut remainder = t % duration;
if remainder < 0 {
remainder += duration
}
t - remainder
}
// truncate by weeks
(0, _, 0) => {
let dt = timestamp_to_datetime(t).date();
let week_timestamp = dt.week(Weekday::Mon);
let first_day_of_week =
week_timestamp.first_day() - chrono::Duration::weeks(self.weeks - 1);

datetime_to_timestamp(first_day_of_week.and_time(NaiveTime::default()))
}
// truncate by months
(_, 0) => {
(_, 0, 0) => {
let ts = timestamp_to_datetime(t);
let (year, month) = (ts.year(), ts.month());

Expand All @@ -278,10 +324,11 @@ impl Duration {

// recreate a new time from the year and month combination
let (year, month) = ((total / 12), ((total % 12) + 1) as u32);

let dt = new_datetime(year, month, 1, 0, 0, 0, 0);
datetime_to_timestamp(dt)
}
_ => panic!("duration may not mix month and nanosecond units"),
_ => panic!("duration may not mix month, weeks and nanosecond units"),
}
}

Expand Down Expand Up @@ -318,15 +365,17 @@ impl Duration {
)
}

fn add_impl_month<F, G>(
fn add_impl_month_or_week<F, G, J>(
&self,
t: i64,
timestamp_to_datetime: F,
datetime_to_timestamp: G,
nsecs_to_unit: F,
timestamp_to_datetime: G,
datetime_to_timestamp: J,
) -> i64
where
F: Fn(i64) -> NaiveDateTime,
G: Fn(NaiveDateTime) -> i64,
F: Fn(i64) -> i64,
G: Fn(i64) -> NaiveDateTime,
J: Fn(NaiveDateTime) -> i64,
{
let d = self;
let mut new_t = t;
Expand Down Expand Up @@ -376,26 +425,47 @@ impl Duration {
let dt = new_datetime(year, month as u32, day, hour, minute, sec, nsec);
new_t = datetime_to_timestamp(dt);
}

if d.weeks > 0 {
let t_weeks = nsecs_to_unit(self.weeks * NS_WEEK);
new_t += if d.negative { -t_weeks } else { t_weeks };
}

new_t
}

pub fn add_ns(&self, t: i64) -> i64 {
let d = self;
let new_t = self.add_impl_month(t, timestamp_ns_to_datetime, datetime_to_timestamp_ns);
let new_t = self.add_impl_month_or_week(
t,
|nsecs| nsecs,
timestamp_ns_to_datetime,
datetime_to_timestamp_ns,
);
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs
}

pub fn add_us(&self, t: i64) -> i64 {
let d = self;
let new_t = self.add_impl_month(t, timestamp_us_to_datetime, datetime_to_timestamp_us);
let new_t = self.add_impl_month_or_week(
t,
|nsecs| nsecs / 1000,
timestamp_us_to_datetime,
datetime_to_timestamp_us,
);
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs / 1_000
}

pub fn add_ms(&self, t: i64) -> i64 {
let d = self;
let new_t = self.add_impl_month(t, timestamp_ms_to_datetime, datetime_to_timestamp_ms);
let new_t = self.add_impl_month_or_week(
t,
|nsecs| nsecs / 1_000_000,
timestamp_ms_to_datetime,
datetime_to_timestamp_ms,
);
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs / 1_000_000
}
Expand All @@ -410,6 +480,7 @@ impl Mul<i64> for Duration {
self.negative = !self.negative
}
self.months *= rhs;
self.weeks *= rhs;
self.nsecs *= rhs;
self
}
Expand Down Expand Up @@ -443,8 +514,25 @@ mod test {
let out = Duration::parse("123ns40ms");
assert_eq!(out.nsecs, 40 * NS_MILLISECOND + 123);
let out = Duration::parse("123ns40ms1w");
assert_eq!(out.nsecs, 40 * NS_MILLISECOND + 123 + NS_WEEK);
assert_eq!(out.nsecs, 40 * NS_MILLISECOND + 123);
assert_eq!(out.duration_ns(), 40 * NS_MILLISECOND + 123 + NS_WEEK);
let out = Duration::parse("-123ns40ms1w");
assert!(out.negative);
let out = Duration::parse("5w");
assert_eq!(out.weeks(), 5);
}

#[test]
fn test_add_ns() {
let t = 1;
let seven_days = Duration::parse("7d");
let one_week = Duration::parse("1w");

assert_eq!(seven_days.add_ns(t), one_week.add_ns(t));

let seven_days_negative = Duration::parse("-7d");
let one_week_negative = Duration::parse("-1w");

assert_eq!(seven_days_negative.add_ns(t), one_week_negative.add_ns(t));
}
}
6 changes: 3 additions & 3 deletions polars/polars-time/src/windows/groupby.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ pub(crate) fn groupby_values_iter_full_lookbehind(
tu: TimeUnit,
start_offset: usize,
) -> impl Iterator<Item = (IdxSize, IdxSize)> + TrustedLen + '_ {
debug_assert!(offset.nanoseconds() >= period.nanoseconds());
debug_assert!(offset.duration_ns() >= period.duration_ns());
debug_assert!(offset.negative);

let add = match tu {
Expand Down Expand Up @@ -417,12 +417,12 @@ pub fn groupby_values(

// we have a (partial) lookbehind window
if offset.negative {
if offset.nanoseconds() >= period.nanoseconds() {
if offset.duration_ns() >= period.duration_ns() {
// lookbehind
// window is within 2 periods length of t
// ------t---
// [------]
if offset.nanoseconds() < period.nanoseconds() * 2 {
if offset.duration_ns() < period.duration_ns() * 2 {
let vals = thread_offsets
.par_iter()
.copied()
Expand Down
6 changes: 3 additions & 3 deletions polars/polars-time/src/windows/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,20 @@ impl Window {

/// Round the given ns timestamp by the window boundary.
pub fn round_ns(&self, t: i64) -> i64 {
let t = t + self.every.nanoseconds() / 2_i64;
let t = t + self.every.duration_ns() / 2_i64;
self.truncate_ns(t)
}

/// Round the given us timestamp by the window boundary.
pub fn round_us(&self, t: i64) -> i64 {
let t = t + self.every.nanoseconds()
let t = t + self.every.duration_ns()
/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Microsecond) as i64);
self.truncate_us(t)
}

/// Round the given ms timestamp by the window boundary.
pub fn round_ms(&self, t: i64) -> i64 {
let t = t + self.every.nanoseconds()
let t = t + self.every.duration_ns()
/ (2 * timeunit_scale(ArrowTimeUnit::Nanosecond, ArrowTimeUnit::Millisecond) as i64);
self.truncate_ms(t)
}
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/internals/expr/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def truncate(
1m # 1 minute
1h # 1 hour
1d # 1 day
1w # 1 week
1w # 1 calendar week starting at Monday
1mo # 1 calendar month
1y # 1 calendar year
Expand Down Expand Up @@ -181,7 +181,7 @@ def round(
1m # 1 minute
1h # 1 hour
1d # 1 day
1w # 1 week
1w # 1 calendar week
1mo # 1 calendar month
1y # 1 calendar year
Expand Down
4 changes: 2 additions & 2 deletions py-polars/polars/internals/series/datetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ def truncate(
1m # 1 minute
1h # 1 hour
1d # 1 day
1w # 1 week
1w # 1 calendar week
1mo # 1 calendar month
1y # 1 calendar year
Expand Down Expand Up @@ -1457,7 +1457,7 @@ def round(
1m # 1 minute
1h # 1 hour
1d # 1 day
1w # 1 week
1w # 1 calendar week
1mo # 1 calendar month
1y # 1 calendar year
Expand Down

0 comments on commit 7e1ddb9

Please sign in to comment.