Skip to content

Commit

Permalink
add duration expression (#3017)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Mar 31, 2022
1 parent 66c59e0 commit a999618
Show file tree
Hide file tree
Showing 6 changed files with 246 additions and 19 deletions.
108 changes: 99 additions & 9 deletions polars/polars-lazy/src/dsl/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
//!
use crate::prelude::*;
use crate::utils::has_wildcard;
use polars_core::export::arrow::temporal_conversions::NANOSECONDS;
use polars_core::prelude::*;
use polars_core::utils::arrow::temporal_conversions::SECONDS_IN_DAY;
use polars_core::utils::get_supertype;
use rayon::prelude::*;
use std::ops::{BitAnd, BitOr};
Expand Down Expand Up @@ -302,19 +304,30 @@ pub fn arange(low: Expr, high: Expr, step: usize) -> Expr {
}
}

/// Arguments for the [`datetime`] expression constructor.
///
/// `year`, `month` and `day` are mandatory expressions; the sub-day
/// components are optional.
// NOTE(review): how a `None` sub-day component is treated is decided inside
// `datetime`, which is not fully visible here — presumably it defaults to zero.
#[derive(Default)]
pub struct DatetimeArgs {
    /// Expression yielding the year.
    pub year: Expr,
    /// Expression yielding the month.
    pub month: Expr,
    /// Expression yielding the day.
    pub day: Expr,
    /// Optional expression yielding the hour.
    pub hour: Option<Expr>,
    /// Optional expression yielding the minute.
    pub minute: Option<Expr>,
    /// Optional expression yielding the second.
    pub second: Option<Expr>,
    /// Optional expression yielding the millisecond.
    pub millisecond: Option<Expr>,
}

#[cfg(feature = "temporal")]
pub fn datetime(
year: Expr,
month: Expr,
day: Expr,
hour: Option<Expr>,
minute: Option<Expr>,
second: Option<Expr>,
millisecond: Option<Expr>,
) -> Expr {
pub fn datetime(args: DatetimeArgs) -> Expr {
use polars_core::export::chrono::NaiveDate;
use polars_core::utils::CustomIterTools;

let year = args.year;
let month = args.month;
let day = args.day;
let hour = args.hour;
let minute = args.minute;
let second = args.second;
let millisecond = args.millisecond;

let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
assert_eq!(s.len(), 7);
let max_len = s.iter().map(|s| s.len()).max().unwrap();
Expand Down Expand Up @@ -404,6 +417,83 @@ pub fn datetime(
.alias("datetime")
}

/// Arguments for the [`duration`] expression constructor.
///
/// Every component is optional; [`duration`] substitutes a literal `0` for
/// each field that is `None`. The field order mirrors the order in which the
/// components are passed as inputs to the underlying function.
#[derive(Default)]
pub struct DurationArgs {
    /// Number of days.
    pub days: Option<Expr>,
    /// Number of seconds.
    pub seconds: Option<Expr>,
    /// Number of nanoseconds.
    pub nanoseconds: Option<Expr>,
    /// Number of milliseconds.
    pub milliseconds: Option<Expr>,
    /// Number of minutes.
    pub minutes: Option<Expr>,
    /// Number of hours.
    pub hours: Option<Expr>,
    /// Number of weeks.
    pub weeks: Option<Expr>,
}

/// Create a `Duration` expression by summing distinct time components.
///
/// Every component of [`DurationArgs`] that is `None` is replaced by a literal
/// `0`. At evaluation time each input is cast to `Int64`, scaled to
/// nanoseconds and added to a running total, which is finally cast to
/// `DataType::Duration(TimeUnit::Nanoseconds)`. The nanosecond input is
/// expanded to the length of the longest input so that literal components
/// combine with full-length columns.
///
/// The resulting expression is aliased to `"duration"`.
///
/// # Panics
///
/// The underlying function panics when it is not evaluated with exactly
/// seven inputs.
///
/// # Errors
///
/// Evaluation fails when an input cannot be cast to `Int64` or when the final
/// cast to `Duration` fails.
#[cfg(feature = "temporal")]
pub fn duration(args: DurationArgs) -> Expr {
    let function = NoEq::new(Arc::new(move |s: &mut [Series]| -> Result<Series> {
        assert_eq!(s.len(), 7);
        let max_len = s.iter().map(|s| s.len()).max().unwrap();

        // The nanosecond input (index 2) is the accumulator; make sure it has
        // the full output length before anything is added to it.
        let mut nanoseconds = s[2].cast(&DataType::Int64)?;
        if nanoseconds.len() != max_len {
            nanoseconds = nanoseconds.expand_at_index(0, max_len);
        }

        // (input index, nanoseconds per unit) for every remaining component,
        // in the order they were originally accumulated.
        let components = [
            (0, NANOSECONDS * SECONDS_IN_DAY),     // days
            (1, NANOSECONDS),                      // seconds
            (3, 1_000_000),                        // milliseconds
            (4, NANOSECONDS * 60),                 // minutes
            (5, NANOSECONDS * 60 * 60),            // hours
            (6, NANOSECONDS * SECONDS_IN_DAY * 7), // weeks
        ];

        for (idx, nanos_per_unit) in components {
            let component = s[idx].cast(&DataType::Int64)?;

            // Skip the work for a literal 0 (the default of an unset
            // component); a full-length column always contributes.
            let contributes =
                component.len() == max_len || component.get(0) != AnyValue::Int64(0);
            if contributes {
                // Scale only this component. Scaling the sum would also
                // rescale everything accumulated so far.
                nanoseconds = &nanoseconds + &(component * nanos_per_unit);
            }
        }

        nanoseconds.cast(&DataType::Duration(TimeUnit::Nanoseconds))
    }) as Arc<dyn SeriesUdf>);

    Expr::Function {
        input: vec![
            args.days.unwrap_or_else(|| lit(0i64)),
            args.seconds.unwrap_or_else(|| lit(0i64)),
            args.nanoseconds.unwrap_or_else(|| lit(0i64)),
            args.milliseconds.unwrap_or_else(|| lit(0i64)),
            args.minutes.unwrap_or_else(|| lit(0i64)),
            args.hours.unwrap_or_else(|| lit(0i64)),
            args.weeks.unwrap_or_else(|| lit(0i64)),
        ],
        function,
        // Must match the dtype produced by the final cast in `function`.
        output_type: GetOutput::from_type(DataType::Duration(TimeUnit::Nanoseconds)),
        options: FunctionOptions {
            collect_groups: ApplyOptions::ApplyFlat,
            input_wildcard_expansion: true,
            auto_explode: false,
            fmt_str: "duration",
        },
    }
    .alias("duration")
}

/// Concat multiple
pub fn concat<L: AsRef<[LazyFrame]>>(inputs: L, rechunk: bool) -> Result<LazyFrame> {
let mut inputs = inputs.as_ref().to_vec();
Expand Down
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/expression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ These functions can be used as expression and sometimes also in eager contexts.
when
exclude
datetime
duration
date
struct

Expand Down
1 change: 1 addition & 0 deletions py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def version() -> str:
concat_str,
count,
cov,
duration,
exclude,
first,
fold,
Expand Down
76 changes: 73 additions & 3 deletions py-polars/polars/internals/lazy_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
from polars.polars import max_exprs as _max_exprs
from polars.polars import min_exprs as _min_exprs
from polars.polars import pearson_corr as pypearson_corr
from polars.polars import py_datetime
from polars.polars import py_datetime, py_duration
from polars.polars import repeat as _repeat
from polars.polars import spearman_rank_corr as pyspearman_rank_corr

Expand Down Expand Up @@ -1032,6 +1032,76 @@ def argsort_by(
return pli.wrap_expr(pyargsort_by(exprs, reverse))


def duration(
    days: Optional[Union["pli.Expr", str]] = None,
    seconds: Optional[Union["pli.Expr", str]] = None,
    nanoseconds: Optional[Union["pli.Expr", str]] = None,
    milliseconds: Optional[Union["pli.Expr", str]] = None,
    minutes: Optional[Union["pli.Expr", str]] = None,
    hours: Optional[Union["pli.Expr", str]] = None,
    weeks: Optional[Union["pli.Expr", str]] = None,
) -> "pli.Expr":
    """
    Create polars `Duration` from distinct time components.

    Every component is optional and may be given either as an expression or
    as a string; strings are not turned into literals (in the example below
    they name a column). Components left as ``None`` are forwarded unchanged.

    Parameters
    ----------
    days
        Number of days.
    seconds
        Number of seconds.
    nanoseconds
        Number of nanoseconds.
    milliseconds
        Number of milliseconds.
    minutes
        Number of minutes.
    hours
        Number of hours.
    weeks
        Number of weeks.

    Returns
    -------
    Expr of type `pl.Duration`

    Examples
    --------
    >>> from datetime import datetime
    >>> df = pl.DataFrame(
    ...     {
    ...         "datetime": [datetime(2022, 1, 1), datetime(2022, 1, 2)],
    ...         "add": [1, 2],
    ...     }
    ... )
    >>> df.select(
    ...     [
    ...         (pl.col("datetime") + pl.duration(weeks="add")).alias("add_weeks"),
    ...         (pl.col("datetime") + pl.duration(days="add")).alias("add_days"),
    ...         (pl.col("datetime") + pl.duration(seconds="add")).alias("add_seconds"),
    ...         (pl.col("datetime") + pl.duration(milliseconds="add")).alias(
    ...             "add_milliseconds"
    ...         ),
    ...         (pl.col("datetime") + pl.duration(hours="add")).alias("add_hours"),
    ...     ]
    ... )
    shape: (2, 5)
    ┌────────────┬────────────┬─────────────────────┬──────────────┬─────────────────────┐
    │ add_weeks  ┆ add_days   ┆ add_seconds         ┆ add_millisec ┆ add_hours           │
    │ ---        ┆ ---        ┆ ---                 ┆ onds         ┆ ---                 │
    │ datetime[m ┆ datetime[m ┆ datetime[ms]        ┆ ---          ┆ datetime[ms]        │
    │ s]         ┆ s]         ┆                     ┆ datetime[ms] ┆                     │
    ╞════════════╪════════════╪═════════════════════╪══════════════╪═════════════════════╡
    │ 2022-01-08 ┆ 2022-01-02 ┆ 2022-01-01 00:00:01 ┆ 2022-01-01   ┆ 2022-01-01 01:00:00 │
    │ 00:00:00   ┆ 00:00:00   ┆                     ┆ 00:00:00.001 ┆                     │
    ├╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌┼╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌╌┤
    │ 2022-01-16 ┆ 2022-01-04 ┆ 2022-01-02 00:00:02 ┆ 2022-01-02   ┆ 2022-01-02 02:00:00 │
    │ 00:00:00   ┆ 00:00:00   ┆                     ┆ 00:00:00.002 ┆                     │
    └────────────┴────────────┴─────────────────────┴──────────────┴─────────────────────┘

    """
    # Conversion order is significant only for which invalid argument raises
    # first; the tuple below fixes that order explicitly.
    conversion_order = (
        ("hours", hours),
        ("minutes", minutes),
        ("seconds", seconds),
        ("milliseconds", milliseconds),
        ("nanoseconds", nanoseconds),
        ("days", days),
        ("weeks", weeks),
    )
    pyexprs = {
        name: (
            None
            if component is None
            else pli.expr_to_lit_or_expr(component, str_to_lit=False)._pyexpr
        )
        for name, component in conversion_order
    }

    # Positional order expected by the Rust binding.
    return pli.wrap_expr(
        py_duration(
            pyexprs["days"],
            pyexprs["seconds"],
            pyexprs["nanoseconds"],
            pyexprs["milliseconds"],
            pyexprs["minutes"],
            pyexprs["hours"],
            pyexprs["weeks"],
        )
    )


def _datetime(
year: Union["pli.Expr", str],
month: Union["pli.Expr", str],
Expand All @@ -1042,7 +1112,7 @@ def _datetime(
millisecond: Optional[Union["pli.Expr", str]] = None,
) -> "pli.Expr":
"""
Create polars Datetime from distinct time components.
Create polars `Datetime` from distinct time components.
Parameters
----------
Expand All @@ -1063,7 +1133,7 @@ def _datetime(
Returns
-------
Expr of type pl.Datetime
Expr of type `pl.Datetime`
"""

year_expr = pli.expr_to_lit_or_expr(year, str_to_lit=False)
Expand Down
40 changes: 33 additions & 7 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use crate::error::{
ArrowErrorException, ComputeError, NoDataError, NotFoundError, PyPolarsErr, SchemaError,
};
use crate::file::get_either_file;
use crate::prelude::{ClosedWindow, DataType, Duration, PyDataType};
use crate::prelude::{ClosedWindow, DataType, DatetimeArgs, Duration, DurationArgs, PyDataType};
use dsl::ToExprs;
use mimalloc::MiMalloc;
use polars::functions::{diag_concat_df, hor_concat_df};
Expand Down Expand Up @@ -215,16 +215,41 @@ fn py_datetime(
let minute = minute.map(|e| e.inner);
let second = second.map(|e| e.inner);
let millisecond = millisecond.map(|e| e.inner);
polars::lazy::dsl::datetime(
year.inner,
month.inner,
day.inner,

let args = DatetimeArgs {
year: year.inner,
month: month.inner,
day: day.inner,
hour,
minute,
second,
millisecond,
)
.into()
};

polars::lazy::dsl::datetime(args).into()
}

// Python entry point for `polars::lazy::dsl::duration`: unwraps each optional
// `PyExpr` into its inner expression, bundles them into `DurationArgs` and
// wraps the resulting expression back up for Python.
#[pyfunction]
fn py_duration(
    days: Option<PyExpr>,
    seconds: Option<PyExpr>,
    nanoseconds: Option<PyExpr>,
    milliseconds: Option<PyExpr>,
    minutes: Option<PyExpr>,
    hours: Option<PyExpr>,
    weeks: Option<PyExpr>,
) -> dsl::PyExpr {
    // Strip the Python wrapper from an optional expression.
    let inner = |wrapped: Option<PyExpr>| wrapped.map(|e| e.inner);

    let expr = polars::lazy::dsl::duration(DurationArgs {
        days: inner(days),
        seconds: inner(seconds),
        nanoseconds: inner(nanoseconds),
        milliseconds: inner(milliseconds),
        minutes: inner(minutes),
        hours: inner(hours),
        weeks: inner(weeks),
    });

    expr.into()
}

#[pyfunction]
Expand Down Expand Up @@ -437,6 +462,7 @@ fn polars(py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(py_diag_concat_df)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_hor_concat_df)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_datetime)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_duration)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_date_range)).unwrap();
m.add_wrapped(wrap_pyfunction!(min_exprs)).unwrap();
m.add_wrapped(wrap_pyfunction!(max_exprs)).unwrap();
Expand Down
39 changes: 39 additions & 0 deletions py-polars/tests/test_datelike.py
Original file line number Diff line number Diff line change
Expand Up @@ -598,3 +598,42 @@ def test_asof_join_tolerance_grouper() -> None:
)

assert out.frame_equal(expected)


def test_duration_function() -> None:
    """Adding `pl.duration(<unit>="add")` shifts each datetime by `add` units."""
    start_dates = [datetime(2022, 1, 1), datetime(2022, 1, 2)]
    df = pl.DataFrame({"datetime": start_dates, "add": [1, 2]})

    ts = pl.col("datetime")
    shifted = [
        (ts + pl.duration(weeks="add")).alias("add_weeks"),
        (ts + pl.duration(days="add")).alias("add_days"),
        (ts + pl.duration(seconds="add")).alias("add_seconds"),
        (ts + pl.duration(milliseconds="add")).alias("add_milliseconds"),
        (ts + pl.duration(hours="add")).alias("add_hours"),
    ]
    out = df.select(shifted)

    # Row 0 is shifted by one unit, row 1 by two units.
    expected_columns = {
        "add_weeks": [datetime(2022, 1, 8), datetime(2022, 1, 16)],
        "add_days": [datetime(2022, 1, 2), datetime(2022, 1, 4)],
        "add_seconds": [
            datetime(2022, 1, 1, second=1),
            datetime(2022, 1, 2, second=2),
        ],
        "add_milliseconds": [
            datetime(2022, 1, 1, microsecond=1000),
            datetime(2022, 1, 2, microsecond=2000),
        ],
        "add_hours": [datetime(2022, 1, 1, hour=1), datetime(2022, 1, 2, hour=2)],
    }
    expected = pl.DataFrame(expected_columns)

    assert out.frame_equal(expected)

0 comments on commit a999618

Please sign in to comment.