### Describe the bug
The following test (which can be copied into `CometExpressionSuite`) fails. The bug appears to be related to the handling of `NaN` values.
test("SPARK-32038: NormalizeFloatingNumbers should work on distinct aggregate") {
withTempView("view") {
val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)
Seq(
("mithunr", Float.NaN),
("mithunr", nan1),
("mithunr", nan2),
("abellina", 1.0f),
("abellina", 2.0f)).toDF("uid", "score").createOrReplaceTempView("view")
// the query uses RangePartitioning, which is not supported in native shuffle
withSQLConf(CometConf.COMET_SHUFFLE_MODE.key -> "jvm") {
val df = spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc")
checkSparkAnswer(df)
}
}
}
All of the values for `mithunr` are `NaN`, so the distinct count should be 1, not 3.
```
!== Correct Answer - 2 ==                          == Spark Answer - 2 ==
 struct<uid:string,count(DISTINCT score):bigint>    struct<uid:string,count(DISTINCT score):bigint>
 [abellina,2]                                       [abellina,2]
![mithunr,1]                                        [mithunr,3]
```
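The three `mithunr` scores are all `NaN` but carry different raw bit patterns, so an aggregation that compares raw float bits rather than normalized values would see three distinct scores. A minimal standalone sketch of that distinction (plain Scala, can be pasted into a REPL; no Spark involved):

```scala
import java.lang.Float.{floatToRawIntBits, intBitsToFloat, isNaN}

val nan1 = intBitsToFloat(0x7f800001)
val nan2 = intBitsToFloat(0x7fffffff)

// All three values are NaN...
assert(isNaN(Float.NaN) && isNaN(nan1) && isNaN(nan2))

// ...but their raw bit patterns differ, so comparing binary
// representations yields three "distinct" values.
println(Seq(Float.NaN, nan1, nan2).map(floatToRawIntBits).distinct.size) // 3

// Normalizing every NaN to the canonical bit pattern (what the
// normalizenanandzero expression in the plans below does for NaN inputs)
// collapses them to a single value.
val normalized = Seq(Float.NaN, nan1, nan2).map(f => if (isNaN(f)) Float.NaN else f)
println(normalized.map(floatToRawIntBits).distinct.size) // 1
```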
Spark plan:
```
+- == Initial Plan ==
   Sort [uid#7 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(uid#7 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, [plan_id=31]
      +- HashAggregate(keys=[uid#7], functions=[count(distinct score#15)], output=[uid#7, count(DISTINCT score)#12L])
         +- Exchange hashpartitioning(uid#7, 10), ENSURE_REQUIREMENTS, [plan_id=28]
            +- HashAggregate(keys=[uid#7], functions=[partial_count(distinct score#15)], output=[uid#7, count#18L])
               +- HashAggregate(keys=[uid#7, score#15], functions=[], output=[uid#7, score#15])
                  +- Exchange hashpartitioning(uid#7, score#15, 10), ENSURE_REQUIREMENTS, [plan_id=24]
                     +- HashAggregate(keys=[uid#7, knownfloatingpointnormalized(normalizenanandzero(score#8)) AS score#15], functions=[], output=[uid#7, score#15])
                        +- LocalTableScan [uid#7, score#8]
```
Comet plan:
```
+- == Initial Plan ==
   CometSort [uid#7, count(DISTINCT score)#12L], [uid#7 ASC NULLS FIRST]
   +- CometColumnarExchange rangepartitioning(uid#7 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=188]
      +- HashAggregate(keys=[uid#7], functions=[count(distinct score#32)], output=[uid#7, count(DISTINCT score)#12L])
         +- CometColumnarExchange hashpartitioning(uid#7, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=186]
            +- HashAggregate(keys=[uid#7], functions=[partial_count(distinct score#32)], output=[uid#7, count#35L])
               +- CometHashAggregate [uid#7, score#32], [uid#7, score#32]
                  +- CometColumnarExchange hashpartitioning(uid#7, score#32, 10), ENSURE_REQUIREMENTS, CometColumnarShuffle, [plan_id=163]
                     +- HashAggregate(keys=[uid#7, knownfloatingpointnormalized(normalizenanandzero(score#8)) AS score#32], functions=[], output=[uid#7, score#32])
                        +- LocalTableScan [uid#7, score#8]
```
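For comparison, here is a minimal `spark-shell` sketch of the same query run without Comet (e.g. with `spark.comet.enabled=false`); vanilla Spark normalizes the NaN values and reports a count of 1 for `mithunr`:

```scala
// spark-shell sketch: same data and query as the failing test, without Comet
import spark.implicits._

val nan1 = java.lang.Float.intBitsToFloat(0x7f800001)
val nan2 = java.lang.Float.intBitsToFloat(0x7fffffff)

Seq(
  ("mithunr", Float.NaN),
  ("mithunr", nan1),
  ("mithunr", nan2),
  ("abellina", 1.0f),
  ("abellina", 2.0f)).toDF("uid", "score").createOrReplaceTempView("view")

// Expected output: [abellina,2], [mithunr,1]
spark.sql("select uid, count(distinct score) from view group by 1 order by 1 asc").show()
```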
### Steps to reproduce
No response
### Expected behavior
No response
### Additional context
No response