Skip to content

Commit

Permalink
python date/datetime function and use expressions in fold operation l…
Browse files Browse the repository at this point in the history
…amdba (#1719)
  • Loading branch information
ritchie46 committed Nov 9, 2021
1 parent 17f4f78 commit 9c518f7
Show file tree
Hide file tree
Showing 12 changed files with 289 additions and 35 deletions.
2 changes: 2 additions & 0 deletions polars/polars-core/src/export.rs
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
pub use arrow;
#[cfg(all(feature = "private", feature = "temporal"))]
pub use chrono;
101 changes: 101 additions & 0 deletions polars/polars-lazy/src/functions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -192,3 +192,104 @@ pub fn arange(low: Expr, high: Expr, step: usize) -> Expr {
)
}
}

#[cfg(feature = "temporal")]
pub fn datetime(
year: Expr,
month: Expr,
day: Expr,
hour: Option<Expr>,
minute: Option<Expr>,
second: Option<Expr>,
millisecond: Option<Expr>,
) -> Expr {
use polars_core::export::chrono::NaiveDate;
use polars_core::utils::CustomIterTools;

let function = NoEq::new(Arc::new(move |s: &mut [Series]| {
assert_eq!(s.len(), 7);
let max_len = s.iter().map(|s| s.len()).max().unwrap();
let mut year = s[0].cast(&DataType::Int32)?;
if year.len() < max_len {
year = year.expand_at_index(0, max_len)
}
let year = year.i32()?;
let mut month = s[1].cast(&DataType::UInt32)?;
if month.len() < max_len {
month = month.expand_at_index(0, max_len);
}
let month = month.u32()?;
let mut day = s[2].cast(&DataType::UInt32)?;
if day.len() < max_len {
day = day.expand_at_index(0, max_len);
}
let day = day.u32()?;
let mut hour = s[3].cast(&DataType::UInt32)?;
if hour.len() < max_len {
hour = hour.expand_at_index(0, max_len);
}
let hour = hour.u32()?;

let mut minute = s[4].cast(&DataType::UInt32)?;
if minute.len() < max_len {
minute = minute.expand_at_index(0, max_len);
}
let minute = minute.u32()?;

let mut second = s[5].cast(&DataType::UInt32)?;
if second.len() < max_len {
second = second.expand_at_index(0, max_len);
}
let second = second.u32()?;

let mut millisecond = s[6].cast(&DataType::UInt32)?;
if millisecond.len() < max_len {
millisecond = millisecond.expand_at_index(0, max_len);
}
let millisecond = millisecond.u32()?;

let ca: Int64Chunked = year
.into_iter()
.zip(month.into_iter())
.zip(day.into_iter())
.zip(hour.into_iter())
.zip(minute.into_iter())
.zip(second.into_iter())
.zip(millisecond.into_iter())
.map(|((((((y, m), d), h), mnt), s), ms)| {
if let (Some(y), Some(m), Some(d), Some(h), Some(mnt), Some(s), Some(ms)) =
(y, m, d, h, mnt, s, ms)
{
Some(
NaiveDate::from_ymd(y, m, d)
.and_hms_milli(h, mnt, s, ms)
.timestamp_millis(),
)
} else {
None
}
})
.trust_my_length(max_len)
.collect_trusted();

Ok(ca.into_date().into_series())
}) as Arc<dyn SeriesUdf>);
Expr::Function {
input: vec![
year,
month,
day,
hour.unwrap_or_else(|| lit(0)),
minute.unwrap_or_else(|| lit(0)),
second.unwrap_or_else(|| lit(0)),
millisecond.unwrap_or_else(|| lit(0)),
],
function,
output_type: GetOutput::from_type(DataType::Datetime),
options: FunctionOptions {
collect_groups: ApplyOptions::ApplyFlat,
input_wildcard_expansion: true,
},
}
.alias("datetime")
}
4 changes: 3 additions & 1 deletion py-polars/docs/source/reference/expression.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ These functions can be used as expression and sometimes also in eager contexts.

col
count
to_list
list
std
var
max
Expand Down Expand Up @@ -45,6 +45,8 @@ These functions can be used as expression and sometimes also in eager contexts.
format
when
exclude
datetime
date

Constructor
-----------
Expand Down
2 changes: 2 additions & 0 deletions py-polars/polars/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

# explicit imports make mypy happy
from .lazy import *
from .lazy import _date as date
from .lazy import _datetime as datetime
from .lazy import col, lit
from .lazy import to_list as list
from .string_cache import *
Expand Down
78 changes: 77 additions & 1 deletion py-polars/polars/lazy/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from polars.polars import lit as pylit
from polars.polars import map_mul as _map_mul
from polars.polars import pearson_corr as pypearson_corr
from polars.polars import py_datetime
from polars.polars import spearman_rank_corr as pyspearman_rank_corr

_DOCUMENTING = False
Expand Down Expand Up @@ -66,6 +67,8 @@
"collect_all",
"exclude",
"format",
"_datetime",
"_date",
]


Expand Down Expand Up @@ -519,7 +522,7 @@ def apply(
## Context
* Select/Project
Don't do this, use `map_mul`
Don't do this, use `map`
* GroupBy
expected type `f`: Callable[[Series], Series]
Applies a python function over each group.
Expand Down Expand Up @@ -767,6 +770,79 @@ def argsort_by(
return pl.lazy.expr.wrap_expr(pyargsort_by(exprs, reverse))


def _datetime(
year: "pl.Expr",
month: "pl.Expr",
day: "pl.Expr",
hour: Optional["pl.Expr"] = None,
minute: Optional["pl.Expr"] = None,
second: Optional["pl.Expr"] = None,
millisecond: Optional["pl.Expr"] = None,
) -> "pl.Expr":
"""
Create polars Datetime from distinct time components.
Parameters
----------
year
column or literal.
month
column or literal, ranging from 1-12.
day
column or literal, ranging from 1-31.
hour
column or literal, ranging from 1-24.
minute
column or literal, ranging from 1-60.
second
column or literal, ranging from 1-60.
millisecond
column or literal, ranging from 1-1000.
Returns
-------
Expr of type pl.Datetime
"""

year = pl.expr_to_lit_or_expr(year, str_to_lit=False) # type: ignore
month = pl.expr_to_lit_or_expr(month, str_to_lit=False) # type: ignore
day = pl.expr_to_lit_or_expr(day, str_to_lit=False) # type: ignore

if hour is not None:
hour = pl.expr_to_lit_or_expr(hour, str_to_lit=False)._pyexpr # type: ignore
if minute is not None:
minute = pl.expr_to_lit_or_expr(minute, str_to_lit=False)._pyexpr # type: ignore
if second is not None:
second = pl.expr_to_lit_or_expr(second, str_to_lit=False)._pyexpr # type: ignore
if millisecond is not None:
millisecond = pl.expr_to_lit_or_expr(millisecond, str_to_lit=False)._pyexpr # type: ignore
return pl.wrap_expr(
py_datetime(
year._pyexpr, month._pyexpr, day._pyexpr, hour, minute, second, millisecond
)
)


def _date(year: "pl.Expr", month: "pl.Expr", day: "pl.Expr") -> "pl.Expr":
"""
Create polars Date from distinct time components.
Parameters
----------
year
column or literal.
month
column or literal, ranging from 1-12.
day
column or literal, ranging from 1-31.
Returns
-------
Expr of type pl.Date
"""
return _datetime(year, month, day).cast(pl.Date).alias("date")


def concat_str(exprs: tp.List["pl.Expr"], sep: str = "") -> "pl.Expr":
"""
Concat Utf8 Series in linear time. Non utf8 columns are cast to utf8.
Expand Down
6 changes: 4 additions & 2 deletions py-polars/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ impl PyDataFrame {
} else {
Some(s.as_bytes()[0])
}
} else {
} else {
None
};
let encoding = match encoding {
Expand Down Expand Up @@ -944,7 +944,9 @@ impl PyDataFrame {
pub fn transpose(&self, include_header: bool, names: &str) -> PyResult<Self> {
let mut df = self.df.transpose().map_err(PyPolarsEr::from)?;
if include_header {
let s = Utf8Chunked::new_from_iter(names, self.df.get_columns().iter().map(|s| s.name())).into_series();
let s =
Utf8Chunked::new_from_iter(names, self.df.get_columns().iter().map(|s| s.name()))
.into_series();
df.insert_at_idx(0, s).unwrap();
}
Ok(df.into())
Expand Down
29 changes: 23 additions & 6 deletions py-polars/src/lazy/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,29 @@ pub(crate) fn binary_lambda(lambda: &PyObject, a: Series, b: Series) -> Result<S
e.pvalue(py).to_string()
),
};
// unpack the wrapper in a PySeries
let py_pyseries = result_series_wrapper
.getattr(py, "_s")
.expect("Could net get series attribute '_s'. Make sure that you return a Series object.");
// Downcast to Rust
let pyseries = py_pyseries.extract::<PySeries>(py).unwrap();
let pyseries = if let Some(expr) = result_series_wrapper.getattr(py, "_pyexpr").ok() {
let pyexpr = expr.extract::<PyExpr>(py).unwrap();
let expr = pyexpr.inner;
let df = DataFrame::new_no_checks(vec![]);
let out = df
.lazy()
.select([expr])
.with_predicate_pushdown(false)
.with_projection_pushdown(false)
.with_aggregate_pushdown(false)
.collect()?;

let s = out.select_at_idx(0).unwrap().clone();
PySeries::new(s)
} else {
// unpack the wrapper in a PySeries
let py_pyseries = result_series_wrapper.getattr(py, "_s").expect(
"Could net get series attribute '_s'. Make sure that you return a Series object.",
);
// Downcast to Rust
py_pyseries.extract::<PySeries>(py).unwrap()
};

// Finally get the actual Series
Ok(pyseries.series)
}
Expand Down
2 changes: 1 addition & 1 deletion py-polars/src/lazy/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl PyLazyFrame {
comment_char: Option<&str>,
quote_char: Option<&str>,
null_values: Option<Wrap<NullValues>>,
infer_schema_length: Option<usize>
infer_schema_length: Option<usize>,
) -> Self {
let null_values = null_values.map(|w| w.0);
let comment_char = comment_char.map(|s| s.as_bytes()[0]);
Expand Down
39 changes: 34 additions & 5 deletions py-polars/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use crate::error::PyPolarsEr;
use crate::file::get_either_file;
use crate::prelude::{DataType, PyDataType};
use mimalloc::MiMalloc;
use polars::functions::diag_concat_df;
use polars_core::export::arrow::io::ipc::read::read_file_metadata;
use pyo3::types::PyDict;
use polars::functions::diag_concat_df;

#[global_allocator]
static GLOBAL: MiMalloc = MiMalloc;
Expand Down Expand Up @@ -150,6 +150,32 @@ fn concat_lst(s: Vec<dsl::PyExpr>) -> dsl::PyExpr {
polars::lazy::functions::concat_lst(s).into()
}

#[pyfunction]
fn py_datetime(
year: dsl::PyExpr,
month: dsl::PyExpr,
day: dsl::PyExpr,
hour: Option<dsl::PyExpr>,
minute: Option<dsl::PyExpr>,
second: Option<dsl::PyExpr>,
millisecond: Option<dsl::PyExpr>,
) -> dsl::PyExpr {
let hour = hour.map(|e| e.inner);
let minute = minute.map(|e| e.inner);
let second = second.map(|e| e.inner);
let millisecond = millisecond.map(|e| e.inner);
polars::lazy::functions::datetime(
year.inner,
month.inner,
day.inner,
hour,
minute,
second,
millisecond,
)
.into()
}

#[pyfunction]
fn concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
let (seq, _len) = get_pyseq(dfs)?;
Expand All @@ -171,10 +197,12 @@ fn py_diag_concat_df(dfs: &PyAny) -> PyResult<PyDataFrame> {
let (seq, _len) = get_pyseq(dfs)?;
let iter = seq.iter()?;

let dfs = iter.map(|item| {
let item = item?;
get_df(item)
}).collect::<PyResult<Vec<_>>>()?;
let dfs = iter
.map(|item| {
let item = item?;
get_df(item)
})
.collect::<PyResult<Vec<_>>>()?;

let df = diag_concat_df(&dfs).map_err(PyPolarsEr::from)?;
Ok(df.into())
Expand Down Expand Up @@ -274,5 +302,6 @@ fn polars(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_wrapped(wrap_pyfunction!(spearman_rank_corr)).unwrap();
m.add_wrapped(wrap_pyfunction!(map_mul)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_diag_concat_df)).unwrap();
m.add_wrapped(wrap_pyfunction!(py_datetime)).unwrap();
Ok(())
}
6 changes: 4 additions & 2 deletions py-polars/src/list_construction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ pub fn py_seq_to_list(name: &str, seq: &PyAny, dtype: &PyAny) -> PyResult<Series
let (seq, len) = get_pyseq(seq)?;
let s = match dtype {
DataType::Int64 => {
let mut builder = ListPrimitiveChunkedBuilder::<i64>::new(name, len, len * 5, DataType::Int64);
let mut builder =
ListPrimitiveChunkedBuilder::<i64>::new(name, len, len * 5, DataType::Int64);
for sub_seq in seq.iter()? {
let sub_seq = sub_seq?;
let (sub_seq, len) = get_pyseq(sub_seq)?;
Expand All @@ -31,7 +32,8 @@ pub fn py_seq_to_list(name: &str, seq: &PyAny, dtype: &PyAny) -> PyResult<Series
builder.finish().into_series()
}
DataType::Float64 => {
let mut builder = ListPrimitiveChunkedBuilder::<f64>::new(name, len, len * 5, DataType::Float64);
let mut builder =
ListPrimitiveChunkedBuilder::<f64>::new(name, len, len * 5, DataType::Float64);
for sub_seq in seq.iter()? {
let sub_seq = sub_seq?;
let (sub_seq, len) = get_pyseq(sub_seq)?;
Expand Down

0 comments on commit 9c518f7

Please sign in to comment.