[SPARK-32753][SQL] Only copy tags to node with no tags #732

Merged
merged 4 commits into from
Feb 19, 2021

@rshkv rshkv commented Feb 17, 2021

What changes were proposed in this pull request?

This is a cherry-pick of SPARK-32753 / apache#29593.

It fixes a correctness bug that causes duplicate rows or empty rows (as we've seen internally). It reproduces on apache/spark 3.0.1 when aggregating on a column and then repartitioning on the same column, as in the query below. More context is on the internal ticket. (There is no upstream release containing this fix yet: the fix merged into branch-3.0 on Sep 8, and release 3.0.1 was cut on Aug 28.)

scala> df.show()
+----+-----+                                                                    
| tag| data|
+----+-----+
|tag1|data1|
|tag2|data2|
+----+-----+

scala> val repartitioned = df
    .groupBy("tag")
    .agg(collect_set("data"))
    .repartition(col("tag"))

The written data is:

scala> repartitioned.write.json("json_out")

scala> spark.read.json("json_out").show()
+-----------------+----+
|collect_set(data)| tag|
+-----------------+----+
|               []|tag2|
|               []|tag1|
+-----------------+----+

Reason for the bug is here.

How was this patch tested?

Upstream introduced a unit test against the shape of the adaptive plan. I also tested my repro against this fix; without it, the repro produces empty rows. (Upstream only reported duplicate rows, so it seems we're lucky that the fix covers our case as well.)

Only copy tags to node with no tags when transforming plans.

cloud-fan [made a good point](apache#29593 (comment)) that it doesn't make sense to append tags to existing nodes when nodes are removed. That will cause such bugs as duplicate rows when deduplicating and repartitioning by the same column with AQE.

```
spark.range(10).union(spark.range(10)).createOrReplaceTempView("v1")
val df = spark.sql("select id from v1 group by id distribute by id")
println(df.collect().toArray.mkString(","))
println(df.queryExecution.executedPlan)

// With AQE
[4],[0],[3],[2],[1],[7],[6],[8],[5],[9],[4],[0],[3],[2],[1],[7],[6],[8],[5],[9]
AdaptiveSparkPlan(isFinalPlan=true)
+- CustomShuffleReader local
   +- ShuffleQueryStage 0
      +- Exchange hashpartitioning(id#183L, 10), true
         +- *(3) HashAggregate(keys=[id#183L], functions=[], output=[id#183L])
            +- Union
               :- *(1) Range (0, 10, step=1, splits=2)
               +- *(2) Range (0, 10, step=1, splits=2)

// Without AQE
[4],[7],[0],[6],[8],[3],[2],[5],[1],[9]
*(4) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
+- Exchange hashpartitioning(id#206L, 10), true
   +- *(3) HashAggregate(keys=[id#206L], functions=[], output=[id#206L])
      +- Union
         :- *(1) Range (0, 10, step=1, splits=2)
         +- *(2) Range (0, 10, step=1, splits=2)
```

Detecting node removal is too expensive, so as a compromise we only copy tags to a node that has no tags.
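The compromise described above can be sketched in isolation. This is a minimal stand-alone model, not Spark's `TreeNode`: the `Node` class, the `appendTagsFrom` name for the old behaviour, and the `"plan"` tag key are all assumptions made for illustration. Only the `copyTagsFrom` name comes from this PR.

```scala
import scala.collection.mutable

object TagCopySketch {
  // Hypothetical stand-in for a plan node; Spark's real TreeNode is far richer.
  final class Node(val name: String) {
    val tags: mutable.Map[String, String] = mutable.Map.empty

    // Old behaviour (illustrative name): always append, overwriting
    // same-named tags that the target node already carries.
    def appendTagsFrom(other: Node): Unit = { tags ++= other.tags; () }

    // The compromise: copy only when this node has no tags of its own.
    def copyTagsFrom(other: Node): Unit =
      if (tags.isEmpty) { tags ++= other.tags; () }
  }

  def main(args: Array[String]): Unit = {
    val removed = new Node("Aggregate")
    removed.tags("plan") = "aggregate-info"

    // A surviving node that already has its own tag keeps it.
    val survivor = new Node("Exchange")
    survivor.tags("plan") = "exchange-info"
    survivor.copyTagsFrom(removed)
    println(survivor.tags("plan")) // exchange-info

    // A freshly created node has no tags, so it inherits them.
    val fresh = new Node("NewNode")
    fresh.copyTagsFrom(removed)
    println(fresh.tags("plan")) // aggregate-info
  }
}
```

The guard is deliberately coarse: it does not try to tell a genuine replacement from a removal, it only refuses to touch a node that already has tags.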

User-facing change: yes, this fixes a bug.

Testing: added a test.

Closes apache#29593 from manuzhang/spark-32753.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

### What changes were proposed in this pull request?
Fix indentation and clean up view in the test added by apache#29593.

### Why are the changes needed?
Address review comments in apache#29665.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Updated test.

Closes apache#29682 from manuzhang/spark-32753-followup.

Authored-by: manuzhang <owenzhang1990@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
@@ -314,7 +319,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
       }
     }
     // If the transform function replaces this node with a new one, carry over the tags.
-    newNode.tags ++= this.tags
+    newNode.copyTagsFrom(this)
@rshkv rshkv Feb 18, 2021

Slight difference from upstream PR: I had to introduce copyTagsFrom here and above. We'd bypass it for node transformations otherwise, which makes this PR useless because we still copy tags when removing nodes.

Upstream didn't need that change because branch-3.0 already uses copyTagsFrom, courtesy of apache@e04f696.


// Extra test for palantir/spark because the upstream one added in SPARK-32753 didn't cover it
// TODO(rshkv): Remove after we rebase on 3.0.2
test("SPARK-32753 (palantir/spark): Don't fail on aggregate collect") {
@rshkv rshkv Feb 18, 2021

The repro still failed after the cherry-pick because we didn't have 8efee47 and bypassed copyTagsFrom. So I just shoved the repro in here.

@rshkv (Author)

It's not exactly the missing rows, but it was the other repro I had on the internal ticket.

For anyone reading who hasn't followed the entire thread, the idea here is that df.show() actually throws a TreeNodeException without this fix.

@rshkv (Author)

Yep, thank you for clarifying

@jdcasale jdcasale left a comment

As far as I understand, when a node is removed by AQE and we take the tags on that node and propagate them to a replacement node that has its own tags, an old tag (that was meant to be removed) can overwrite a new tag (that we need to keep for correct behavior), and we get this inconsistent state. By only copying tags over when there are no tags on the new node, we ensure that we don't override anything important. I don't fully understand conceptually why copying tags is useful when there are no tags on the target node (and why it's OK to drop that information when the target node has its own tags), but upstream seems to be satisfied with this fix, and it fixes the problem we're observing.

My guess is that either we could turn off tag propagation altogether and would get correct behavior, or that the fact that this fix works is an implicit externality of the current structure of the execution paths that make use of this code, and that this implied contract could break at any time. I don't really think we have much of a choice but to take this fix and hope that nothing breaks down the road.

Explanation in upstream PR: apache#29593 (comment)


@rshkv rshkv commented Feb 19, 2021

Yeah, that's how I understand it as well. The TreeNodes are replaced when transformed by rules, either by a copy (with rules applied to children) or by a different node. Sometimes that replacement is effectively a removal, where the rule returns the child node.

The tag copying only happens from `this` node (the one the rule is being applied to) to the replacement node. So previously, as you say, if you had a child with tags, you copied your own tags over the child's. I suppose AQE is one of these removal situations.

From looking at the values in the debugger, the tags seem to contain logical plan information. So a removed node would pass on its logical plan information, incl. attribute references, to one that shouldn't have it.
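The three outcomes described in this comment (a copy with transformed children, a different node, or a removal that returns the child) can be walked through with a toy top-down transform. This is a rough sketch under stated assumptions, not Spark's implementation: `Plan`, `withChild`, `sampleTree`, the node names, and the `"plan"` tag key are hypothetical, and the tag values merely stand in for the logical plan information seen in the debugger.

```scala
import scala.collection.mutable

object TransformSketch {
  // Hypothetical single-child plan node (not Spark's TreeNode).
  final class Plan(val name: String, val child: Option[Plan] = None) {
    val tags: mutable.Map[String, String] = mutable.Map.empty

    // Copy tags only when this node has none of its own.
    def copyTagsFrom(other: Plan): Unit =
      if (tags.isEmpty) { tags ++= other.tags; () }

    // A copy with a new child starts without tags, so it inherits ours.
    def withChild(newChild: Option[Plan]): Plan = {
      val copy = new Plan(name, newChild)
      copy.copyTagsFrom(this)
      copy
    }
  }

  // Apply the rule to this node first, then recurse into the result's child.
  def transformDown(node: Plan)(rule: PartialFunction[Plan, Plan]): Plan = {
    val afterRule = if (rule.isDefinedAt(node)) rule(node) else node
    // The rule returned something else: carry tags over, subject to the guard.
    if (!(afterRule eq node)) afterRule.copyTagsFrom(node)
    val newChild = afterRule.child.map(c => transformDown(c)(rule))
    val childChanged = (afterRule.child, newChild) match {
      case (Some(before), Some(after)) => !(before eq after)
      case _ => false
    }
    if (childChanged) afterRule.withChild(newChild) else afterRule
  }

  // Aggregate -> Exchange -> Scan, each node tagged with its own info.
  def sampleTree(): Plan = {
    val scan = new Plan("Scan"); scan.tags("plan") = "scan-info"
    val exchange = new Plan("Exchange", Some(scan)); exchange.tags("plan") = "exchange-info"
    val aggregate = new Plan("Aggregate", Some(exchange)); aggregate.tags("plan") = "aggregate-info"
    aggregate
  }

  def main(args: Array[String]): Unit = {
    // Removal: the rule returns the child, which keeps its own tag.
    val removed = transformDown(sampleTree()) {
      case p if p.name == "Aggregate" && p.child.isDefined => p.child.get
    }
    println(s"${removed.name}: ${removed.tags("plan")}") // Exchange: exchange-info

    // Replacement: a brand-new node has no tags and inherits the old node's.
    val replaced = transformDown(sampleTree()) {
      case p if p.name == "Scan" => new Plan("FastScan")
    }
    val leaf = replaced.child.get.child.get
    println(s"${leaf.name}: ${leaf.tags("plan")}") // FastScan: scan-info
  }
}
```

With an unconditional append in place of the guard, the removal case would leave `Exchange` carrying `aggregate-info`, which is the situation the comment describes: a removed node passing its information to a node that shouldn't have it.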

@rshkv rshkv merged commit bcb5098 into master Feb 19, 2021
@rshkv rshkv deleted the wr/spark-32753 branch February 19, 2021 19:07