Skip to content

Commit

Permalink
fix[rust]: fix trails collection and comparison in cse optimizer (#4927)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Sep 22, 2022
1 parent 0c07902 commit 289fb00
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 5 deletions.
2 changes: 2 additions & 0 deletions polars/polars-lazy/src/logical_plan/optimizer/cache_states.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ fn get_upper_projections(
}
}

/// This will ensure that all equal caches communicate the number of columns
/// they need to project.
pub(crate) fn set_cache_states(
root: Node,
lp_arena: &mut Arena<ALogicalPlan>,
Expand Down
23 changes: 18 additions & 5 deletions polars/polars-lazy/src/logical_plan/optimizer/cse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,17 +51,24 @@ pub(crate) fn collect_trails(
Union { inputs, .. } => {
let new_trail = trails.get(id).unwrap().clone();

for input in inputs.iter() {
let last_i = inputs.len() - 1;

for (i, input) in inputs.iter().enumerate() {
collect_trails(*input, lp_arena, trails, id, true)?;
*id += 1;
trails.insert(*id, new_trail.clone());

// don't add a trail on the last iteration as that would only add a Union
// without any inputs
if i != last_i {
*id += 1;
trails.insert(*id, new_trail.clone());
}
}
}
ExtContext { .. } => {
// block for now.
}
lp => {
// other nodes have only a single
// other nodes have only a single input
let nodes = &mut [None];
lp.copy_inputs(nodes);
if let Some(input) = nodes[0] {
Expand Down Expand Up @@ -258,6 +265,7 @@ fn lp_node_equal(a: &ALogicalPlan, b: &ALogicalPlan, expr_arena: &Arena<AExpr>)
}
}

/// Iterate upwards from two leaf locations and find the last matching node.
fn longest_subgraph(
trail_a: &Trail,
trail_b: &Trail,
Expand All @@ -274,6 +282,12 @@ fn longest_subgraph(

// iterates from the leaves upwards
for (node_a, node_b) in trail_a.iter().rev().zip(trail_b.iter().rev()) {
// we never include the root that splits a trail
// e.g. don't want to cache the join/union, but
// we want to cache the similar inputs
if *node_a == *node_b {
break;
}
let a = lp_arena.get(*node_a);
let b = lp_arena.get(*node_b);

Expand Down Expand Up @@ -313,7 +327,6 @@ pub(crate) fn elim_cmn_subplans(
// search from the leaves upwards and find the longest shared subplans
let mut trail_ends = vec![];

// let mut equal_trails = vec![];
for i in 0..trails.len() {
let trail_i = &trails[i];

Expand Down
50 changes: 50 additions & 0 deletions polars/polars-lazy/src/tests/cse.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::collections::BTreeSet;

use super::*;

fn cached_before_root(q: LazyFrame) {
Expand Down Expand Up @@ -90,3 +92,51 @@ fn test_cse_cache_union_projection_pd() -> PolarsResult<()> {

Ok(())
}

/// Checks the optimized plan of a join between two self-concatenated frames:
/// every `Cache` node must report a `count` of 1, and exactly two distinct
/// cache ids must be present.
///
/// NOTE(review): the `4925` suffix presumably refers to the upstream issue
/// this test guards against — confirm against the tracker.
#[test]
fn test_cse_union2_4925() -> PolarsResult<()> {
    // Left side: a single-row frame that is unioned with itself.
    let left = df![
        "ts" => [1],
        "sym" => ["a"],
        "c" => [true],
    ]?
    .lazy();
    let left_union = concat(&[left.clone(), left], false, false)?;

    // Right side: a different single-row frame, also unioned with itself.
    let right = df![
        "ts" => [1],
        "d" => [3],
    ]?
    .lazy();
    let right_union = concat(&[right.clone(), right], false, false)?;

    // Join both unions on "ts" and project a mix of columns from each side.
    let projection = [col("ts"), col("sym"), col("d") / col("c")];
    let query = left_union
        .inner_join(right_union, col("ts"), col("ts"))
        .select(projection);

    let (mut expr_arena, mut lp_arena) = get_arenas();
    let root = query.optimize(&mut lp_arena, &mut expr_arena).unwrap();

    // Walk the optimized plan, checking each cache's count and gathering
    // the distinct cache ids into an ordered set.
    let mut seen_cache_ids = BTreeSet::new();
    for (_, plan) in (&lp_arena).iter(root) {
        if let ALogicalPlan::Cache { id, count, .. } = plan {
            assert_eq!(*count, 1);
            seen_cache_ids.insert(*id);
        }
    }

    // One cache per unioned side, not a single shared one.
    assert_eq!(seen_cache_ids.len(), 2);

    Ok(())
}
9 changes: 9 additions & 0 deletions polars/polars-lazy/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,15 @@ pub(crate) fn agg_source_paths(
ParquetScan { path, .. } => {
paths.insert(path.clone());
}
#[cfg(feature = "ipc")]
IpcScan { path, .. } => {
paths.insert(path.clone());
}
// always block parallel on anonymous sources
// as we cannot know if they will lock or not.
AnonymousScan { .. } => {
paths.insert("anonymous".into());
}
_ => {}
}
})
Expand Down

0 comments on commit 289fb00

Please sign in to comment.