Skip to content

[WIP] Remove COMET_SHUFFLE_FALLBACK_TO_COLUMNAR config #1736

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
@@ -289,13 +289,6 @@ object CometConf extends ShimCometConf {
.checkValues(Set("native", "jvm", "auto"))
.createWithDefault("auto")

val COMET_SHUFFLE_FALLBACK_TO_COLUMNAR: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.fallbackToColumnar")
.doc("Whether to try falling back to columnar shuffle when native shuffle is not supported")
.internal()
.booleanConf
.createWithDefault(false)

val COMET_EXEC_BROADCAST_FORCE_ENABLED: ConfigEntry[Boolean] =
conf(s"$COMET_EXEC_CONFIG_PREFIX.broadcast.enabled")
.doc(
38 changes: 38 additions & 0 deletions dev/diffs/3.5.5.diff
Original file line number Diff line number Diff line change
@@ -332,6 +332,44 @@ index 7ee18df3756..64f01a68048 100644
withTable("tbl") {
sql(
"""
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 47a311c71d5..342e71cfdd4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -24,8 +24,9 @@ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression
import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
import org.apache.spark.sql.catalyst.plans.logical.{Window => LogicalWindow}
import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
+import org.apache.spark.sql.comet.execution.shuffle.CometShuffleExchangeExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, Exchange, ShuffleExchangeExec, ShuffleExchangeLike}
import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.expressions.{Aggregator, MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
@@ -1187,10 +1188,12 @@ class DataFrameWindowFunctionsSuite extends QueryTest
}

def isShuffleExecByRequirement(
- plan: ShuffleExchangeExec,
+ plan: ShuffleExchangeLike,
desiredClusterColumns: Seq[String]): Boolean = plan match {
case ShuffleExchangeExec(op: HashPartitioning, _, ENSURE_REQUIREMENTS, _) =>
partitionExpressionsColumns(op.expressions) === desiredClusterColumns
+ case CometShuffleExchangeExec(op: HashPartitioning, _, _, ENSURE_REQUIREMENTS, _, _) =>
+ partitionExpressionsColumns(op.expressions) === desiredClusterColumns
case _ => false
}

@@ -1213,7 +1216,7 @@ class DataFrameWindowFunctionsSuite extends QueryTest
val shuffleByRequirement = windowed.queryExecution.executedPlan.exists {
case w: WindowExec =>
w.child.exists {
- case s: ShuffleExchangeExec => isShuffleExecByRequirement(s, Seq("key1", "key2"))
+ case s: ShuffleExchangeLike => isShuffleExecByRequirement(s, Seq("key1", "key2"))
case _ => false
}
case _ => false
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index f32b32ffc5a..447d7c6416e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
13 changes: 2 additions & 11 deletions spark/src/main/scala/org/apache/comet/rules/CometExecRule.scala
Original file line number Diff line number Diff line change
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.window.WindowExec
import org.apache.spark.sql.types.{DoubleType, FloatType}

import org.apache.comet.{CometConf, ExtendedExplainInfo}
import org.apache.comet.CometConf.{COMET_ANSI_MODE_ENABLED, COMET_SHUFFLE_FALLBACK_TO_COLUMNAR}
import org.apache.comet.CometConf.COMET_ANSI_MODE_ENABLED
import org.apache.comet.CometSparkSessionExtensions.{createMessage, getCometBroadcastNotEnabledReason, getCometShuffleNotEnabledReason, isANSIEnabled, isCometBroadCastForceEnabled, isCometExecEnabled, isCometJVMShuffleMode, isCometLoaded, isCometNativeShuffleMode, isCometScan, isCometShuffleEnabled, isSpark40Plus, shouldApplySparkToColumnar, withInfo}
import org.apache.comet.serde.OperatorOuterClass.Operator
import org.apache.comet.serde.QueryPlanSerde
@@ -491,16 +491,9 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
None
}

// this is a temporary workaround because some Spark SQL tests fail
// when we enable COMET_SHUFFLE_FALLBACK_TO_COLUMNAR due to valid bugs
// that we had not previously seen
val tryColumnarNext =
!nativePrecondition || (nativePrecondition && nativeShuffle.isEmpty &&
COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.get(conf))

val nativeOrColumnarShuffle = if (nativeShuffle.isDefined) {
nativeShuffle
} else if (tryColumnarNext) {
} else {
// Columnar shuffle for regular Spark operators (not Comet) and Comet operators
// (if configured).
// If the child of ShuffleExchangeExec is also a ShuffleExchangeExec, we should not
@@ -526,8 +519,6 @@ case class CometExecRule(session: SparkSession) extends Rule[SparkPlan] {
} else {
None
}
} else {
None
}

if (nativeOrColumnarShuffle.isDefined) {
Original file line number Diff line number Diff line change
@@ -78,7 +78,6 @@ abstract class CometTestBase
conf.set(CometConf.COMET_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_ENABLED.key, "true")
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_NATIVE_SCAN_ENABLED.key, "true")
conf.set(CometConf.COMET_SCAN_ALLOW_INCOMPATIBLE.key, "true")
Original file line number Diff line number Diff line change
@@ -272,7 +272,6 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> dppEnabled.toString,
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_SHUFFLE_FALLBACK_TO_COLUMNAR.key -> "true",
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
Loading
Oops, something went wrong.