Skip to content

Commit

Permalink
feat(rust, python): deprecate 'use_earliest' argument in favour of 'a…
Browse files Browse the repository at this point in the history
…mbiguous', which can take expressions (#10719)
  • Loading branch information
MarcoGorelli committed Aug 26, 2023
1 parent 0e507dd commit 3e84f29
Show file tree
Hide file tree
Showing 35 changed files with 688 additions and 375 deletions.
2 changes: 1 addition & 1 deletion crates/polars-arrow/src/kernels/mod.rs
Expand Up @@ -25,7 +25,7 @@ pub mod take_agg;
mod time;

#[cfg(feature = "timezones")]
pub use time::replace_time_zone;
pub use time::convert_to_naive_local;

/// Internal state of [SlicesIterator]
#[derive(Debug, PartialEq)]
Expand Down
64 changes: 11 additions & 53 deletions crates/polars-arrow/src/kernels/time.rs
@@ -1,30 +1,25 @@
use arrow::array::PrimitiveArray;
use arrow::compute::arity::try_unary;
use arrow::datatypes::{DataType as ArrowDataType, TimeUnit};
use arrow::error::{Error as ArrowError, Result};
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
};
use chrono::{LocalResult, NaiveDateTime, TimeZone};
use chrono_tz::Tz;

use crate::error::PolarsResult;

fn convert_to_naive_local(
pub fn convert_to_naive_local(
from_tz: &Tz,
to_tz: &Tz,
ndt: NaiveDateTime,
use_earliest: Option<bool>,
ambiguous: &str,
) -> Result<NaiveDateTime> {
let ndt = from_tz.from_utc_datetime(&ndt).naive_local();
match to_tz.from_local_datetime(&ndt) {
LocalResult::Single(dt) => Ok(dt.naive_utc()),
LocalResult::Ambiguous(dt_earliest, dt_latest) => match use_earliest {
Some(true) => Ok(dt_earliest.naive_utc()),
Some(false) => Ok(dt_latest.naive_utc()),
None => Err(ArrowError::InvalidArgumentError(
format!("datetime '{}' is ambiguous in time zone '{}'. Please use `use_earliest` to tell how it should be localized.", ndt, to_tz)
))
LocalResult::Ambiguous(dt_earliest, dt_latest) => match ambiguous {
"earliest" => Ok(dt_earliest.naive_utc()),
"latest" => Ok(dt_latest.naive_utc()),
"raise" => Err(ArrowError::InvalidArgumentError(
format!("datetime '{}' is ambiguous in time zone '{}'. Please use `ambiguous` to tell how it should be localized.", ndt, to_tz)
)),
ambiguous => Err(ArrowError::InvalidArgumentError(
format!("Invalid argument {}, expected one of: \"earliest\", \"latest\", \"raise\"", ambiguous)
)),
},
LocalResult::None => Err(ArrowError::InvalidArgumentError(
format!(
Expand All @@ -35,40 +30,3 @@ fn convert_to_naive_local(
)),
}
}

pub fn replace_time_zone(
arr: &PrimitiveArray<i64>,
tu: TimeUnit,
from_tz: &Tz,
to_tz: &Tz,
use_earliest: Option<bool>,
) -> PolarsResult<PrimitiveArray<i64>> {
let res = match tu {
TimeUnit::Millisecond => try_unary(
arr,
|value| {
let ndt = timestamp_ms_to_datetime(value);
Ok(convert_to_naive_local(from_tz, to_tz, ndt, use_earliest)?.timestamp_millis())
},
ArrowDataType::Int64,
),
TimeUnit::Microsecond => try_unary(
arr,
|value| {
let ndt = timestamp_us_to_datetime(value);
Ok(convert_to_naive_local(from_tz, to_tz, ndt, use_earliest)?.timestamp_micros())
},
ArrowDataType::Int64,
),
TimeUnit::Nanosecond => try_unary(
arr,
|value| {
let ndt = timestamp_ns_to_datetime(value);
Ok(convert_to_naive_local(from_tz, to_tz, ndt, use_earliest)?.timestamp_nanos())
},
ArrowDataType::Int64,
),
_ => unreachable!(),
};
Ok(res?)
}
9 changes: 8 additions & 1 deletion crates/polars-io/src/csv/read_impl/mod.rs
Expand Up @@ -46,7 +46,14 @@ pub(crate) fn cast_columns(
(DataType::Utf8, DataType::Datetime(tu, _)) => s
.utf8()
.unwrap()
.as_datetime(None, *tu, false, false, None, None)
.as_datetime(
None,
*tu,
false,
false,
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)
.map(|ca| ca.into_series()),
(_, dt) => s.cast(dt),
}?;
Expand Down
59 changes: 40 additions & 19 deletions crates/polars-ops/src/chunked_array/datetime/replace_time_zone.rs
@@ -1,5 +1,10 @@
use arrow::temporal_conversions::{
timestamp_ms_to_datetime, timestamp_ns_to_datetime, timestamp_us_to_datetime,
};
use chrono::NaiveDateTime;
use chrono_tz::Tz;
use polars_arrow::kernels::replace_time_zone as replace_time_zone_kernel;
use polars_arrow::kernels::convert_to_naive_local;
use polars_core::chunked_array::ops::arity::try_binary_elementwise_values;
use polars_core::prelude::*;

fn parse_time_zone(s: &str) -> PolarsResult<Tz> {
Expand All @@ -8,26 +13,42 @@ fn parse_time_zone(s: &str) -> PolarsResult<Tz> {
}

pub fn replace_time_zone(
ca: &DatetimeChunked,
datetime: &Logical<DatetimeType, Int64Type>,
time_zone: Option<&str>,
use_earliest: Option<bool>,
ambiguous: &Utf8Chunked,
) -> PolarsResult<DatetimeChunked> {
let out: PolarsResult<_> = {
let from_tz = parse_time_zone(ca.time_zone().as_deref().unwrap_or("UTC"))?;
let to_tz = parse_time_zone(time_zone.unwrap_or("UTC"))?;
let chunks = ca.downcast_iter().map(|arr| {
replace_time_zone_kernel(
arr,
ca.time_unit().to_arrow(),
&from_tz,
&to_tz,
use_earliest,
)
});
let out = ChunkedArray::try_from_chunk_iter(ca.name(), chunks)?;
Ok(out.into_datetime(ca.time_unit(), time_zone.map(|x| x.to_string())))
let from_tz = parse_time_zone(datetime.time_zone().as_deref().unwrap_or("UTC"))?;
let to_tz = parse_time_zone(time_zone.unwrap_or("UTC"))?;
let timestamp_to_datetime: fn(i64) -> NaiveDateTime = match datetime.time_unit() {
TimeUnit::Milliseconds => timestamp_ms_to_datetime,
TimeUnit::Microseconds => timestamp_us_to_datetime,
TimeUnit::Nanoseconds => timestamp_ns_to_datetime,
};
let mut out = out?;
out.set_sorted_flag(ca.is_sorted_flag());
let datetime_to_timestamp: fn(NaiveDateTime) -> i64 = match datetime.time_unit() {
TimeUnit::Milliseconds => datetime_to_timestamp_ms,
TimeUnit::Microseconds => datetime_to_timestamp_us,
TimeUnit::Nanoseconds => datetime_to_timestamp_ns,
};
let out = match ambiguous.len() {
1 => match ambiguous.get(0) {
Some(ambiguous) => datetime.0.try_apply(|timestamp| {
let ndt = timestamp_to_datetime(timestamp);
Ok(datetime_to_timestamp(convert_to_naive_local(
&from_tz, &to_tz, ndt, ambiguous,
)?))
}),
_ => Ok(datetime.0.apply(|_| None)),
},
_ => {
try_binary_elementwise_values(datetime, ambiguous, |timestamp: i64, ambiguous: &str| {
let ndt = timestamp_to_datetime(timestamp);
Ok::<i64, PolarsError>(datetime_to_timestamp(convert_to_naive_local(
&from_tz, &to_tz, ndt, ambiguous,
)?))
})
},
};
let mut out = out?.into_datetime(datetime.time_unit(), time_zone.map(|x| x.to_string()));
out.set_sorted_flag(datetime.is_sorted_flag());
Ok(out)
}
25 changes: 12 additions & 13 deletions crates/polars-plan/src/dsl/dt.rs
Expand Up @@ -215,11 +215,12 @@ impl DateLikeNameSpace {
.map_private(FunctionExpr::TemporalExpr(TemporalFunction::TimeStamp(tu)))
}

pub fn truncate(self, options: TruncateOptions) -> Expr {
self.0
.map_private(FunctionExpr::TemporalExpr(TemporalFunction::Truncate(
options,
)))
pub fn truncate(self, options: TruncateOptions, ambiguous: Expr) -> Expr {
self.0.map_many_private(
FunctionExpr::TemporalExpr(TemporalFunction::Truncate(options)),
&[ambiguous],
false,
)
}

// roll backward to the first day of the month
Expand Down Expand Up @@ -267,14 +268,12 @@ impl DateLikeNameSpace {
}

#[cfg(feature = "timezones")]
pub fn replace_time_zone(
self,
time_zone: Option<TimeZone>,
use_earliest: Option<bool>,
) -> Expr {
self.0.map_private(FunctionExpr::TemporalExpr(
TemporalFunction::ReplaceTimeZone(time_zone, use_earliest),
))
pub fn replace_time_zone(self, time_zone: Option<TimeZone>, ambiguous: Expr) -> Expr {
self.0.map_many_private(
FunctionExpr::TemporalExpr(TemporalFunction::ReplaceTimeZone(time_zone)),
&[ambiguous],
false,
)
}

pub fn combine(self, time: Expr, tu: TimeUnit) -> Expr {
Expand Down
55 changes: 37 additions & 18 deletions crates/polars-plan/src/dsl/function_expr/datetime.rs
Expand Up @@ -42,7 +42,7 @@ pub enum TemporalFunction {
DSTOffset,
Round(String, String),
#[cfg(feature = "timezones")]
ReplaceTimeZone(Option<TimeZone>, Option<bool>),
ReplaceTimeZone(Option<TimeZone>),
DateRange {
every: Duration,
closed: ClosedWindow,
Expand All @@ -67,7 +67,6 @@ pub enum TemporalFunction {
DatetimeFunction {
time_unit: TimeUnit,
time_zone: Option<TimeZone>,
use_earliest: Option<bool>,
},
}

Expand Down Expand Up @@ -105,7 +104,7 @@ impl Display for TemporalFunction {
DSTOffset => "dst_offset",
Round(..) => "round",
#[cfg(feature = "timezones")]
ReplaceTimeZone(_, _) => "replace_time_zone",
ReplaceTimeZone(_) => "replace_time_zone",
DateRange { .. } => return write!(f, "date_range"),
DateRanges { .. } => return write!(f, "date_ranges"),
TimeRange { .. } => return write!(f, "time_range"),
Expand Down Expand Up @@ -147,10 +146,12 @@ pub(super) fn ordinal_day(s: &Series) -> PolarsResult<Series> {
pub(super) fn time(s: &Series) -> PolarsResult<Series> {
match s.dtype() {
#[cfg(feature = "timezones")]
DataType::Datetime(_, Some(_)) => {
polars_ops::prelude::replace_time_zone(s.datetime().unwrap(), None, None)?
.cast(&DataType::Time)
},
DataType::Datetime(_, Some(_)) => polars_ops::prelude::replace_time_zone(
s.datetime().unwrap(),
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)?
.cast(&DataType::Time),
DataType::Datetime(_, _) => s.datetime().unwrap().cast(&DataType::Time),
DataType::Date => s.datetime().unwrap().cast(&DataType::Time),
DataType::Time => Ok(s.clone()),
Expand All @@ -162,8 +163,12 @@ pub(super) fn date(s: &Series) -> PolarsResult<Series> {
#[cfg(feature = "timezones")]
DataType::Datetime(_, Some(tz)) => {
let mut out = {
polars_ops::chunked_array::replace_time_zone(s.datetime().unwrap(), None, None)?
.cast(&DataType::Date)?
polars_ops::chunked_array::replace_time_zone(
s.datetime().unwrap(),
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)?
.cast(&DataType::Date)?
};
if tz != "UTC" {
// DST transitions may not preserve sortedness.
Expand All @@ -181,8 +186,12 @@ pub(super) fn datetime(s: &Series) -> PolarsResult<Series> {
#[cfg(feature = "timezones")]
DataType::Datetime(tu, Some(tz)) => {
let mut out = {
polars_ops::chunked_array::replace_time_zone(s.datetime().unwrap(), None, None)?
.cast(&DataType::Datetime(*tu, None))?
polars_ops::chunked_array::replace_time_zone(
s.datetime().unwrap(),
None,
&Utf8Chunked::from_iter(std::iter::once("raise")),
)?
.cast(&DataType::Datetime(*tu, None))?
};
if tz != "UTC" {
// DST transitions may not preserve sortedness.
Expand Down Expand Up @@ -216,21 +225,31 @@ pub(super) fn timestamp(s: &Series, tu: TimeUnit) -> PolarsResult<Series> {
s.timestamp(tu).map(|ca| ca.into_series())
}

pub(super) fn truncate(s: &Series, options: &TruncateOptions) -> PolarsResult<Series> {
let mut out = match s.dtype() {
pub(super) fn truncate(s: &[Series], options: &TruncateOptions) -> PolarsResult<Series> {
let time_series = &s[0];
let ambiguous = &s[1].utf8().unwrap();
let mut out = match time_series.dtype() {
DataType::Datetime(_, tz) => match tz {
#[cfg(feature = "timezones")]
Some(tz) => s
Some(tz) => time_series
.datetime()
.unwrap()
.truncate(options, tz.parse::<Tz>().ok().as_ref(), ambiguous)?
.into_series(),
_ => time_series
.datetime()
.unwrap()
.truncate(options, tz.parse::<Tz>().ok().as_ref())?
.truncate(options, None, ambiguous)?
.into_series(),
_ => s.datetime().unwrap().truncate(options, None)?.into_series(),
},
DataType::Date => s.date().unwrap().truncate(options, None)?.into_series(),
DataType::Date => time_series
.date()
.unwrap()
.truncate(options, None, ambiguous)?
.into_series(),
dt => polars_bail!(opq = round, got = dt, expected = "date/datetime"),
};
out.set_sorted_flag(s.is_sorted_flag());
out.set_sorted_flag(time_series.is_sorted_flag());
Ok(out)
}

Expand Down
12 changes: 5 additions & 7 deletions crates/polars-plan/src/dsl/function_expr/dispatch.rs
Expand Up @@ -34,11 +34,9 @@ pub(super) fn set_sorted_flag(s: &Series, sorted: IsSorted) -> PolarsResult<Seri
}

#[cfg(feature = "timezones")]
pub(super) fn replace_time_zone(
s: &Series,
time_zone: Option<&str>,
use_earliest: Option<bool>,
) -> PolarsResult<Series> {
let ca = s.datetime().unwrap();
Ok(polars_ops::prelude::replace_time_zone(ca, time_zone, use_earliest)?.into_series())
pub(super) fn replace_time_zone(s: &[Series], time_zone: Option<&str>) -> PolarsResult<Series> {
let s1 = &s[0];
let ca = s1.datetime().unwrap();
let s2 = &s[1].utf8().unwrap();
Ok(polars_ops::prelude::replace_time_zone(ca, time_zone, s2)?.into_series())
}
16 changes: 5 additions & 11 deletions crates/polars-plan/src/dsl/function_expr/mod.rs
Expand Up @@ -677,7 +677,7 @@ impl From<StringFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
},
#[cfg(feature = "temporal")]
Strptime(dtype, options) => {
map!(strings::strptime, dtype.clone(), &options)
map_as_slice!(strings::strptime, dtype.clone(), &options)
},
#[cfg(feature = "concat_str")]
ConcatVertical(delimiter) => map!(strings::concat, &delimiter),
Expand Down Expand Up @@ -749,7 +749,7 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
Nanosecond => map!(datetime::nanosecond),
TimeStamp(tu) => map!(datetime::timestamp, tu),
Truncate(truncate_options) => {
map!(datetime::truncate, &truncate_options)
map_as_slice!(datetime::truncate, &truncate_options)
},
#[cfg(feature = "date_offset")]
MonthStart => map!(datetime::month_start),
Expand All @@ -761,8 +761,8 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
DSTOffset => map!(datetime::dst_offset),
Round(every, offset) => map!(datetime::round, &every, &offset),
#[cfg(feature = "timezones")]
ReplaceTimeZone(tz, use_earliest) => {
map!(dispatch::replace_time_zone, tz.as_deref(), use_earliest)
ReplaceTimeZone(tz) => {
map_as_slice!(dispatch::replace_time_zone, tz.as_deref())
},
Combine(tu) => map_as_slice!(temporal::combine, tu),
DateRange {
Expand Down Expand Up @@ -818,14 +818,8 @@ impl From<TemporalFunction> for SpecialEq<Arc<dyn SeriesUdf>> {
DatetimeFunction {
time_unit,
time_zone,
use_earliest,
} => {
map_as_slice!(
temporal::datetime,
&time_unit,
time_zone.as_deref(),
use_earliest
)
map_as_slice!(temporal::datetime, &time_unit, time_zone.as_deref())
},
}
}
Expand Down

0 comments on commit 3e84f29

Please sign in to comment.