cumulative_eval (#3400)
ritchie46 committed May 15, 2022
1 parent 13e9fc8 commit dd67372
Showing 17 changed files with 254 additions and 16 deletions.
3 changes: 3 additions & 0 deletions polars/Cargo.toml
@@ -105,6 +105,7 @@ log = ["polars-core/log", "polars-lazy/log"]
partition_by = ["polars-core/partition_by"]
semi_anti_join = ["polars-core/semi_anti_join"]
list_eval = ["polars-lazy/list_eval"]
cumulative_eval = ["polars-lazy/cumulative_eval"]
chunked_ids = ["polars-core/chunked_ids", "polars-lazy/chunked_ids"]
to_dummies = ["polars-ops/to_dummies"]
bigidx = ["polars-core/bigidx", "polars-lazy/bigidx"]
@@ -228,6 +229,8 @@ docs-selection = [
"string_encoding",
"product",
"to_dummies",
"list_eval",
"cumulative_eval",
]

bench = [
1 change: 1 addition & 0 deletions polars/polars-core/src/frame/mod.rs
@@ -478,6 +478,7 @@ impl DataFrame {
}

#[cfg(feature = "private")]
#[inline]
pub fn get_columns_mut(&mut self) -> &mut Vec<Series> {
&mut self.columns
}
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
@@ -62,6 +62,7 @@ dot_diagram = []
unique_counts = ["polars-core/unique_counts"]
log = ["polars-core/log"]
list_eval = []
cumulative_eval = []
chunked_ids = []
list_to_struct = ["polars-ops/list_to_struct"]
python = ["pyo3"]
116 changes: 116 additions & 0 deletions polars/polars-lazy/src/dsl/eval.rs
@@ -0,0 +1,116 @@
use super::*;
use crate::physical_plan::state::ExecutionState;
use parking_lot::Mutex;
use rayon::prelude::*;

pub(super) fn prepare_eval_expr(mut expr: Expr) -> Expr {
expr.mutate().apply(|e| match e {
Expr::Column(name) => {
*name = Arc::from("");
true
}
Expr::Nth(_) => {
*e = Expr::Column(Arc::from(""));
true
}
_ => true,
});
expr
}

impl Expr {
/// Run an expression over a sliding window that increases by `1` slot every iteration.
///
/// # Warning
/// This can be really slow as it can have `O(n^2)` complexity. Don't use this for operations
/// that visit all elements.
pub fn cumulative_eval(self, expr: Expr, min_periods: usize, parallel: bool) -> Self {
let expr2 = expr.clone();
let func = move |mut s: Series| {
let name = s.name().to_string();
s.rename("");
let expr = expr.clone();
let mut arena = Arena::with_capacity(10);
let aexpr = to_aexpr(expr, &mut arena);
let planner = DefaultPlanner::default();
let phys_expr = planner.create_physical_expr(aexpr, Context::Default, &mut arena)?;

let state = ExecutionState::new();

let mut err = None;

let avs = if parallel {
let m_err = Mutex::new(None);
let avs = (1..s.len() + 1)
.into_par_iter()
.map(|len| {
let s = s.slice(0, len);
if (len - s.null_count()) >= min_periods {
let df = DataFrame::new_no_checks(vec![s]);
let out = phys_expr.evaluate(&df, &state);
match out {
Ok(s) => s.get(0).into_static().unwrap(),
Err(e) => {
*m_err.lock() = Some(e);
AnyValue::Null
}
}
} else {
AnyValue::Null
}
})
.collect::<Vec<_>>();
err = m_err.lock().take();
avs
} else {
let mut df_container = DataFrame::new_no_checks(vec![]);
(1..s.len() + 1)
.map(|len| {
let s = s.slice(0, len);
if (len - s.null_count()) >= min_periods {
df_container.get_columns_mut().push(s);
let out = phys_expr.evaluate(&df_container, &state);
df_container.get_columns_mut().clear();
match out {
Ok(s) => s.get(0).into_static().unwrap(),
Err(e) => {
err = Some(e);
AnyValue::Null
}
}
} else {
AnyValue::Null
}
})
.collect::<Vec<_>>()
};

match err {
None => Ok(Series::new(&name, avs)),
Some(e) => Err(e),
}
};

self.map(
func,
GetOutput::map_field(move |f| {
// dummy df to determine output dtype
let dtype = f
.data_type()
.inner_dtype()
.cloned()
.unwrap_or_else(|| f.data_type().clone());

let df = Series::new_empty("", &dtype).into_frame();
match df.lazy().select([expr2.clone()]).collect() {
Ok(out) => {
let dtype = out.get_columns()[0].dtype();
Field::new(f.name(), dtype.clone())
}
Err(_) => Field::new(f.name(), DataType::Null),
}
}),
)
.with_fmt("expanding_eval")
}
}
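
For intuition, the sequential branch above amounts to: evaluate the inner expression on every growing prefix of the series, yielding null until the prefix holds at least `min_periods` non-null values. Below is a rough Python rendering of that behaviour; the helper name and the sample data are illustrative only, not part of this commit.

```python
import polars as pl


def cumulative_eval_reference(
    s: pl.Series, expr: pl.Expr, min_periods: int = 1
) -> pl.Series:
    """Rough sketch of the sequential branch above (hypothetical helper)."""
    out = []
    for end in range(1, len(s) + 1):
        # The Rust loop slices the series to the prefix [0, end) ...
        prefix = s.slice(0, end).rename("")
        # ... and only evaluates the expression once the prefix has
        # at least `min_periods` non-null values.
        if end - prefix.null_count() >= min_periods:
            out.append(prefix.to_frame().select(expr).to_series()[0])
        else:
            out.append(None)
    return pl.Series(s.name, out)


s = pl.Series("values", [1, 2, 3, 4, 5])
print(cumulative_eval_reference(s, pl.element().first() - pl.element().last() ** 2))
```

The parallel branch does the same per-prefix work but fans the windows out with rayon and collects the scalar results afterwards.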
15 changes: 3 additions & 12 deletions polars/polars-lazy/src/dsl/list.rs
@@ -1,3 +1,4 @@
use crate::dsl::eval::prepare_eval_expr;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use parking_lot::Mutex;
@@ -211,18 +212,8 @@ impl ListNameSpace {

/// Run any [`Expr`] on these lists elements
#[cfg(feature = "list_eval")]
pub fn eval(self, mut expr: Expr, parallel: bool) -> Expr {
expr.mutate().apply(|e| match e {
Expr::Column(name) => {
*name = Arc::from("");
true
}
Expr::Nth(_) => {
*e = Expr::Column(Arc::from(""));
true
}
_ => true,
});
pub fn eval(self, expr: Expr, parallel: bool) -> Expr {
let expr = prepare_eval_expr(expr);

let expr2 = expr.clone();
let func = move |s: Series| {
1 change: 1 addition & 0 deletions polars/polars-lazy/src/dsl/mod.rs
@@ -5,6 +5,7 @@ pub mod cat;
pub use cat::*;
#[cfg(feature = "temporal")]
mod dt;
mod eval;
mod expr;
pub(crate) mod function_expr;
#[cfg(feature = "compile")]
2 changes: 2 additions & 0 deletions polars/src/lib.rs
@@ -226,6 +226,8 @@
//! - `unique_counts` - Count unique values in expressions.
//! - `log` - Logarithms for `Series`.
//! - `list_to_struct` - Convert `List` to `Struct` dtypes.
//! - `list_eval` - Apply expressions over list elements.
//! - `cumulative_eval` - Apply expressions over cumulatively increasing windows.
//! * `DataFrame` pretty printing
//! - `fmt` - Activate DataFrame formatting
//!
1 change: 1 addition & 0 deletions py-polars/Cargo.toml
@@ -101,6 +101,7 @@ features = [
"partition_by",
"semi_anti_join",
"list_eval",
"cumulative_eval",
"list_to_struct",
"to_dummies",
]
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/expression.rst
@@ -126,6 +126,7 @@ Computations
Expr.cummax
Expr.cumprod
Expr.cumcount
Expr.cumulative_eval
Expr.dot
Expr.mode
Expr.n_unique
1 change: 1 addition & 0 deletions py-polars/docs/source/reference/series.rst
@@ -104,6 +104,7 @@ Computations
Series.cummin
Series.cummax
Series.cumprod
Series.cumulative_eval
Series.arg_true
Series.arg_unique
Series.unique
1 change: 1 addition & 0 deletions py-polars/polars/__init__.py
@@ -72,6 +72,7 @@ def version() -> str:
count,
cov,
duration,
element,
exclude,
first,
fold,
2 changes: 1 addition & 1 deletion py-polars/polars/internals/__init__.py
@@ -7,6 +7,6 @@
from .expr import Expr, expr_to_lit_or_expr, selection_to_pyexpr_list, wrap_expr
from .frame import DataFrame, LazyFrame, wrap_df, wrap_ldf
from .functions import concat, date_range # DataFrame.describe() & DataFrame.upsample()
from .lazy_functions import all, argsort_by, col, concat_list, lit, select
from .lazy_functions import all, argsort_by, col, concat_list, element, lit, select
from .series import Series, wrap_s
from .whenthen import when # used in expr.clip()
59 changes: 57 additions & 2 deletions py-polars/polars/internals/expr.py
@@ -2884,7 +2884,7 @@ def log(self, base: float = math.e) -> "Expr":
"""
return wrap_expr(self._pyexpr.log(base))

def entropy(self, base: float = math.e, normalize: bool = False) -> "Expr":
def entropy(self, base: float = math.e, normalize: bool = True) -> "Expr":
"""
Compute the entropy as `-sum(pk * log(pk))`, where `pk` are the discrete probabilities.
@@ -2899,6 +2899,61 @@ def entropy(self, base: float = math.e, normalize: bool = False) -> "Expr":
"""
return wrap_expr(self._pyexpr.entropy(base, normalize))

def cumulative_eval(
self, expr: "Expr", min_periods: int = 1, parallel: bool = False
) -> "Expr":
"""
Run an expression over a sliding window that increases by `1` slot every iteration.

.. warning::
    This can be really slow as it can have `O(n^2)` complexity. Don't use this
    for operations that visit all elements.

.. warning::
    This API is experimental and may change without it being considered a
    breaking change.

Parameters
----------
expr
    Expression to evaluate.
min_periods
    Number of valid (non-null) values there should be in the window before the
    expression is evaluated; valid values = `length - null_count`.
parallel
    Run in parallel. Don't do this in a groupby or another operation that
    already has much parallelization.

Examples
--------
>>> df = pl.DataFrame({"values": [1, 2, 3, 4, 5]})
>>> df.select(
... [
... pl.col("values").cumulative_eval(
... pl.element().first() - pl.element().last() ** 2
... )
... ]
... )
shape: (5, 1)
┌────────┐
│ values │
│ --- │
│ f64 │
╞════════╡
│ 0.0 │
├╌╌╌╌╌╌╌╌┤
│ -3.0 │
├╌╌╌╌╌╌╌╌┤
│ -8.0 │
├╌╌╌╌╌╌╌╌┤
│ -15.0 │
├╌╌╌╌╌╌╌╌┤
│ -24.0 │
└────────┘
"""
return wrap_expr(
self._pyexpr.cumulative_eval(expr._pyexpr, min_periods, parallel)
)

# Below are the namespaces defined. Keep these at the end of the definition of Expr, as to not confuse mypy with
# the type annotation `str` with the namespace "str"

@@ -3554,7 +3609,7 @@ def eval(self, expr: "Expr", parallel: bool = False) -> "Expr":
>>> df = pl.DataFrame({"a": [1, 8, 3], "b": [4, 5, 2]})
>>> df.with_column(
... pl.concat_list(["a", "b"]).arr.eval(pl.first().rank()).alias("rank")
... pl.concat_list(["a", "b"]).arr.eval(pl.element().rank()).alias("rank")
... )
shape: (3, 3)
┌─────┬─────┬────────────┐
7 changes: 7 additions & 0 deletions py-polars/polars/internals/lazy_functions.py
@@ -161,6 +161,13 @@ def col(
return pli.wrap_expr(pycol(name))


def element() -> "pli.Expr":
"""
Alias for an element being evaluated in an `eval` expression.
"""
return col("")


@overload
def count(column: str) -> "pli.Expr":
...
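
`element()` is simply sugar for `col("")`, the placeholder column that `arr.eval` (and, with this commit, `cumulative_eval`) bind the element under evaluation to. A minimal sketch mirroring the updated `arr.eval` docstring example above; the data is illustrative:

```python
import polars as pl

df = pl.DataFrame({"a": [1, 8, 3], "b": [4, 5, 2]})

# Rank the values of each row-wise list; `pl.element()` refers to the
# list elements being evaluated inside `arr.eval`.
out = df.with_column(
    pl.concat_list(["a", "b"]).arr.eval(pl.element().rank()).alias("rank")
)
print(out)
```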
45 changes: 44 additions & 1 deletion py-polars/polars/internals/series.py
@@ -974,6 +974,49 @@ def entropy(self, base: float = math.e, normalize: bool = False) -> Optional[flo
"""
return pli.select(pli.lit(self).entropy(base, normalize)).to_series()[0]

def cumulative_eval(
self, expr: "pli.Expr", min_periods: int = 1, parallel: bool = False
) -> "Series":
"""
Run an expression over a sliding window that increases by `1` slot every iteration.

.. warning::
    This can be really slow as it can have `O(n^2)` complexity. Don't use this
    for operations that visit all elements.

.. warning::
    This API is experimental and may change without it being considered a
    breaking change.

Parameters
----------
expr
    Expression to evaluate.
min_periods
    Number of valid (non-null) values there should be in the window before the
    expression is evaluated; valid values = `length - null_count`.
parallel
    Run in parallel. Don't do this in a groupby or another operation that
    already has much parallelization.

Examples
--------
>>> s = pl.Series("values", [1, 2, 3, 4, 5])
>>> s.cumulative_eval(pl.element().first() - pl.element().last() ** 2)
shape: (5,)
Series: 'values' [f64]
[
0.0
-3.0
-8.0
-15.0
-24.0
]
"""
return pli.select(
pli.lit(self).cumulative_eval(expr, min_periods, parallel)
).to_series()

@property
def name(self) -> str:
"""
@@ -4449,7 +4492,7 @@ def eval(self, expr: "pli.Expr", parallel: bool = False) -> "Series":
>>> df = pl.DataFrame({"a": [1, 8, 3], "b": [4, 5, 2]})
>>> df.with_column(
... pl.concat_list(["a", "b"]).arr.eval(pl.first().rank()).alias("rank")
... pl.concat_list(["a", "b"]).arr.eval(pl.element().rank()).alias("rank")
... )
shape: (3, 3)
┌─────┬─────┬────────────┐
7 changes: 7 additions & 0 deletions py-polars/src/lazy/dsl.rs
@@ -1175,6 +1175,13 @@ impl PyExpr {
self.inner.clone().arr().eval(expr.inner, parallel).into()
}

fn cumulative_eval(&self, expr: PyExpr, min_periods: usize, parallel: bool) -> Self {
self.inner
.clone()
.cumulative_eval(expr.inner, min_periods, parallel)
.into()
}

fn lst_to_struct(&self, width_strat: &str, name_gen: Option<PyObject>) -> PyResult<Self> {
let n_fields = match width_strat {
"first_non_null" => ListToStructWidthStrategy::FirstNonNull,
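
This binding is what the Python `Expr.cumulative_eval` and `Series.cumulative_eval` methods ultimately call into. A small sketch exercising the `min_periods` and `parallel` parameters; the data and expression are illustrative, not taken from this commit:

```python
import polars as pl

s = pl.Series("values", [1.0, None, 3.0, 4.0, 5.0])

# With min_periods=2, windows containing fewer than two non-null values
# produce null; parallel=True fans the windows out over rayon threads,
# so avoid it inside a groupby that is already parallelized.
print(s.cumulative_eval(pl.element().mean(), min_periods=2, parallel=True))
```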
