Skip to content

Commit

Permalink
fix partition boundary (#3223)
Browse files Browse the repository at this point in the history
  • Loading branch information
ritchie46 committed Apr 23, 2022
1 parent 9e4145f commit 58ae5de
Showing 1 changed file with 61 additions and 38 deletions.
99 changes: 61 additions & 38 deletions polars/polars-lazy/src/physical_plan/planner/lp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,51 +318,74 @@ impl DefaultPlanner {
partitionable = false;
}

for agg in &aggs {
// make sure that we don't have a binary expr in the expr tree
let matches = |e: &AExpr| {
matches!(
e,
AExpr::SortBy { .. }
| AExpr::Filter { .. }
| AExpr::BinaryExpr { .. }
| AExpr::Function { .. }
)
};
if aexpr_to_root_nodes(*agg, expr_arena).len() != 1
|| has_aexpr(*agg, expr_arena, matches)
{
partitionable = false;
break;
}

let agg = node_to_expr(*agg, expr_arena);
if partitionable {
for agg in &aggs {
let aexpr = expr_arena.get(*agg);
let depth = (&*expr_arena).iter(*agg).count();

#[cfg(feature = "object")]
{
let name = expr_to_root_column_name(&agg).unwrap();
let dtype = input_schema.get(&name).unwrap();
// has_aexpr(*agg)

if let DataType::Object(_) = dtype {
// col()
// lit() etc.
if depth == 1 {
partitionable = false;
break;
}
}

// check if the aggregation type is partitionable
match agg {
Expr::Agg(AggExpr::Min(_))
| Expr::Agg(AggExpr::Max(_))
| Expr::Agg(AggExpr::Sum(_))
| Expr::Agg(AggExpr::Mean(_))
// first need to implement this correctly
// | Expr::Agg(AggExpr::Count(_))
| Expr::Agg(AggExpr::Last(_))
| Expr::Agg(AggExpr::List(_))
| Expr::Agg(AggExpr::First(_)) => {}
_ => {
// it should end with an aggregation
if matches!(aexpr, AExpr::Alias(_, _)) {
// col().agg().alias() is allowed: count of 3
// col().alias() is not allowed: count of 4
if depth <= 2 {
partitionable = false;
break;
}
}

// check if the aggregation type is partitionable
// only simple aggregation like col().sum
// that can be divided in to the aggregation of their partitions are allowed
if !((&*expr_arena).iter(*agg).all(|(_, ae)| {
use AExpr::*;
match ae {
// only allowed expressions
Agg(agg_e) => {
matches!(
agg_e,
AAggExpr::Min(_)
| AAggExpr::Max(_)
| AAggExpr::Sum(_)
| AAggExpr::Mean(_)
| AAggExpr::Last(_)
| AAggExpr::First(_)
)
}
Not(_) | IsNotNull(_) | IsNull(_) | Column(_) | Alias(_, _) => {
true
}
_ => false,
}
}) &&
// we only allow expressions that end with an aggregation
matches!(aexpr, AExpr::Alias(_, _) | AExpr::Agg(_)))
{
partitionable = false;
break
break;
}

#[cfg(feature = "object")]
{
for name in aexpr_to_root_names(*agg, expr_arena) {
let dtype = input_schema.get(&name).unwrap();

if let DataType::Object(_) = dtype {
partitionable = false;
break;
}
}
if !partitionable {
break;
}
}
}
}
Expand Down

0 comments on commit 58ae5de

Please sign in to comment.