Skip to content

Commit

Permalink
make timeunits handling more dry
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Feb 13, 2022
1 parent 3f26c55 commit 95f776d
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 129 deletions.
17 changes: 12 additions & 5 deletions polars/polars-core/src/chunked_array/temporal/conversion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,22 +37,29 @@ impl From<&AnyValue<'_>> for NaiveTime {

// Used by lazy for literal conversion
#[cfg(feature = "private")]
pub fn naive_datetime_to_datetime_ns(v: &NaiveDateTime) -> i64 {
pub fn datetime_to_timestamp_ns(v: NaiveDateTime) -> i64 {
v.timestamp_nanos()
}

// Used by lazy for literal conversion
#[cfg(feature = "private")]
pub fn naive_datetime_to_datetime_ms(v: &NaiveDateTime) -> i64 {
pub fn datetime_to_timestamp_ms(v: NaiveDateTime) -> i64 {
v.timestamp_millis()
}

pub(crate) fn naive_datetime_to_date(v: &NaiveDateTime) -> i32 {
(naive_datetime_to_datetime_ms(v) / (MILLISECONDS * SECONDS_IN_DAY)) as i32
// Used by lazy for literal conversion
#[cfg(feature = "private")]
pub fn datetime_to_timestamp_us(v: NaiveDateTime) -> i64 {
let us = v.timestamp() * 1_000_000;
us + v.timestamp_subsec_micros() as i64
}

pub(crate) fn naive_datetime_to_date(v: NaiveDateTime) -> i32 {
(datetime_to_timestamp_ms(v) / (MILLISECONDS * SECONDS_IN_DAY)) as i32
}

pub(crate) fn naive_date_to_date(nd: NaiveDate) -> i32 {
let nt = NaiveTime::from_hms(0, 0, 0);
let ndt = NaiveDateTime::new(nd, nt);
naive_datetime_to_date(&ndt)
naive_datetime_to_date(ndt)
}
2 changes: 1 addition & 1 deletion polars/polars-core/src/chunked_array/temporal/date.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arrow::temporal_conversions::date32_to_date;
pub(crate) fn naive_date_to_date(nd: NaiveDate) -> i32 {
let nt = NaiveTime::from_hms(0, 0, 0);
let ndt = NaiveDateTime::new(nd, nt);
naive_datetime_to_date(&ndt)
naive_datetime_to_date(ndt)
}

impl DateChunked {
Expand Down
26 changes: 11 additions & 15 deletions polars/polars-core/src/chunked_array/temporal/datetime.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::conversion::{naive_datetime_to_datetime_ms, naive_datetime_to_datetime_ns};
use super::conversion::{datetime_to_timestamp_ms, datetime_to_timestamp_ns};
use super::*;
use crate::prelude::DataType::Datetime;
use crate::prelude::*;
Expand Down Expand Up @@ -176,10 +176,10 @@ impl DatetimeChunked {
tu: TimeUnit,
) -> Self {
let func = match tu {
TimeUnit::Nanoseconds => naive_datetime_to_datetime_ns,
TimeUnit::Milliseconds => naive_datetime_to_datetime_ms,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
};
let vals = v.into_iter().map(|nd| func(&nd)).collect::<Vec<_>>();
let vals = v.into_iter().map(func).collect::<Vec<_>>();
Int64Chunked::from_vec(name, vals).into_datetime(tu, None)
}

Expand All @@ -189,27 +189,23 @@ impl DatetimeChunked {
tu: TimeUnit,
) -> Self {
let func = match tu {
TimeUnit::Nanoseconds => naive_datetime_to_datetime_ns,
TimeUnit::Milliseconds => naive_datetime_to_datetime_ms,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
};
let vals = v.into_iter().map(|opt_nd| opt_nd.map(|nd| (func(&nd))));
let vals = v.into_iter().map(|opt_nd| opt_nd.map(func));
Int64Chunked::from_iter_options(name, vals).into_datetime(tu, None)
}

pub fn parse_from_str_slice(name: &str, v: &[&str], fmt: &str, tu: TimeUnit) -> Self {
let func = match tu {
TimeUnit::Nanoseconds => naive_datetime_to_datetime_ns,
TimeUnit::Milliseconds => naive_datetime_to_datetime_ms,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
};

Int64Chunked::from_iter_options(
name,
v.iter().map(|s| {
NaiveDateTime::parse_from_str(s, fmt)
.ok()
.as_ref()
.map(func)
}),
v.iter()
.map(|s| NaiveDateTime::parse_from_str(s, fmt).ok().map(func)),
)
.into_datetime(tu, None)
}
Expand Down
23 changes: 8 additions & 15 deletions polars/polars-core/src/chunked_array/temporal/utf8.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,8 +234,8 @@ impl Utf8Chunked {
};

let func = match tu {
TimeUnit::Nanoseconds => naive_datetime_to_datetime_ns,
TimeUnit::Milliseconds => naive_datetime_to_datetime_ms,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
};

let mut ca: Int64Chunked = self
Expand All @@ -249,7 +249,7 @@ impl Utf8Chunked {
if s.is_empty() {
return None;
}
match NaiveDateTime::parse_from_str(s, fmt).map(|dt| func(&dt)) {
match NaiveDateTime::parse_from_str(s, fmt).map(func) {
Ok(nd) => return Some(nd),
Err(e) => {
let e: ParseErrorByteCopy = e.into();
Expand Down Expand Up @@ -316,27 +316,20 @@ impl Utf8Chunked {
};

let func = match tu {
TimeUnit::Nanoseconds => naive_datetime_to_datetime_ns,
TimeUnit::Milliseconds => naive_datetime_to_datetime_ms,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
};

let mut ca: Int64Chunked = match self.has_validity() {
false => self
.into_no_null_iter()
.map(|s| {
NaiveDateTime::parse_from_str(s, fmt)
.ok()
.map(|dt| func(&dt))
})
.map(|s| NaiveDateTime::parse_from_str(s, fmt).ok().map(func))
.collect_trusted(),
_ => self
.into_iter()
.map(|opt_s| {
let opt_nd = opt_s.map(|s| {
NaiveDateTime::parse_from_str(s, fmt)
.ok()
.map(|dt| func(&dt))
});
let opt_nd =
opt_s.map(|s| NaiveDateTime::parse_from_str(s, fmt).ok().map(func));
match opt_nd {
None => None,
Some(None) => None,
Expand Down
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ impl PhysicalExpr for LiteralExpr {
DateTime(ndt, tu) => {
use polars_core::chunked_array::temporal::conversion::*;
let timestamp = match tu {
TimeUnit::Nanoseconds => naive_datetime_to_datetime_ns(ndt),
TimeUnit::Milliseconds => naive_datetime_to_datetime_ms(ndt),
TimeUnit::Nanoseconds => datetime_to_timestamp_ns(*ndt),
TimeUnit::Milliseconds => datetime_to_timestamp_ms(*ndt),
};
Int64Chunked::full("literal", timestamp, 1)
.into_datetime(*tu, None)
Expand Down
166 changes: 75 additions & 91 deletions polars/polars-time/src/windows/duration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use super::calendar::{
};
use chrono::{Datelike, NaiveDate, NaiveDateTime, NaiveTime, Timelike};
use polars_arrow::export::arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, MILLISECONDS,
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime, MILLISECONDS,
};
use polars_core::prelude::{
datetime_to_timestamp_ms, datetime_to_timestamp_ns, datetime_to_timestamp_us,
};
use std::ops::Mul;

Expand Down Expand Up @@ -191,12 +194,23 @@ impl Duration {
}

#[inline]
pub fn truncate_ms(&self, t: i64) -> i64 {
pub fn truncate_impl<F, G, J>(
&self,
t: i64,
nsecs_to_unit: F,
timestamp_to_datetime: G,
datetime_to_timestamp: J,
) -> i64
where
F: Fn(i64) -> i64,
G: Fn(i64) -> NaiveDateTime,
J: Fn(NaiveDateTime) -> i64,
{
match (self.months, self.nsecs) {
(0, 0) => panic!("duration may not be zero"),
// truncate by milliseconds
(0, _) => {
let duration = self.nsecs as i64 / 1_000_000;
let duration = nsecs_to_unit(self.nsecs);
let mut remainder = t % duration;
if remainder < 0 {
remainder += duration
Expand All @@ -205,7 +219,7 @@ impl Duration {
}
// truncate by months
(_, 0) => {
let ts = timestamp_ms_to_datetime(t);
let ts = timestamp_to_datetime(t);
let (year, month) = (ts.year(), ts.month());

// determine the total number of months and truncate
Expand All @@ -216,46 +230,56 @@ impl Duration {

// recreate a new time from the year and month combination
let (year, month) = ((total / 12), ((total % 12) + 1) as u32);
new_datetime(year, month, 1, 0, 0, 0, 0).timestamp_millis()
let dt = new_datetime(year, month, 1, 0, 0, 0, 0);
datetime_to_timestamp(dt)
}
_ => panic!("duration may not mix month and nanosecond units"),
}
}

// Truncate the given nanoseconds timestamp by the window boundary.
// Truncate the given ns timestamp by the window boundary.
#[inline]
pub fn truncate_ns(&self, t: i64) -> i64 {
match (self.months, self.nsecs) {
(0, 0) => panic!("duration may not be zero"),
// truncate by nanoseconds
(0, _) => {
let duration = self.nsecs as i64;
let mut remainder = t % duration;
if remainder < 0 {
remainder += duration
}
t - remainder
}
// truncate by months
(_, 0) => {
let ts = timestamp_ns_to_datetime(t);
let (year, month) = (ts.year(), ts.month());
self.truncate_impl(
t,
|nsecs| nsecs,
timestamp_ns_to_datetime,
datetime_to_timestamp_ns,
)
}

// determine the total number of months and truncate
// the number of months by the duration amount
let mut total = (year * 12) as i32 + (month - 1) as i32;
let remainder = total % self.months as i32;
total -= remainder;
// Truncate the given ns timestamp by the window boundary.
#[inline]
pub fn truncate_us(&self, t: i64) -> i64 {
self.truncate_impl(
t,
|nsecs| nsecs / 1000,
timestamp_us_to_datetime,
datetime_to_timestamp_us,
)
}

// recreate a new time from the year and month combination
let (year, month) = ((total / 12), ((total % 12) + 1) as u32);
new_datetime(year, month, 1, 0, 0, 0, 0).timestamp_nanos()
}
_ => panic!("duration may not mix month and nanosecond units"),
}
// Truncate the given ms timestamp by the window boundary.
#[inline]
pub fn truncate_ms(&self, t: i64) -> i64 {
self.truncate_impl(
t,
|nsecs| nsecs / 1_000_000,
timestamp_ms_to_datetime,
datetime_to_timestamp_ms,
)
}

pub fn add_ms(&self, t: i64) -> i64 {
fn add_impl_month<F, G>(
&self,
t: i64,
timestamp_to_datetime: F,
datetime_to_timestamp: G,
) -> i64
where
F: Fn(i64) -> NaiveDateTime,
G: Fn(NaiveDateTime) -> i64,
{
let d = self;
let mut new_t = t;

Expand All @@ -267,7 +291,7 @@ impl Duration {

// Retrieve the current date and increment the values
// based on the number of months
let ts = timestamp_ms_to_datetime(t);
let ts = timestamp_to_datetime(t);
let mut year = ts.year();
let mut month = ts.month() as i32;
let mut day = ts.day();
Expand Down Expand Up @@ -301,72 +325,32 @@ impl Duration {
let minute = ts.minute();
let sec = ts.second();
let nsec = ts.nanosecond();
let ts =
new_datetime(year, month as u32, day, hour, minute, sec, nsec).timestamp_millis();
new_t = ts;
let dt = new_datetime(year, month as u32, day, hour, minute, sec, nsec);
new_t = datetime_to_timestamp(dt);
}
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs / 1_000_000
new_t
}

pub fn add_ns(&self, t: i64) -> i64 {
let d = self;
let mut new_t = t;

if d.months > 0 {
let mut months = d.months;
if d.negative {
months = -months;
}

// Retrieve the current date and increment the values
// based on the number of months
let ts = timestamp_ns_to_datetime(t);
let mut year = ts.year();
let mut month = ts.month() as i32;
let mut day = ts.day();
year += (months / 12) as i32;
month += (months % 12) as i32;

// if the month overflowed or underflowed, adjust the year
// accordingly. Because we add the modulo for the months
// the year will only adjust by one
if month > 12 {
year += 1;
month -= 12;
} else if month <= 0 {
year -= 1;
month += 12;
}

// Normalize the day if we are past the end of the month.
let mut last_day_of_month = last_day_of_month(month);
if month == (chrono::Month::February.number_from_month() as i32) && is_leap_year(year) {
last_day_of_month += 1;
}

if day > last_day_of_month {
day = last_day_of_month
}

// Retrieve the original time and construct a data
// with the new year, month and day
let hour = ts.hour();
let minute = ts.minute();
let sec = ts.second();
let nsec = ts.nanosecond();
let ts =
new_datetime(year, month as u32, day, hour, minute, sec, nsec).timestamp_nanos();
new_t = ts;
}
let new_t = self.add_impl_month(t, timestamp_ns_to_datetime, datetime_to_timestamp_ns);
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs
}

// original silently overflows:
// see https://github.com/influxdata/influxdb_iox/issues/2890

// We keep the panic for now until we better understand the issue
pub fn add_us(&self, t: i64) -> i64 {
let d = self;
let new_t = self.add_impl_month(t, timestamp_us_to_datetime, datetime_to_timestamp_us);
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs
}

pub fn add_ms(&self, t: i64) -> i64 {
let d = self;
let new_t = self.add_impl_month(t, timestamp_ms_to_datetime, datetime_to_timestamp_ms);
let nsecs = if d.negative { -d.nsecs } else { d.nsecs };
new_t + nsecs / 1_000_000
}
}

impl Mul<i64> for Duration {
Expand Down

0 comments on commit 95f776d

Please sign in to comment.