From 42c619a5ac7aa7f8ce86edda0730a2982b62dc03 Mon Sep 17 00:00:00 2001
From: liuxiao
Date: Mon, 8 Apr 2024 19:52:27 +0800
Subject: [PATCH] [KYUUBI #6250] Drop support for Spark 3.1
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# :mag: Description

## Issue References 🔗

This pull request fixes #6250

## Describe Your Solution 🔧

Drop support for Spark 3.1 in the Spark SQL engine: remove the Spark 3.1 binary verification job from CI, remove the runtime version checks and reflection-based compatibility shims (the `OrcDeserializer` constructor, `AdaptiveSparkPlanExec.finalPhysicalPlan`, `CommandResultExec`, `SQLConf.isStaticConfigKey`, and `SparkPlan.session`) that existed only for Spark 3.1, and update the documentation accordingly.

## Types of changes :bookmark:

- [ ] Bugfix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [x] Breaking change (fix or feature that would cause existing functionality to change)

## Test Plan 🧪

Pass CI

---

# Checklist 📝

- [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)

**Be nice. Be informative.**

Closes #6273 from liuxiaocs7/issue-6250.

Closes #6250

c6ba1e88a [liuxiao] remove unused import
db887ef9b [liuxiao] inline method
769da013b [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
21dbd37a7 [liuxiao] remove unused import
e869d571e [liuxiao] update for miss
7d755a879 [liuxiao] Drop support for Spark 3.1

Lead-authored-by: liuxiao
Co-authored-by: Cheng Pan
Signed-off-by: Cheng Pan
---
 .github/workflows/master.yml                  |  5 --
 docs/deployment/migration-guide.md            |  1 +
 docs/quick_start/quick_start.rst              |  2 +-
 .../kyuubi/engine/spark/SparkSQLEngine.scala  |  3 -
 .../spark/operation/ExecuteStatement.scala    |  5 +-
 .../spark/operation/FetchOrcStatement.scala   | 34 +------
 .../arrow/KyuubiArrowConverters.scala         |  7 +-
 .../spark/sql/kyuubi/SparkDatasetHelper.scala | 89 ++++---------------
 .../engine/spark/WithSparkSQLEngine.scala     |  3 +-
 .../SparkArrowbasedOperationSuite.scala       | 42 ++-------
 10 files changed, 33 insertions(+), 158 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 1dfcc873e57..712bc1b90c4 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -56,11 +56,6 @@ jobs:
         exclude-tags: [""]
         comment: ["normal"]
         include:
-          - java: 8
-            spark: '3.5'
-            spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
-            comment: 'verify-on-spark-3.1-binary'
           - java: 8
             spark: '3.5'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
diff --git a/docs/deployment/migration-guide.md b/docs/deployment/migration-guide.md
index 4767f154de7..37e1c9a9d77 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
+* Since Kyuubi 1.10, the support of Spark engine for Spark 3.1 is removed.
 * Since Kyuubi 1.10, the support of Flink engine for Flink 1.16 is removed.
## Upgrading from Kyuubi 1.8 to 1.9 diff --git a/docs/quick_start/quick_start.rst b/docs/quick_start/quick_start.rst index a77f7ee20a6..0b954daecac 100644 --- a/docs/quick_start/quick_start.rst +++ b/docs/quick_start/quick_start.rst @@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component. **Kyuubi** Gateway \ |release| \ - Kyuubi Server Engine lib - Kyuubi Engine Beeline - Kyuubi Beeline - **Spark** Engine 3.1 to 3.5 A Spark distribution + **Spark** Engine 3.2 to 3.5 A Spark distribution **Flink** Engine 1.17 to 1.19 A Flink distribution **Trino** Engine N/A A Trino cluster allows to access via trino-client v411 **Doris** Engine N/A A Doris cluster diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala index 6dd438ffdc9..d1331cd0284 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala @@ -381,9 +381,6 @@ object SparkSQLEngine extends Logging { } def main(args: Array[String]): Unit = { - if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.1") { - warn("The support for Spark 3.1 is deprecated, and will be removed in the next version.") - } val startedTime = System.currentTimeMillis() val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match { case Some(t) => t.toLong diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala index bf68f18f064..a52d32be9cb 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala @@ -185,14 +185,13 @@ class ExecuteStatement( // Rename all col name to avoid duplicate columns val colName = range(0, result.schema.size).map(x => "col" + x) - val codec = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") "zstd" else "zlib" // df.write will introduce an extra shuffle for the outermost limit, and hurt performance if (resultMaxRows > 0) { result.toDF(colName: _*).limit(resultMaxRows).write - .option("compression", codec).format("orc").save(saveFileName.get) + .option("compression", "zstd").format("orc").save(saveFileName.get) } else { result.toDF(colName: _*).write - .option("compression", codec).format("orc").save(saveFileName.get) + .option("compression", "zstd").format("orc").save(saveFileName.get) } info(s"Save result to ${saveFileName.get}") fetchOrcStatement = Some(new FetchOrcStatement(spark)) diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala index 861539b95b2..64a9855f955 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala @@ -35,10 +35,7 @@ import org.apache.spark.sql.execution.datasources.RecordReaderIterator import 
org.apache.spark.sql.execution.datasources.orc.OrcDeserializer import org.apache.spark.sql.types.StructType -import org.apache.kyuubi.KyuubiException -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator} -import org.apache.kyuubi.util.reflect.DynConstructors class FetchOrcStatement(spark: SparkSession) { @@ -62,7 +59,7 @@ class FetchOrcStatement(spark: SparkSession) { val fullSchema = orcSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema) - val deserializer = getOrcDeserializer(orcSchema, colId) + val deserializer = new OrcDeserializer(orcSchema, colId) orcIter = new OrcFileIterator(list) val iterRow = orcIter.map(value => unsafeProjection(deserializer.deserialize(value))) @@ -73,35 +70,6 @@ class FetchOrcStatement(spark: SparkSession) { def close(): Unit = { orcIter.close() } - - private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = { - try { - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") { - // SPARK-34535 changed the constructor signature of OrcDeserializer - DynConstructors.builder() - .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]]) - .build[OrcDeserializer]() - .newInstance( - orcSchema, - colId) - } else { - DynConstructors.builder() - .impl( - classOf[OrcDeserializer], - classOf[StructType], - classOf[StructType], - classOf[Array[Int]]) - .build[OrcDeserializer]() - .newInstance( - new StructType, - orcSchema, - colId) - } - } catch { - case e: Throwable => - throw new KyuubiException("Failed to create OrcDeserializer", e) - } - } } class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] { diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala index 4a54180cc67..04f4ede6cff 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala @@ -30,7 +30,6 @@ import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer} import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema} import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper} import org.apache.spark.sql.catalyst.expressions.UnsafeRow import org.apache.spark.sql.execution.CollectLimitExec @@ -158,9 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { val partsToScan = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts)) - // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we - // drop Spark 3.1 support. 
- val sc = SparkSession.active.sparkContext + val sc = collectLimitExec.session.sparkContext val res = sc.runJob( childRDD, (it: Iterator[InternalRow]) => { @@ -347,6 +344,6 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging { largeVarTypes) } - // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.1/3.2 + // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.2 final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption() } diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala index 7af51abfe59..2dbfe7348a3 100644 --- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala +++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala @@ -23,11 +23,10 @@ import org.apache.spark.SparkContext import org.apache.spark.internal.Logging import org.apache.spark.network.util.{ByteUnit, JavaUtils} import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils -import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics} @@ -38,7 +37,6 @@ import org.apache.kyuubi.engine.spark.KyuubiSparkUtil import org.apache.kyuubi.engine.spark.schema.RowSet import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded import org.apache.kyuubi.util.reflect.DynMethods -import org.apache.kyuubi.util.reflect.ReflectUtils._ object SparkDatasetHelper extends Logging { @@ -48,7 +46,7 @@ object SparkDatasetHelper extends Logging { def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = { case adaptiveSparkPlan: AdaptiveSparkPlanExec => - executeArrowBatchCollect(finalPhysicalPlan(adaptiveSparkPlan)) + executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan) // TODO: avoid extra shuffle if `offset` > 0 case collectLimit: CollectLimitExec if offset(collectLimit) > 0 => logWarning("unsupported offset > 0, an extra shuffle will be introduced.") @@ -57,9 +55,8 @@ object SparkDatasetHelper extends Logging { doCollectLimit(collectLimit) case collectLimit: CollectLimitExec if collectLimit.limit < 0 => executeArrowBatchCollect(collectLimit.child) - // TODO: replace with pattern match once we drop Spark 3.1 support. 
- case command: SparkPlan if isCommandResultExec(command) => - doCommandResultExec(command) + case commandResult: CommandResultExec => + doCommandResultExec(commandResult) case localTableScan: LocalTableScanExec => doLocalTableScan(localTableScan) case plan: SparkPlan => @@ -76,10 +73,8 @@ object SparkDatasetHelper extends Logging { */ def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = { val schemaCaptured = plan.schema - // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we - // drop Spark 3.1 support. - val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch - val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone + val maxRecordsPerBatch = plan.session.sessionState.conf.arrowMaxRecordsPerBatch + val timeZoneId = plan.session.sessionState.conf.sessionLocalTimeZone // note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input // arguments are serialized and sent to the executor side for execution. val maxBatchSizePerBatch = maxBatchSize @@ -160,10 +155,8 @@ object SparkDatasetHelper extends Logging { } private def doCollectLimit(collectLimit: CollectLimitExec): Array[Array[Byte]] = { - // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we - // drop Spark 3.1 support. - val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone - val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch + val timeZoneId = collectLimit.session.sessionState.conf.sessionLocalTimeZone + val maxRecordsPerBatch = collectLimit.session.sessionState.conf.arrowMaxRecordsPerBatch val batches = KyuubiArrowConverters.takeAsArrowBatches( collectLimit, @@ -191,19 +184,13 @@ object SparkDatasetHelper extends Logging { result.toArray } - private lazy val commandResultExecRowsMethod = DynMethods.builder("rows") - .impl("org.apache.spark.sql.execution.CommandResultExec") - .build() - - private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = { - val spark = SparkSession.active - // TODO: replace with `command.rows` once we drop Spark 3.1 support. - val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command) - command.longMetric("numOutputRows").add(rows.size) - sendDriverMetrics(spark.sparkContext, command.metrics) + private def doCommandResultExec(commandResult: CommandResultExec): Array[Array[Byte]] = { + val spark = commandResult.session + commandResult.longMetric("numOutputRows").add(commandResult.rows.size) + sendDriverMetrics(spark.sparkContext, commandResult.metrics) KyuubiArrowConverters.toBatchIterator( - rows.iterator, - command.schema, + commandResult.rows.iterator, + commandResult.schema, spark.sessionState.conf.arrowMaxRecordsPerBatch, maxBatchSize, -1, @@ -211,7 +198,7 @@ object SparkDatasetHelper extends Logging { } private def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = { - val spark = SparkSession.active + val spark = localTableScan.session localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size) sendDriverMetrics(spark.sparkContext, localTableScan.metrics) KyuubiArrowConverters.toBatchIterator( @@ -224,31 +211,7 @@ object SparkDatasetHelper extends Logging { } /** - * This method provides a reflection-based implementation of - * [[AdaptiveSparkPlanExec.finalPhysicalPlan]] that enables us to adapt to the Spark runtime - * without patching SPARK-41914. 
- * - * TODO: Once we drop support for Spark 3.1.x, we can directly call - * [[AdaptiveSparkPlanExec.finalPhysicalPlan]]. - */ - def finalPhysicalPlan(adaptiveSparkPlanExec: AdaptiveSparkPlanExec): SparkPlan = { - withFinalPlanUpdate(adaptiveSparkPlanExec, identity) - } - - /** - * A reflection-based implementation of [[AdaptiveSparkPlanExec.withFinalPlanUpdate]]. - */ - private def withFinalPlanUpdate[T]( - adaptiveSparkPlanExec: AdaptiveSparkPlanExec, - fun: SparkPlan => T): T = { - val plan = invokeAs[SparkPlan](adaptiveSparkPlanExec, "getFinalPhysicalPlan") - val result = fun(plan) - invokeAs[Unit](adaptiveSparkPlanExec, "finalPlanUpdate") - result - } - - /** - * offset support was add since Spark-3.4(set SPARK-28330), to ensure backward compatibility with + * offset support was add in SPARK-28330(3.4.0), to ensure backward compatibility with * earlier versions of Spark, this function uses reflective calls to the "offset". */ private def offset(collectLimitExec: CollectLimitExec): Int = { @@ -261,24 +224,6 @@ object SparkDatasetHelper extends Logging { .getOrElse(0) } - private def isCommandResultExec(sparkPlan: SparkPlan): Boolean = { - // scalastyle:off line.size.limit - // the CommandResultExec was introduced in SPARK-35378 (Spark 3.2), after SPARK-35378 the - // physical plan of runnable command is CommandResultExec. - // for instance: - // ``` - // scala> spark.sql("show tables").queryExecution.executedPlan - // res0: org.apache.spark.sql.execution.SparkPlan = - // CommandResult , [namespace#0, tableName#1, isTemporary#2] - // +- ShowTables [namespace#0, tableName#1, isTemporary#2], V2SessionCatalog(spark_catalog), [default] - // - // scala > spark.sql("show tables").queryExecution.executedPlan.getClass - // res1: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.CommandResultExec - // ``` - // scalastyle:on line.size.limit - sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec" - } - /** * refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based * operation, so that we can track the arrow-based queries on the UI tab. 
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala index 3b98c2efb16..e6b140704bf 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala @@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession import org.apache.kyuubi.{KyuubiFunSuite, Utils} import org.apache.kyuubi.config.KyuubiConf -import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION trait WithSparkSQLEngine extends KyuubiFunSuite { protected var spark: SparkSession = _ @@ -35,7 +34,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite { // Affected by such configuration' default value // engine.initialize.sql='SHOW DATABASES' // SPARK-35378 - protected lazy val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") 1 else 0 + protected val initJobId: Int = 1 override def beforeAll(): Unit = { startSparkEngine() diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala index 4e041482400..ba245f50a32 100644 --- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala +++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala @@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation import java.lang.{Boolean => JBoolean} import java.sql.Statement -import java.util.{Locale, Set => JSet} +import java.util.Locale import org.apache.spark.{KyuubiSparkContextHelper, TaskContext} import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart} @@ -43,7 +43,6 @@ import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine} import org.apache.kyuubi.engine.spark.session.SparkSessionImpl import org.apache.kyuubi.operation.SparkDataTypeTests import org.apache.kyuubi.util.reflect.{DynFields, DynMethods} -import org.apache.kyuubi.util.reflect.ReflectUtils._ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests with SparkMetricsTestUtils { @@ -188,12 +187,9 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp returnSize.foreach { size => val df = spark.sql(s"select * from t_1 limit $size") val headPlan = df.queryExecution.executedPlan.collectLeaves().head - if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") { - assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec]) - val finalPhysicalPlan = - SparkDatasetHelper.finalPhysicalPlan(headPlan.asInstanceOf[AdaptiveSparkPlanExec]) - assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec]) - } + assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec]) + val finalPhysicalPlan = headPlan.asInstanceOf[AdaptiveSparkPlanExec].finalPhysicalPlan + assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec]) if (size > 1000) { runAndCheck(df.queryExecution.executedPlan, 1000) } else { @@ -298,11 +294,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp val listener = new JobCountListener val l2 = new SQLMetricsListener val nodeName = spark.sql("SHOW 
TABLES").queryExecution.executedPlan.getClass.getName - if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") { - assert(nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec") - } else { - assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec") - } + assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec") withJdbcStatement("table_1") { statement => statement.executeQuery("CREATE TABLE table_1 (id bigint) USING parquet") withSparkListener(listener) { @@ -314,15 +306,8 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp } } - if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") { - // Note that before Spark 3.2, a LocalTableScan SparkPlan will be submitted, and the issue of - // preventing LocalTableScan from triggering a job submission was addressed in [KYUUBI #4710]. - assert(l2.queryExecution.executedPlan.getClass.getName == - "org.apache.spark.sql.execution.LocalTableScanExec") - } else { - assert(l2.queryExecution.executedPlan.getClass.getName == - "org.apache.spark.sql.execution.CommandResultExec") - } + assert(l2.queryExecution.executedPlan.getClass.getName == + "org.apache.spark.sql.execution.CommandResultExec") assert(listener.numJobs == 0) } @@ -378,7 +363,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp test("post CommandResultExec driver-side metrics") { spark.sql("show tables").show(truncate = false) - assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.2") val expectedMetrics = Map( 0L -> (("CommandResult", Map("number of output rows" -> "2")))) withTables("table_1", "table_2") { @@ -493,7 +477,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp } } (keys, values).zipped.foreach { (k, v) => - if (isStaticConfigKey(k)) { + if (SQLConf.isStaticConfigKey(k)) { throw new KyuubiException(s"Cannot modify the value of a static config: $k") } conf.setConfString(k, v) @@ -521,16 +505,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp } } - /** - * This method provides a reflection-based implementation of [[SQLConf.isStaticConfigKey]] to - * adapt Spark 3.1 - * - * TODO: Once we drop support for Spark 3.1, we can directly call - * [[SQLConf.isStaticConfigKey()]]. - */ - private def isStaticConfigKey(key: String): Boolean = - getField[JSet[String]]((SQLConf.getClass, SQLConf), "staticConfKeys").contains(key) - // the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528 // (since Spark 3.5) private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")