allow groupby multiple columns in window function; #692
ritchie46 committed May 25, 2021
1 parent 1cea374 commit 72ba71c
Showing 15 changed files with 102 additions and 32 deletions.
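
At a glance, the change lets a window expression be partitioned by several columns at once: `Expr::over` now takes a `Vec<Expr>` instead of a single boxed expression, and the Python binding accepts a list of columns. A minimal sketch of the new usage, modelled on the doctest and test updates in this commit (the column names `groups`, `cats`, and `values` are illustrative, and the example assumes the `polars` crate with its `lazy` feature; inside the workspace the same items come from `polars_lazy::prelude`):

```rust
use polars::prelude::*;

// Window aggregation partitioned by two columns; before this commit `over`
// accepted only a single expression.
fn window_over_two_keys(df: DataFrame) -> Result<DataFrame> {
    df.lazy()
        .select(&[
            col("groups"),
            col("cats"),
            // sum of `values` within each (groups, cats) partition,
            // broadcast back to every row of that partition
            sum("values").over(vec![col("groups"), col("cats")]),
        ])
        .collect()
}
```

The Python binding mirrors this, e.g. `pl.first("B").over(["fruits", "cars"])` in the new `test_window_function` test below.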
3 changes: 3 additions & 0 deletions polars/polars-core/Cargo.toml
@@ -43,6 +43,9 @@ downsample = ["temporal", "dtype-date64"]
sort_multiple = []
# is_in operation
is_in = []
# dont use this
private = []


# opt-in datatypes for Series
dtype-time64-ns = []
3 changes: 3 additions & 0 deletions polars/polars-core/src/frame/hash_join/mod.rs
@@ -21,6 +21,9 @@ use std::hash::Hash;
use std::ops::Deref;
use unsafe_unwrap::UnsafeUnwrap;

#[cfg(feature = "private")]
pub use self::multiple_keys::private_left_join_multiple_keys;

/// If Categorical types are created without a global string cache or under
/// a different global string cache the mapping will be incorrect.
pub(crate) fn check_categorical_src(l: &Series, r: &Series) -> Result<()> {
6 changes: 6 additions & 0 deletions polars/polars-core/src/frame/hash_join/multiple_keys.rs
@@ -186,6 +186,12 @@ pub(crate) fn inner_join_multiple_keys(
.collect()
})
}

#[cfg(feature = "private")]
pub fn private_left_join_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<(u32, Option<u32>)> {
left_join_multiple_keys(a, b)
}

pub(crate) fn left_join_multiple_keys(a: &DataFrame, b: &DataFrame) -> Vec<(u32, Option<u32>)> {
// we assume that the b DataFrame is the shorter relation.
// b will be used for the build phase.
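
The new `private` feature gates this helper so that `polars-lazy` can reuse the multi-key left join when a window expression is partitioned by more than one column. A rough, self-contained sketch of the `(left_idx, Option<right_idx>)` join-tuple shape it produces (the function name `left_join_tuples` is made up for illustration; the real build phase keeps every matching right index, while this sketch keeps one, which suffices when the right side has unique keys, as a grouped aggregation result does):

```rust
use std::collections::HashMap;

/// Illustrative only: left-join tuples over a composite (i32, &str) key.
/// Each left row index is paired with a matching right row index, or None.
fn left_join_tuples(left: &[(i32, &str)], right: &[(i32, &str)]) -> Vec<(u32, Option<u32>)> {
    // build phase on the right relation (assumed to be the shorter one,
    // mirroring the comment in `left_join_multiple_keys`)
    let mut build: HashMap<(i32, &str), u32> = HashMap::new();
    for (idx, key) in right.iter().enumerate() {
        build.entry(*key).or_insert(idx as u32);
    }
    // probe phase over the left relation
    left.iter()
        .enumerate()
        .map(|(idx, key)| (idx as u32, build.get(key).copied()))
        .collect()
}
```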
2 changes: 1 addition & 1 deletion polars/polars-lazy/Cargo.toml
@@ -35,7 +35,7 @@ rayon = "1.5"
itertools = "0.10"

polars-io = {version = "0.13.4", path = "../polars-io", features = ["lazy", "csv-file"], default-features=false}
polars-core = {version = "0.13.4", path = "../polars-core", features = ["lazy"], default-features=false}
polars-core = {version = "0.13.4", path = "../polars-core", features = ["lazy", "private"], default-features=false}
polars-arrow = {version = "0.13.4", path = "../polars-arrow"}
# uncomment to have datafusion integration
# when uncommenting we both need to point to the same arrow version
8 changes: 4 additions & 4 deletions polars/polars-lazy/src/dsl.rs
@@ -223,7 +223,7 @@ pub enum Expr {
Window {
/// Also has the input. i.e. avg("foo")
function: Box<Expr>,
partition_by: Box<Expr>,
partition_by: Vec<Expr>,
order_by: Option<Box<Expr>>,
},
Wildcard,
@@ -784,7 +784,7 @@ impl Expr {
/// .lazy()
/// .select(&[
/// col("groups"),
/// sum("values").over(col("groups")),
/// sum("values").over(vec![col("groups")]),
/// ])
/// .collect()?;
/// dbg!(&out);
@@ -822,10 +822,10 @@
/// │ 1 ┆ 16 │
/// ╰────────┴────────╯
/// ```
pub fn over(self, partition_by: Expr) -> Self {
pub fn over(self, partition_by: Vec<Expr>) -> Self {
Expr::Window {
function: Box::new(self),
partition_by: Box::new(partition_by),
partition_by,
order_by: None,
}
}
4 changes: 2 additions & 2 deletions polars/polars-lazy/src/frame.rs
@@ -1682,13 +1682,13 @@ mod test {
let _ = df
.clone()
.lazy()
.select(&[avg("values").over(col("groups")).alias("part")])
.select(&[avg("values").over(vec![col("groups")]).alias("part")])
.collect()
.unwrap();
// test if partition aggregation is correct
let out = df
.lazy()
.select(&[col("groups"), sum("values").over(col("groups"))])
.select(&[col("groups"), sum("values").over(vec![col("groups")])])
.collect()
.unwrap();
assert_eq!(
2 changes: 1 addition & 1 deletion polars/polars-lazy/src/logical_plan/aexpr.rs
@@ -82,7 +82,7 @@ pub enum AExpr {
},
Window {
function: Node,
partition_by: Node,
partition_by: Vec<Node>,
order_by: Option<Node>,
},
Wildcard,
10 changes: 7 additions & 3 deletions polars/polars-lazy/src/logical_plan/conversion.rs
@@ -1,6 +1,10 @@
use crate::prelude::*;
use polars_core::prelude::*;

fn to_aexprs(input: Vec<Expr>, arena: &mut Arena<AExpr>) -> Vec<Node> {
input.into_iter().map(|e| to_aexpr(e, arena)).collect()
}

// converts expression to AExpr, which uses an arena (Vec) for allocation
pub(crate) fn to_aexpr(expr: Expr, arena: &mut Arena<AExpr>) -> Node {
let v = match expr {
@@ -87,7 +91,7 @@ pub(crate) fn to_aexpr(expr: Expr, arena: &mut Arena<AExpr>) -> Node {
output_type,
collect_groups,
} => AExpr::Function {
input: input.into_iter().map(|e| to_aexpr(e, arena)).collect(),
input: to_aexprs(input, arena),
function,
output_type,
collect_groups,
@@ -113,7 +117,7 @@ pub(crate) fn to_aexpr(expr: Expr, arena: &mut Arena<AExpr>) -> Node {
order_by,
} => AExpr::Window {
function: to_aexpr(*function, arena),
partition_by: to_aexpr(*partition_by, arena),
partition_by: to_aexprs(partition_by, arena),
order_by: order_by.map(|ob| to_aexpr(*ob, arena)),
},
Expr::Slice {
@@ -563,7 +567,7 @@ pub(crate) fn node_to_exp(node: Node, expr_arena: &Arena<AExpr>) -> Expr {
order_by,
} => {
let function = Box::new(node_to_exp(function, expr_arena));
let partition_by = Box::new(node_to_exp(partition_by, expr_arena));
let partition_by = nodes_to_exprs(&partition_by, expr_arena);
let order_by = order_by.map(|ob| Box::new(node_to_exp(ob, expr_arena)));
Expr::Window {
function,
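
As the comment in `to_aexpr` says, the logical plan allocates `AExpr` nodes in an arena, so `partition_by` can simply become a `Vec<Node>` of cheap indices. A bare-bones sketch of that pattern with hypothetical types (not the actual polars `Arena`/`Node`):

```rust
/// Hypothetical, minimal arena: values live in a Vec and are referenced by index.
struct Node(usize);

struct Arena<T> {
    items: Vec<T>,
}

impl<T> Arena<T> {
    fn new() -> Self {
        Arena { items: Vec::new() }
    }

    /// Store a value and hand back its index as a lightweight handle.
    fn add(&mut self, item: T) -> Node {
        self.items.push(item);
        Node(self.items.len() - 1)
    }

    fn get(&self, node: &Node) -> &T {
        &self.items[node.0]
    }
}
```

`to_aexprs` is then just a map from `Vec<Expr>` to `Vec<Node>` over such an arena, which is why the window's `partition_by` list converts with a single `collect`.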
8 changes: 6 additions & 2 deletions polars/polars-lazy/src/logical_plan/iterator.rs
@@ -76,7 +76,9 @@ impl<'a> Iterator for ExprIter<'a> {
order_by,
} => {
push(function);
push(partition_by);
for e in partition_by {
push(e)
}
if let Some(e) = order_by {
push(e);
}
@@ -176,7 +178,9 @@ impl AExpr {
order_by,
} => {
push(function);
push(partition_by);
for e in partition_by {
push(e);
}
if let Some(e) = order_by {
push(e);
}
32 changes: 23 additions & 9 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
@@ -2,13 +2,14 @@ use crate::logical_plan::Context;
use crate::physical_plan::state::ExecutionState;
use crate::prelude::*;
use polars_core::frame::groupby::GroupBy;
use polars_core::frame::hash_join::private_left_join_multiple_keys;
use polars_core::prelude::*;
use std::sync::Arc;

pub struct WindowExpr {
/// the root column that the Function will be applied on.
/// This will be used to create a smaller DataFrame to prevent taking unneeded columns by index
pub(crate) group_column: Arc<dyn PhysicalExpr>,
pub(crate) group_by: Vec<Arc<dyn PhysicalExpr>>,
pub(crate) apply_column: Arc<String>,
pub(crate) out_name: Option<Arc<String>>,
/// A function Expr. i.e. Mean, Median, Max, etc.
@@ -33,8 +34,14 @@ impl PhysicalExpr for WindowExpr {
.iter()
.for_each(|s| key.push_str(&format!("{}", s.get_data_ptr())));

let groupby_column = self.group_column.evaluate(df, state)?;
key.push_str(groupby_column.name());
let groupby_columns = self
.group_by
.iter()
.map(|e| e.evaluate(df, state))
.collect::<Result<Vec<_>>>()?;
groupby_columns.iter().for_each(|e| {
key.push_str(e.name());
});

// 1. get the group tuples
// We keep the lock for the entire window expression, we want those to be sequential
@@ -62,15 +69,15 @@
let groups = match groups_lock.get_mut(&key) {
Some(groups) => std::mem::take(groups),
None => {
let mut gb = df.groupby_with_series(vec![groupby_column.clone()], true)?;
let mut gb = df.groupby_with_series(groupby_columns.clone(), true)?;
std::mem::take(gb.get_groups_mut())
}
};

// 2. create GroupBy object and apply aggregation
let mut gb = GroupBy::new(
df,
vec![groupby_column.clone()],
groupby_columns.clone(),
groups,
Some(vec![&self.apply_column]),
);
@@ -111,14 +118,21 @@ impl PhysicalExpr for WindowExpr {
drop(groups_lock);

// 3. get the join tuples and use them to take the new Series
let out_column = out.select_at_idx(1).unwrap();
let out_column = out.select_at_idx(out.width() - 1).unwrap();
let mut join_tuples_lock = state.join_tuples.lock().unwrap();
let opt_join_tuples = match join_tuples_lock.get_mut(&key) {
Some(t) => std::mem::take(t),
None => {
// group key from right column
let right = out.select_at_idx(0).unwrap();
groupby_column.hash_join_left(right)
if groupby_columns.len() == 1 {
// group key from right column
let right = out.select_at_idx(0).unwrap();
groupby_columns[0].hash_join_left(right)
} else {
let df_right =
DataFrame::new_no_checks(out.get_columns()[..out.width() - 1].to_vec());
let df_left = DataFrame::new_no_checks(groupby_columns);
private_left_join_multiple_keys(&df_left, &df_right)
}
}
};

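
Conceptually, `WindowExpr::evaluate` groups by the partition column(s), aggregates the target column per group, and then joins the aggregate back onto the original rows, now via `private_left_join_multiple_keys` when there is more than one key. A schematic, std-only sketch of that idea for a composite key (the helper `sum_over` is hypothetical, not the polars implementation):

```rust
use std::collections::HashMap;

/// Schematic window aggregation: sum `values` within each (key1, key2)
/// partition and broadcast the group sum back to every row, in row order.
fn sum_over(keys: &[(&str, &str)], values: &[i64]) -> Vec<i64> {
    // 1. + 2. group by the composite key and aggregate
    let mut sums: HashMap<(&str, &str), i64> = HashMap::new();
    for (key, value) in keys.iter().zip(values) {
        *sums.entry(*key).or_insert(0) += value;
    }
    // 3. the "self join": look each original row's key back up
    keys.iter().map(|key| sums[key]).collect()
}
```

For example, with keys `[("a", "x"), ("a", "x"), ("b", "x")]` and values `[1, 2, 3]` this returns `[3, 3, 3]`, matching how `sum("values").over(vec![col("k1"), col("k2")])` broadcasts a group sum to each row.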
6 changes: 3 additions & 3 deletions polars/polars-lazy/src/physical_plan/planner.rs
@@ -396,8 +396,8 @@ impl DefaultPlanner {
order_by: _,
} => {
// TODO! Order by
let group_column =
self.create_physical_expr(partition_by, Context::Default, expr_arena)?;
let group_by =
self.create_physical_expressions(&partition_by, Context::Default, expr_arena)?;
let mut out_name = None;
let mut apply_columns = aexpr_to_root_names(function, expr_arena);
if apply_columns.len() > 1 {
@@ -414,7 +414,7 @@
let function = node_to_exp(function, expr_arena);

Ok(Arc::new(WindowExpr {
group_column,
group_by,
apply_column,
out_name,
function,
11 changes: 11 additions & 0 deletions py-polars/CHANGELOG.md
@@ -2,6 +2,17 @@

The Rust crate `polars` has its own changelog.

### polars 0.7.18
* feature
- window function by multiple group columns

* bug fix
- fix bug in argsort multiple

### polars 0.7.18
* feature
- argsort multiple columns

### polars 0.7.17
* feature
- support more indexing
10 changes: 5 additions & 5 deletions py-polars/polars/lazy/__init__.py
@@ -1071,7 +1071,7 @@ def list(self) -> "Expr":
"""
return wrap_expr(self._pyexpr.list())

def over(self, expr: "Union[str, Expr]") -> "Expr":
def over(self, expr: "Union[str, Expr, List[Expr]]") -> "Expr":
"""
Apply window function over a subgroup.
This is similar to a groupby + aggregation + self join.
@@ -1080,7 +1080,7 @@ def over(self, expr: "Union[str, Expr]") -> "Expr":
Parameters
----------
expr
Expression that evaluates to a column of groups
Column(s) to group by.
Examples
--------
@@ -1129,10 +1129,10 @@ def over(self, expr: "Union[str, Expr]") -> "Expr":
```
"""
if isinstance(expr, str):
expr = col(expr)

return wrap_expr(self._pyexpr.over(expr._pyexpr))
pyexprs = _selection_to_pyexpr_list(expr)

return wrap_expr(self._pyexpr.over(pyexprs))

def is_unique(self) -> "Expr":
"""
5 changes: 3 additions & 2 deletions py-polars/src/lazy/dsl.rs
@@ -213,8 +213,9 @@ impl PyExpr {
self.clone().inner.is_duplicated().into()
}

pub fn over(&self, partition_by: PyExpr) -> PyExpr {
self.clone().inner.over(partition_by.inner).into()
pub fn over(&self, partition_by: Vec<PyExpr>) -> PyExpr {
let partition_by = partition_by.into_iter().map(|e| e.inner).collect();
self.clone().inner.over(partition_by).into()
}

pub fn _and(&self, expr: PyExpr) -> PyExpr {
24 changes: 24 additions & 0 deletions py-polars/tests/test_lazy.py
@@ -153,3 +153,27 @@ def test_arange():
def test_arg_sort():
df = pl.DataFrame({"a": [4, 1, 3]})
assert df[col("a").arg_sort()]["a"] == [1, 2, 0]


def test_window_function():
df = pl.DataFrame(
{
"A": [1, 2, 3, 4, 5],
"fruits": ["banana", "banana", "apple", "apple", "banana"],
"B": [5, 4, 3, 2, 1],
"cars": ["beetle", "audi", "beetle", "beetle", "beetle"],
}
)

q = df.lazy().with_columns(
[
pl.sum("A").over("fruits").alias("fruit_sum_A"),
pl.first("B").over("fruits").alias("fruit_first_B"),
pl.max("B").over("cars").alias("cars_max_B"),
]
)
out = q.collect()
assert out["cars_max_B"] == [5, 4, 5, 5, 5]

out = df[[pl.first("B").over(["fruits", "cars"])]]
assert out["B_first"] == [5, 4, 3, 3, 5]
