Skip to content

Commit

Permalink
feat[rust, python]: run expressions in pivot (#4553)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Aug 24, 2022
1 parent 4930ea7 commit eb3b53a
Show file tree
Hide file tree
Showing 16 changed files with 239 additions and 64 deletions.
1 change: 1 addition & 0 deletions polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ meta = ["polars-lazy/meta"]
date_offset = ["polars-lazy/date_offset"]
trigonometry = ["polars-lazy/trigonometry"]
sign = ["polars-lazy/sign"]
pivot = ["polars-lazy/pivot"]

test = [
"lazy",
Expand Down
8 changes: 8 additions & 0 deletions polars/polars-core/src/frame/groupby/expr.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use crate::prelude::*;

pub trait PhysicalAggExpr {
#[allow(clippy::ptr_arg)]
fn evaluate<'a>(&self, df: &DataFrame, groups: &'a GroupsProxy) -> Result<Series>;

fn root_name(&self) -> Result<&str>;
}
1 change: 1 addition & 0 deletions polars/polars-core/src/frame/groupby/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::vector_hasher::{get_null_hash_value, AsU64, StrHash};
use crate::POOL;

pub mod aggregations;
pub mod expr;
pub(crate) mod hashing;
mod into_groups;
#[cfg(feature = "rows")]
Expand Down
22 changes: 19 additions & 3 deletions polars/polars-core/src/frame/groupby/pivot.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use rayon::prelude::*;

use super::GroupBy;
use crate::frame::groupby::expr::PhysicalAggExpr;
use crate::frame::groupby::hashing::HASHMAP_INIT_SIZE;
use crate::prelude::*;
use crate::POOL;

#[derive(Copy, Clone)]
#[derive(Clone)]
pub enum PivotAgg {
First,
Sum,
Expand All @@ -15,6 +16,7 @@ pub enum PivotAgg {
Median,
Count,
Last,
Expr(Arc<dyn PhysicalAggExpr + Send + Sync>),
}

fn restore_logical_type(s: &Series, logical_type: &DataType) -> Series {
Expand Down Expand Up @@ -71,6 +73,11 @@ impl DataFrame {
self.pivot_impl(&values, &index, &columns, agg_fn, sort_columns, false)
}

/// Do a pivot operation based on the group key, a pivot column and an aggregation function on the values column.
///
/// # Note
/// Polars'/arrow memory is not ideal for transposing operations like pivots.
/// If you have a relatively large table, consider using a groupby over a pivot.
pub fn pivot_stable<I0, S0, I1, S1, I2, S2>(
&self,
values: I0,
Expand Down Expand Up @@ -272,8 +279,8 @@ impl DataFrame {
let (col_locations, column_agg) = col?;
let (row_locations, n_rows, mut row_index) = row?;

for value_col in values {
let value_col = self.column(value_col)?;
for value_col_name in values {
let value_col = self.column(value_col_name)?;

use PivotAgg::*;
let value_agg = unsafe {
Expand All @@ -286,6 +293,15 @@ impl DataFrame {
Mean => value_col.agg_mean(&groups),
Median => value_col.agg_median(&groups),
Count => groups.group_count().into_series(),
Expr(ref expr) => {
let name = expr.root_name()?;
let mut value_col = value_col.clone();
value_col.rename(name);
let tmp_df = DataFrame::new_no_checks(vec![value_col]);
let mut aggregated = expr.evaluate(&tmp_df, &groups)?;
aggregated.rename(value_col_name);
aggregated
}
}
};

Expand Down
35 changes: 14 additions & 21 deletions polars/polars-core/src/series/arithmetic/borrowed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ pub(crate) fn coerce_lhs_rhs<'a>(
lhs: &'a Series,
rhs: &'a Series,
) -> Result<(Cow<'a, Series>, Cow<'a, Series>)> {
if let Ok(result) = coerce_time_units(lhs, rhs) {
if let Some(result) = coerce_time_units(lhs, rhs) {
return Ok(result);
}
let dtype = match (lhs.dtype(), rhs.dtype()) {
Expand Down Expand Up @@ -354,51 +354,44 @@ pub(crate) fn coerce_lhs_rhs<'a>(
fn coerce_time_units<'a>(
lhs: &'a Series,
rhs: &'a Series,
) -> Result<(Cow<'a, Series>, Cow<'a, Series>)> {
return if let (DataType::Datetime(lu, t), DataType::Duration(ru)) = (lhs.dtype(), rhs.dtype()) {
) -> Option<(Cow<'a, Series>, Cow<'a, Series>)> {
if let (DataType::Datetime(lu, t), DataType::Duration(ru)) = (lhs.dtype(), rhs.dtype()) {
let units = get_time_units(lu, ru);
let left = if *lu == units {
Cow::Borrowed(lhs)
} else {
Cow::Owned(lhs.cast(&DataType::Datetime(units, t.clone()))?)
Cow::Owned(lhs.cast(&DataType::Datetime(units, t.clone())).ok()?)
};
let right = if *ru == units {
Cow::Borrowed(rhs)
} else {
Cow::Owned(rhs.cast(&DataType::Duration(units))?)
Cow::Owned(rhs.cast(&DataType::Duration(units)).ok()?)
};
Ok((left, right))
Some((left, right))
} else if let (DataType::Duration(lu), DataType::Duration(ru)) = (lhs.dtype(), rhs.dtype()) {
let units = get_time_units(lu, ru);
let left = if *lu == units {
Cow::Borrowed(lhs)
} else {
Cow::Owned(lhs.cast(&DataType::Duration(units))?)
Cow::Owned(lhs.cast(&DataType::Duration(units)).ok()?)
};
let right = if *ru == units {
Cow::Borrowed(rhs)
} else {
Cow::Owned(rhs.cast(&DataType::Duration(units))?)
Cow::Owned(rhs.cast(&DataType::Duration(units)).ok()?)
};
Ok((left, right))
Some((left, right))
} else if let (DataType::Date, DataType::Duration(units)) = (lhs.dtype(), rhs.dtype()) {
let left = Cow::Owned(lhs.cast(&DataType::Datetime(*units, None))?);
Ok((left, Cow::Borrowed(rhs)))
let left = Cow::Owned(lhs.cast(&DataType::Datetime(*units, None)).ok()?);
Some((left, Cow::Borrowed(rhs)))
} else if let (DataType::Duration(_), DataType::Datetime(_, _))
| (DataType::Duration(_), DataType::Date) = (lhs.dtype(), rhs.dtype())
{
let (right, left) = coerce_time_units(rhs, lhs)?;
Ok((left, right))
Some((left, right))
} else {
Err(PolarsError::InvalidOperation(
format!(
"Cannot coerce time units for {} {}",
lhs.dtype(),
rhs.dtype()
)
.into(),
))
};
None
}
}

impl ops::Sub for &Series {
Expand Down
1 change: 1 addition & 0 deletions polars/polars-lazy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ string_justify = ["polars-ops/string_justify"]
arg_where = []
search_sorted = ["polars-ops/search_sorted"]
meta = []
pivot = ["polars-core/rows"]

# no guarantees whatsoever
private = ["polars-time/private"]
Expand Down
16 changes: 0 additions & 16 deletions polars/polars-lazy/src/dsl/eval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,22 +3,6 @@ use rayon::prelude::*;
use super::*;
use crate::physical_plan::state::ExecutionState;

#[cfg(feature = "list_eval")]
pub(super) fn prepare_eval_expr(mut expr: Expr) -> Expr {
expr.mutate().apply(|e| match e {
Expr::Column(name) => {
*name = Arc::from("");
true
}
Expr::Nth(_) => {
*e = Expr::Column(Arc::from(""));
true
}
_ => true,
});
expr
}

impl Expr {
/// Run an expression over a sliding window that increases `1` slot every iteration.
///
Expand Down
24 changes: 5 additions & 19 deletions polars/polars-lazy/src/dsl/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ use polars_core::series::ops::NullBehavior;
use polars_ops::prelude::*;
use rayon::prelude::*;

#[cfg(feature = "list_eval")]
use crate::dsl::eval::prepare_eval_expr;
use crate::dsl::function_expr::FunctionExpr;
#[cfg(feature = "list_eval")]
use crate::physical_plan::exotic::prepare_eval_expr;
#[cfg(feature = "list_eval")]
use crate::physical_plan::exotic::prepare_expression_for_context;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

Expand Down Expand Up @@ -223,24 +225,8 @@ impl ListNameSpace {
let func = move |s: Series| {
let lst = s.list()?;

let mut lp_arena = Arena::with_capacity(8);
let mut expr_arena = Arena::with_capacity(10);

// create a dummy lazyframe and run a very simple optimization run so that
// type coercion and simplify expression optimizations run.
let column = Series::full_null("", 0, &lst.inner_dtype());
let lf = DataFrame::new_no_checks(vec![column])
.lazy()
.without_optimizations()
.with_simplify_expr(true)
.select([expr.clone()]);
let optimized = lf.optimize(&mut lp_arena, &mut expr_arena).unwrap();
let lp = lp_arena.get(optimized);
let aexpr = lp.get_exprs().pop().unwrap();

let planner = PhysicalPlanner::default();
let phys_expr =
planner.create_physical_expr(aexpr, Context::Default, &mut expr_arena)?;
prepare_expression_for_context("", &expr, &lst.inner_dtype(), Context::Default)?;

let state = ExecutionState::new();

Expand Down
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/frame/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ mod parquet;
mod python;

mod anonymous_scan;
#[cfg(feature = "pivot")]
pub mod pivot;

use std::borrow::Cow;
use std::sync::Arc;
Expand Down
83 changes: 83 additions & 0 deletions polars/polars-lazy/src/frame/pivot.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
//! Polars lazy does not implement a pivot because it is impossible to know the schema without
//! materializing the whole dataset. This makes a pivot quite a terrible operation for performant
//! workflows. An optimization can never be pushed down passed a pivot.
//!
//! We can do a pivot on an eager `DataFrame` as that is already materialized. The code for the
//! pivot is here, because we want to be able to pass expressions to the pivot operation.
//!

use polars_core::frame::groupby::PivotAgg;
use polars_core::{frame::groupby::expr::PhysicalAggExpr, prelude::*};

use crate::physical_plan::exotic::{prepare_eval_expr, prepare_expression_for_context};
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;

impl PhysicalAggExpr for Expr {
fn evaluate<'a>(&self, df: &DataFrame, groups: &'a GroupsProxy) -> Result<Series> {
let state = ExecutionState::new();
let dtype = df.get_columns()[0].dtype();
let phys_expr = prepare_expression_for_context("", self, dtype, Context::Aggregation)?;
phys_expr
.evaluate_on_groups(df, groups, &state)
.map(|mut ac| ac.aggregated())
}

fn root_name(&self) -> Result<&str> {
Ok("")
}
}

pub fn pivot<I0, S0, I1, S1, I2, S2>(
df: &DataFrame,
values: I0,
index: I1,
columns: I2,
agg_expr: Expr,
sort_columns: bool,
) -> Result<DataFrame>
where
I0: IntoIterator<Item = S0>,
S0: AsRef<str>,
I1: IntoIterator<Item = S1>,
S1: AsRef<str>,
I2: IntoIterator<Item = S2>,
S2: AsRef<str>,
{
// make sure that the root column is replaced
let expr = prepare_eval_expr(agg_expr);
df.pivot(
values,
index,
columns,
PivotAgg::Expr(Arc::new(expr)),
sort_columns,
)
}

pub fn pivot_stable<I0, S0, I1, S1, I2, S2>(
df: &DataFrame,
values: I0,
index: I1,
columns: I2,
agg_expr: Expr,
sort_columns: bool,
) -> Result<DataFrame>
where
I0: IntoIterator<Item = S0>,
S0: AsRef<str>,
I1: IntoIterator<Item = S1>,
S1: AsRef<str>,
I2: IntoIterator<Item = S2>,
S2: AsRef<str>,
{
// make sure that the root column is replaced
let expr = prepare_eval_expr(agg_expr);
df.pivot_stable(
values,
index,
columns,
PivotAgg::Expr(Arc::new(expr)),
sort_columns,
)
}
43 changes: 43 additions & 0 deletions polars/polars-lazy/src/physical_plan/exotic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use polars_core::prelude::*;

use crate::prelude::*;

pub(crate) fn prepare_eval_expr(mut expr: Expr) -> Expr {
expr.mutate().apply(|e| match e {
Expr::Column(name) => {
*name = Arc::from("");
true
}
Expr::Nth(_) => {
*e = Expr::Column(Arc::from(""));
true
}
_ => true,
});
expr
}

pub(crate) fn prepare_expression_for_context(
name: &str,
expr: &Expr,
dtype: &DataType,
ctxt: Context,
) -> Result<Arc<dyn PhysicalExpr>> {
let mut lp_arena = Arena::with_capacity(8);
let mut expr_arena = Arena::with_capacity(10);

// create a dummy lazyframe and run a very simple optimization run so that
// type coercion and simplify expression optimizations run.
let column = Series::full_null(name, 0, dtype);
let lf = DataFrame::new_no_checks(vec![column])
.lazy()
.without_optimizations()
.with_simplify_expr(true)
.select([expr.clone()]);
let optimized = lf.optimize(&mut lp_arena, &mut expr_arena).unwrap();
let lp = lp_arena.get(optimized);
let aexpr = lp.get_exprs().pop().unwrap();

let planner = PhysicalPlanner::default();
planner.create_physical_expr(aexpr, ctxt, &mut expr_arena)
}
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/physical_plan/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
pub mod executors;
#[cfg(any(feature = "list_eval", feature = "pivot"))]
pub(crate) mod exotic;
pub mod expressions;
#[cfg(any(feature = "ipc", feature = "parquet", feature = "csv-file"))]
mod file_cache;
Expand Down
2 changes: 2 additions & 0 deletions py-polars/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ lazy_regex = ["polars/lazy_regex"]
csv-file = ["polars/csv-file"]
object = ["polars/object"]
extract_jsonpath = ["polars/extract_jsonpath"]
pivot = ["polars/pivot"]

all = [
"json",
Expand All @@ -78,6 +79,7 @@ all = [
"extract_jsonpath",
"polars/timezones",
"object",
"pivot",
]

# we cannot conditionaly activate simd
Expand Down

0 comments on commit eb3b53a

Please sign in to comment.