fix(rust, python): fix cse for nested caches (#5412)
ritchie46 committed Nov 3, 2022
1 parent 39e061c commit 386af6a
Showing 3 changed files with 179 additions and 101 deletions.
235 changes: 134 additions & 101 deletions polars/polars-lazy/polars-plan/src/logical_plan/optimizer/cache_states.rs
@@ -13,9 +13,9 @@ fn get_upper_projections(
let parent = lp_arena.get(parent);

use ALogicalPlan::*;
// during projection pushdown all accumulated p
// during projection pushdown all accumulated
match parent {
Projection { expr, .. } | HStack { exprs: expr, .. } => {
Projection { expr, .. } => {
let mut out = Vec::with_capacity(expr.len());
for node in expr {
out.extend(aexpr_to_leaf_names_iter(*node, expr_arena));
@@ -36,117 +36,150 @@ pub(super) fn set_cache_states(
scratch: &mut Vec<Node>,
has_caches: bool,
) {
scratch.clear();

// per cache id holds:
// a Vec: with (parent, child) pairs
// a Set: with the union of column names.
let mut cache_schema_and_children = BTreeMap::new();

let mut loop_count = 0;
let mut stack = Vec::with_capacity(4);
stack.push((root, None, None, None));

// first traversal
// collect the union of columns per cache id.
// and find the cache parents
while let Some((node, mut cache_id, mut parent, mut previous_cache)) = stack.pop() {
let lp = lp_arena.get(node);
lp.copy_inputs(scratch);

use ALogicalPlan::*;
match lp {
            // don't allow parallelism as caches need each other's work
// also self-referencing plans can deadlock on the files they lock
Join { options, .. } if has_caches && options.allow_parallel => {
if let Join { options, .. } = lp_arena.get_mut(node) {
options.allow_parallel = false;
}
}
            // don't allow parallelism as caches need each other's work
// also self-referencing plans can deadlock on the files they lock
Union { options, .. } if has_caches && options.parallel => {
if let Union { options, .. } = lp_arena.get_mut(node) {
options.parallel = false;

// we loop because there can be nested caches and we must run the projection pushdown
// optimization between cache nodes.
loop {
scratch.clear();
stack.clear();

// per cache id holds:
// a Vec: with children of the node
// a Set: with the union of projected column names.
// a Set: with the union of hstack column names.
let mut cache_schema_and_children = BTreeMap::new();

stack.push((root, None, None, None, 0));

// the depth of the caches in a single tree branch
let mut max_cache_depth = 0;

// first traversal
// collect the union of columns per cache id.
// and find the cache parents
while let Some((
current_node,
mut cache_id,
mut parent,
mut previous_cache,
mut caches_seen,
)) = stack.pop()
{
let lp = lp_arena.get(current_node);
lp.copy_inputs(scratch);

use ALogicalPlan::*;
match lp {
                // don't allow parallelism as caches need each other's work
// also self-referencing plans can deadlock on the files they lock
Join { options, .. } if has_caches && options.allow_parallel => {
if let Join { options, .. } = lp_arena.get_mut(current_node) {
options.allow_parallel = false;
}
}
}
Cache { input, id, .. } => {
if let Some(cache_id) = cache_id {
previous_cache = Some(cache_id)
                // don't allow parallelism as caches need each other's work
// also self-referencing plans can deadlock on the files they lock
Union { options, .. } if has_caches && options.parallel => {
if let Union { options, .. } = lp_arena.get_mut(current_node) {
options.parallel = false;
}
}
if let Some(parent_node) = parent {
// projection pushdown has already run and blocked on cache nodes
// the pushed down columns are projected just above this cache
                    // if there were no pushed-down columns, we just take the current
                    // node's schema
                    // we never want to naively take parents, as a join or aggregate for instance
                    // changes the schema

let entry = cache_schema_and_children.entry(*id).or_insert_with(|| {
(
Vec::new(),
PlIndexSet::with_capacity_and_hasher(0, Default::default()),
)
});
entry.0.push(*input);

if let Some(names) = get_upper_projections(parent_node, lp_arena, expr_arena) {
entry.1.extend(names);
Cache { input, id, .. } => {
caches_seen += 1;

// no need to run the same cache optimization twice
if loop_count > caches_seen {
continue;
}
// There was no projection and we must take
// all columns
else {
let schema = lp.schema(lp_arena);
entry
.1
.extend(schema.iter_names().map(|name| Arc::from(name.as_str())));

max_cache_depth = std::cmp::max(caches_seen, max_cache_depth);
if let Some(cache_id) = cache_id {
previous_cache = Some(cache_id)
}
if let Some(parent_node) = parent {
// projection pushdown has already run and blocked on cache nodes
// the pushed down columns are projected just above this cache
                        // if there were no pushed-down columns, we just take the current
                        // node's schema
                        // we never want to naively take parents, as a join or aggregate for instance
                        // changes the schema

let (children, union_names) =
cache_schema_and_children.entry(*id).or_insert_with(|| {
(
Vec::new(),
PlIndexSet::with_capacity_and_hasher(0, Default::default()),
)
});
children.push(*input);

if let Some(names) =
get_upper_projections(parent_node, lp_arena, expr_arena)
{
union_names.extend(names);
}
// There was no explicit projection and we must take
// all columns
else {
let schema = lp.schema(lp_arena);
union_names
.extend(schema.iter_names().map(|name| Arc::from(name.as_str())));
}
}
cache_id = Some(*id);
}
cache_id = Some(*id);
_ => {}
}
_ => {}
}

parent = Some(node);
for n in scratch.iter() {
stack.push((*n, cache_id, parent, previous_cache))
parent = Some(current_node);
for n in scratch.iter() {
stack.push((*n, cache_id, parent, previous_cache, caches_seen))
}
scratch.clear();
}
scratch.clear();
}

// second pass
// we create a subtree where we project the columns
// just before the cache. Then we do another projection pushdown
// and finally remove that last projection and stitch the subplan
// back to the cache node again
if !cache_schema_and_children.is_empty() {
let pd = projection_pushdown::ProjectionPushDown {};
for (_cache_id, (children, columns)) in cache_schema_and_children {
if !columns.is_empty() {
let projection = columns
.into_iter()
.map(|name| expr_arena.add(AExpr::Column(name)))
.collect::<Vec<_>>();

for child in children {
let child_lp = lp_arena.get(child).clone();
let new_child = lp_arena.add(child_lp);

let lp = ALogicalPlanBuilder::new(new_child, expr_arena, lp_arena)
.project(projection.clone())
.build();

let lp = pd.optimize(lp, lp_arena, expr_arena).unwrap();
// remove the projection added by the optimization
let lp = if let ALogicalPlan::Projection { input, .. }
| ALogicalPlan::LocalProjection { input, .. } = lp
{
lp_arena.take(input)
} else {
lp
};
lp_arena.replace(child, lp);
// second pass
// we create a subtree where we project the columns
// just before the cache. Then we do another projection pushdown
// and finally remove that last projection and stitch the subplan
// back to the cache node again
if !cache_schema_and_children.is_empty() {
let pd = projection_pushdown::ProjectionPushDown {};
for (_cache_id, (children, columns)) in cache_schema_and_children {
if !columns.is_empty() {
let projection = columns
.into_iter()
.map(|name| expr_arena.add(AExpr::Column(name)))
.collect::<Vec<_>>();

for child in children {
let child_lp = lp_arena.get(child).clone();
let new_child = lp_arena.add(child_lp);

let lp = ALogicalPlanBuilder::new(new_child, expr_arena, lp_arena)
.project(projection.clone())
.build();

let lp = pd.optimize(lp, lp_arena, expr_arena).unwrap();
// remove the projection added by the optimization
let lp = if let ALogicalPlan::Projection { input, .. }
| ALogicalPlan::LocalProjection { input, .. } = lp
{
lp_arena.take(input)
} else {
lp
};
lp_arena.replace(child, lp);
}
}
}
}

if loop_count >= max_cache_depth {
break;
}
loop_count += 1;
}
}
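
The key change above is that the cache-state pass now runs in a loop: `caches_seen` counts how many cache nodes have been passed on the way down a branch, `max_cache_depth` records the deepest nesting found, and the whole pass (including the projection pushdown between cache nodes) repeats until `loop_count` reaches that depth. Nested caches appear when a shared subplan is itself built from another shared subplan. The Python sketch below is only an illustration of how such a plan can arise; the frame names and join shape are assumptions, not taken from this commit.

import polars as pl

# `base` is reused twice inside `mid`, and `mid` is reused twice inside `q`,
# so common-subplan elimination may insert a cache nested inside another cache.
base = pl.DataFrame({"A": [1, 2], "B": [3, 4]}).lazy()
mid = base.join(base.select("A"), on="A", how="left")
q = mid.join(mid.select("A"), on="A", how="left")

# Each nesting level needs its own projection-pushdown pass so that the column
# union computed for the inner cache stays consistent with the outer plan.
out = q.collect(common_subplan_elimination=True)
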
30 changes: 30 additions & 0 deletions polars/polars-lazy/src/tests/queries.rs
@@ -2049,3 +2049,33 @@ fn test_partitioned_gb_ternary() -> PolarsResult<()> {

Ok(())
}

#[test]
#[cfg(feature = "cross_join")]
fn test_cse_columns_projections() -> PolarsResult<()> {
let right = df![
"A" => [1, 2],
"B" => [3, 4],
"D" => [5, 6]
]?
.lazy();

let left = df![
"C" => [3, 4],
]?
.lazy();

let left = left.cross_join(right.clone().select([col("A")]));
let q = left.join(
right.rename(["B"], ["C"]),
[col("A"), col("C")],
[col("A"), col("C")],
JoinType::Left,
);

let out = q.collect()?;

assert_eq!(out.get_column_names(), &["C", "A", "D"]);

Ok(())
}
15 changes: 15 additions & 0 deletions py-polars/tests/unit/test_cse.py
@@ -0,0 +1,15 @@
import polars as pl


def test_cse_rename_cross_join_5405() -> None:
right = pl.DataFrame({"A": [1, 2], "B": [3, 4], "D": [5, 6]}).lazy()

left = pl.DataFrame({"C": [3, 4]}).lazy().join(right.select("A"), how="cross")

out = left.join(right.rename({"B": "C"}), on=["A", "C"], how="left")

assert out.collect(common_subplan_elimination=True).to_dict(False) == {
"C": [3, 3, 4, 4],
"A": [1, 2, 1, 2],
"D": [5, None, None, 6],
}
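
As an informal cross-check (not part of this commit), the fix can also be verified by comparing the query result with CSE enabled against the same plan with CSE disabled; the frames, joins, and `to_dict` comparison below simply mirror the test above.

import polars as pl

right = pl.DataFrame({"A": [1, 2], "B": [3, 4], "D": [5, 6]}).lazy()
left = pl.DataFrame({"C": [3, 4]}).lazy().join(right.select("A"), how="cross")
out = left.join(right.rename({"B": "C"}), on=["A", "C"], how="left")

# With the fix in place, enabling common-subplan elimination must not change
# the result relative to running the same plan without it.
assert out.collect(common_subplan_elimination=True).to_dict(False) == out.collect(
    common_subplan_elimination=False
).to_dict(False)
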
