Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Only push predicates depending on the subset columns past unique() #14668

Merged
merged 5 commits into from
Feb 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion crates/polars-plan/src/logical_plan/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,11 @@ impl LogicalPlan {
input._format(f, sub_indent)
},
Distinct { input, options } => {
write!(f, "{:indent$}UNIQUE BY {:?}", "", options.subset)?;
write!(
f,
"{:indent$}UNIQUE[maintain_order: {:?}, keep_strategy: {:?}] BY {:?}",
"", options.maintain_order, options.keep_strategy, options.subset
)?;
input._format(f, sub_indent)
},
Slice { input, offset, len } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -404,24 +404,15 @@ impl<'a> PredicatePushDown<'a> {
input,
options
} => {

if matches!(options.keep_strategy, UniqueKeepStrategy::Any | UniqueKeepStrategy::None) {
// currently the distinct operation only keeps the first occurrences.
// this may have influence on the pushed down predicates. If the pushed down predicates
// contain a binary expression (thus depending on values in multiple columns)
// the final result may differ if it is pushed down.

let mut root_count = 0;

// if this condition is called more than once, its a binary or ternary operation.
let condition = |_| {
Copy link
Collaborator Author

@nameexhaustion nameexhaustion Feb 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From what I understand this wanted to block predicates that had more than 1 column input, but because mut root_count is not reset per predicate it would end up blocking all predicates after the first call.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yeap. That had to reset.

if root_count == 0 {
root_count += 1;
false
} else {
true
}
if let Some(ref subset) = options.subset {
// Predicates on the subset can pass.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since unique() is equivalent to group_by().gather(index), so similar to group_by we can pass predicates on the subset (<=>group key) columns

let subset = subset.clone();
let mut names_set = PlHashSet::<&str>::with_capacity(subset.len());
for name in subset.iter() {
names_set.insert(name.as_str());
};

let condition = |name: Arc<str>| !names_set.contains(name.as_ref());
let local_predicates =
transfer_to_local_by_name(expr_arena, &mut acc_predicates, condition);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ pub(super) fn predicate_is_sort_boundary(node: Node, expr_arena: &Arena<AExpr>)
has_aexpr(node, expr_arena, matches)
}

/// Transfer a predicate from `acc_predicates` that will be pushed down
/// to a local_predicates vec based on a condition.
/// Evaluates a condition on the column name inputs of every predicate, where if
/// the condition evaluates to true on any column name the predicate is
/// transferred to local.
pub(super) fn transfer_to_local_by_name<F>(
expr_arena: &Arena<AExpr>,
acc_predicates: &mut PlHashMap<Arc<str>, Node>,
Expand All @@ -132,7 +133,7 @@ where
for name in root_names {
if condition(name) {
remove_keys.push(key.clone());
continue;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would insert the same key multiple times

break;
}
}
}
Expand Down
8 changes: 7 additions & 1 deletion crates/polars-plan/src/logical_plan/tree_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,13 @@ impl<'a> TreeFmtNode<'a> {
.collect(),
),
NL(h, Distinct { input, options }) => ND(
wh(h, &format!("UNIQUE BY {:?}", options.subset)),
wh(
h,
&format!(
"UNIQUE[maintain_order: {:?}, keep_strategy: {:?}] BY {:?}",
options.maintain_order, options.keep_strategy, options.subset
),
),
vec![NL(None, input)],
),
NL(h, LogicalPlan::Slice { input, offset, len }) => ND(
Expand Down
14 changes: 14 additions & 0 deletions py-polars/tests/unit/operations/unique/test_unique.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,20 @@ def test_unique_predicate_pd() -> None:
expected = pl.DataFrame({"x": ["abc"], "y": ["xxx"], "z": [True]})
assert_frame_equal(result, expected)

# Issue #14595: filter should not naively be pushed past unique()
for maintain_order in (True, False):
for keep in ("first", "last", "any", "none"):
q = (
lf.unique("x", maintain_order=maintain_order, keep=keep) # type: ignore[arg-type]
.filter(pl.col("x") == "abc")
.filter(pl.col("z"))
)
plan = q.explain()
assert r'FILTER col("z")' in plan
# We can push filters if they only depend on the subset columns of unique()
assert r'SELECTION: "[(col(\"x\")) == (String(abc))]"' in plan
assert_frame_equal(q.collect(predicate_pushdown=False), q.collect())


def test_unique_on_list_df() -> None:
assert pl.DataFrame(
Expand Down
Loading