Skip to content

Commit

Permalink
fix(rust, python): fix duplicate caches in cse and prevent quadratic … (
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Nov 16, 2022
1 parent c876500 commit c981a38
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 7 deletions.
36 changes: 29 additions & 7 deletions polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cse.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//! Common Subplan Elimination

use std::collections::BTreeMap;
use std::collections::{BTreeMap, BTreeSet};
use std::hash::{BuildHasher, Hash, Hasher};

use polars_core::prelude::*;
Expand Down Expand Up @@ -50,6 +50,10 @@ pub(super) fn collect_trails(
collect_trails(*input_right, lp_arena, trails, id, true)?;
}
Union { inputs, .. } => {
if inputs.len() > 200 {
// don't even bother with cse on this many inputs
return None;
}
let new_trail = trails.get(id).unwrap().clone();

let last_i = inputs.len() - 1;
Expand Down Expand Up @@ -246,16 +250,17 @@ fn longest_subgraph(
trail_b: &Trail,
lp_arena: &Arena<ALogicalPlan>,
expr_arena: &Arena<AExpr>,
) -> Option<(Node, Node)> {
) -> Option<(Node, Node, bool)> {
if trail_a.is_empty() || trail_b.is_empty() {
return None;
}
let mut prev_node_a = Node(0);
let mut prev_node_b = Node(0);
let mut is_equal;
let mut i = 0;
let mut entirely_equal = trail_a.len() == trail_b.len();

// iterates from the leafs upwards
// iterates from the leaves upwards
for (node_a, node_b) in trail_a.iter().rev().zip(trail_b.iter().rev()) {
// we never include the root that splits a trail
// e.g. don't want to cache the join/union, but
Expand All @@ -269,6 +274,7 @@ fn longest_subgraph(
is_equal = lp_node_equal(a, b, expr_arena);

if !is_equal {
entirely_equal = false;
break;
}

Expand All @@ -278,7 +284,7 @@ fn longest_subgraph(
}
// previous node was equal
if i > 0 {
Some((prev_node_a, prev_node_b))
Some((prev_node_a, prev_node_b, entirely_equal))
} else {
None
}
Expand All @@ -301,14 +307,27 @@ pub(crate) fn elim_cmn_subplans(

// search from the leafs upwards and find the longest shared subplans
let mut trail_ends = vec![];
// if i matches j
// we don't need to search with j as they are equal
// this is very important as otherwise we get quadratic behavior
let mut to_skip = BTreeSet::new();

for i in 0..trails.len() {
if to_skip.contains(&i) {
continue;
}
let trail_i = &trails[i];

// we only look forwards, then we traverse all combinations
for trail_j in trails.iter().skip(i + 1) {
if let Some(res) = longest_subgraph(trail_i, trail_j, lp_arena, expr_arena) {
trail_ends.push(res)
for (j, trail_j) in trails.iter().enumerate().skip(i + 1) {
if let Some((a, b, all_equal)) =
longest_subgraph(trail_i, trail_j, lp_arena, expr_arena)
{
// then we can skip `j` as we already searched with trail `i` which is equal
if all_equal {
to_skip.insert(j);
}
trail_ends.push((a, b))
}
}
}
Expand Down Expand Up @@ -359,6 +378,9 @@ pub(crate) fn elim_cmn_subplans(
let node2 = combination.1 .0;

let cache_id = match (cache_mapping.get(&node1), cache_mapping.get(&node2)) {
// (Some(_), Some(_)) => {
// continue
// }
(Some(h), _) => *h,
(_, Some(h)) => *h,
_ => {
Expand Down
18 changes: 18 additions & 0 deletions py-polars/tests/unit/test_cse.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import re

import polars as pl


Expand All @@ -13,3 +15,19 @@ def test_cse_rename_cross_join_5405() -> None:
"A": [1, 2, 1, 2],
"D": [5, None, None, 6],
}


def test_union_duplicates() -> None:
n_dfs = 10
df_lazy = pl.DataFrame({}).lazy()
lazy_dfs = [df_lazy for _ in range(n_dfs)]
assert (
len(
re.findall(
r".*CACHE\[id: .*, count: 9].*",
pl.concat(lazy_dfs).describe_optimized_plan(),
flags=re.MULTILINE,
)
)
== 10
)

0 comments on commit c981a38

Please sign in to comment.