Skip to content

Commit

Permalink
don't cache in-expression window functions (#3840)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Jun 28, 2022
1 parent fdd5529 commit be5ed33
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 5 deletions.
7 changes: 5 additions & 2 deletions polars/polars-lazy/src/physical_plan/expressions/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,13 @@ impl PhysicalExpr for BinaryExpr {
}

fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let mut state = state.clone();
// don't cache window functions as they run in parallel
state.cache_window = false;
let (lhs, rhs) = POOL.install(|| {
rayon::join(
|| self.left.evaluate(df, state),
|| self.right.evaluate(df, state),
|| self.left.evaluate(df, &state),
|| self.right.evaluate(df, &state),
)
});
apply_operator_owned(lhs?, rhs?, self.op)
Expand Down
9 changes: 6 additions & 3 deletions polars/polars-lazy/src/physical_plan/expressions/ternary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,14 @@ impl PhysicalExpr for TernaryExpr {
&self.expr
}
fn evaluate(&self, df: &DataFrame, state: &ExecutionState) -> Result<Series> {
let mask_series = self.predicate.evaluate(df, state)?;
let mut state = state.clone();
// don't cache window functions as they run in parallel
state.cache_window = false;
let mask_series = self.predicate.evaluate(df, &state)?;
let mut mask = mask_series.bool()?.clone();

let op_truthy = || self.truthy.evaluate(df, state);
let op_falsy = || self.falsy.evaluate(df, state);
let op_truthy = || self.truthy.evaluate(df, &state);
let op_falsy = || self.falsy.evaluate(df, &state);

let (truthy, falsy) = POOL.install(|| rayon::join(op_truthy, op_falsy));
let mut truthy = truthy?;
Expand Down
3 changes: 3 additions & 0 deletions polars/polars-lazy/src/physical_plan/expressions/window.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,9 @@ impl PhysicalExpr for WindowExpr {
// and every partition run the cache should be empty so we expect a max of 1.
debug_assert!(gt_map.len() <= 1);
if let Some(gt) = gt_map.get_mut(&cache_key) {
// We take now, but it is important that we set this before we return!
// a next windows function may get this cached key and get an empty if this
// does not happen
(std::mem::take(gt), true, cache_key)
} else {
(create_groups()?, false, cache_key)
Expand Down
24 changes: 24 additions & 0 deletions py-polars/tests/db-benchmark/various.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,27 @@
)
assert computed[0, "min"] == minimum
assert computed[0, "max"] == maximum


def test_windows_not_cached() -> None:
ldf = (
pl.DataFrame(
[
pl.Series("key", ["a", "a", "b", "b"]),
pl.Series("val", [2, 2, 1, 3]),
]
)
.lazy()
.filter(
(pl.col("key").cumcount().over("key") == 0)
| (pl.col("val").shift(1).over("key").is_not_null())
| (pl.col("val") != pl.col("val").shift(1).over("key"))
)
)
# this might fail if they are cached
for i in range(1000):
ldf.collect()


if __name__ == "__main__":
test_windows_not_cached()

0 comments on commit be5ed33

Please sign in to comment.