Drop support for Spark 3.1
liuxiaocs7 committed Apr 7, 2024
1 parent 4fcc5c7 commit 7d755a8
Showing 7 changed files with 16 additions and 90 deletions.
5 changes: 0 additions & 5 deletions .github/workflows/master.yml
@@ -56,11 +56,6 @@ jobs:
exclude-tags: [""]
comment: ["normal"]
include:
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
comment: 'verify-on-spark-3.1-binary'
- java: 8
spark: '3.5'
spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
1 change: 1 addition & 0 deletions docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
## Upgrading from Kyuubi 1.9 to 1.10

* Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
* Since Kyuubi 1.10, the support of Spark engine for Spark 3.1 is removed.
* Since Kyuubi 1.10, the support of Flink engine for Flink 1.16 is removed.

## Upgrading from Kyuubi 1.8 to 1.9
2 changes: 1 addition & 1 deletion docs/quick_start/quick_start.rst
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
**Kyuubi** Gateway \ |release| \ - Kyuubi Server
Engine lib - Kyuubi Engine
Beeline - Kyuubi Beeline
**Spark** Engine 3.1 to 3.5 A Spark distribution
**Spark** Engine 3.2 to 3.5 A Spark distribution
**Flink** Engine 1.17 to 1.19 A Flink distribution
**Trino** Engine N/A A Trino cluster allows to access via trino-client v411
**Doris** Engine N/A A Doris cluster
@@ -381,9 +381,6 @@ object SparkSQLEngine extends Logging {
}

def main(args: Array[String]): Unit = {
if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.1") {
warn("The support for Spark 3.1 is deprecated, and will be removed in the next version.")
}
val startedTime = System.currentTimeMillis()
val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
case Some(t) => t.toLong
@@ -30,7 +30,6 @@ import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
import org.apache.spark.TaskContext
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.CollectLimitExec
@@ -158,9 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
val partsToScan =
partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))

// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark 3.1 support.
val sc = SparkSession.active.sparkContext
val sc = collectLimitExec.session.sparkContext
val res = sc.runJob(
childRDD,
(it: Iterator[InternalRow]) => {
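
For context on the change above: `SparkSession.active.sparkContext` is replaced with `collectLimitExec.session.sparkContext`, which is possible now that every supported Spark version ships `SparkPlan.session` (SPARK-35798, Spark 3.2+). The snippet below is only a rough illustration of reading per-query settings through the plan's own session rather than the thread-local active session; the object name is made up and it is not part of this commit.

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.execution.SparkPlan

    // Hypothetical sketch: resolve the SparkContext and Arrow-related settings
    // from the session attached to the plan (SparkPlan.session, SPARK-35798)
    // instead of relying on the thread-local SparkSession.active.
    object PlanSessionAccess {
      def sparkContextOf(plan: SparkPlan): SparkContext =
        plan.session.sparkContext

      def arrowSettings(plan: SparkPlan): (String, Int) = {
        val conf = plan.session.sessionState.conf
        (conf.sessionLocalTimeZone, conf.arrowMaxRecordsPerBatch)
      }
    }

Going through the plan also avoids depending on which session happens to be active on the calling thread, which is presumably why the removed TODOs asked for this swap once Spark 3.1 could be dropped.
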
@@ -24,10 +24,9 @@ import org.apache.spark.internal.Logging
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -38,7 +37,6 @@ import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
import org.apache.kyuubi.engine.spark.schema.RowSet
import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
import org.apache.kyuubi.util.reflect.DynMethods
import org.apache.kyuubi.util.reflect.ReflectUtils._

object SparkDatasetHelper extends Logging {

@@ -48,7 +46,7 @@

def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = {
case adaptiveSparkPlan: AdaptiveSparkPlanExec =>
executeArrowBatchCollect(finalPhysicalPlan(adaptiveSparkPlan))
executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan)
// TODO: avoid extra shuffle if `offset` > 0
case collectLimit: CollectLimitExec if offset(collectLimit) > 0 =>
logWarning("unsupported offset > 0, an extra shuffle will be introduced.")
@@ -57,8 +55,7 @@
doCollectLimit(collectLimit)
case collectLimit: CollectLimitExec if collectLimit.limit < 0 =>
executeArrowBatchCollect(collectLimit.child)
// TODO: replace with pattern match once we drop Spark 3.1 support.
case command: SparkPlan if isCommandResultExec(command) =>
case command: CommandResultExec =>
doCommandResultExec(command)
case localTableScan: LocalTableScanExec =>
doLocalTableScan(localTableScan)
@@ -76,10 +73,8 @@
*/
def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
val schemaCaptured = plan.schema
// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark 3.1 support.
val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
val maxRecordsPerBatch = plan.session.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = plan.session.sessionState.conf.sessionLocalTimeZone
// note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
// arguments are serialized and sent to the executor side for execution.
val maxBatchSizePerBatch = maxBatchSize
@@ -160,10 +155,8 @@
}

private def doCollectLimit(collectLimit: CollectLimitExec): Array[Array[Byte]] = {
// TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
// drop Spark 3.1 support.
val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
val timeZoneId = collectLimit.session.sessionState.conf.sessionLocalTimeZone
val maxRecordsPerBatch = collectLimit.session.sessionState.conf.arrowMaxRecordsPerBatch

val batches = KyuubiArrowConverters.takeAsArrowBatches(
collectLimit,
@@ -197,8 +190,7 @@

private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
val spark = SparkSession.active
// TODO: replace with `command.rows` once we drop Spark 3.1 support.
val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command)
val rows = command.asInstanceOf[CommandResultExec].rows
command.longMetric("numOutputRows").add(rows.size)
sendDriverMetrics(spark.sparkContext, command.metrics)
KyuubiArrowConverters.toBatchIterator(
@@ -223,30 +215,6 @@
spark.sessionState.conf.sessionLocalTimeZone).toArray
}

/**
* This method provides a reflection-based implementation of
* [[AdaptiveSparkPlanExec.finalPhysicalPlan]] that enables us to adapt to the Spark runtime
* without patching SPARK-41914.
*
* TODO: Once we drop support for Spark 3.1.x, we can directly call
* [[AdaptiveSparkPlanExec.finalPhysicalPlan]].
*/
def finalPhysicalPlan(adaptiveSparkPlanExec: AdaptiveSparkPlanExec): SparkPlan = {
withFinalPlanUpdate(adaptiveSparkPlanExec, identity)
}

/**
* A reflection-based implementation of [[AdaptiveSparkPlanExec.withFinalPlanUpdate]].
*/
private def withFinalPlanUpdate[T](
adaptiveSparkPlanExec: AdaptiveSparkPlanExec,
fun: SparkPlan => T): T = {
val plan = invokeAs[SparkPlan](adaptiveSparkPlanExec, "getFinalPhysicalPlan")
val result = fun(plan)
invokeAs[Unit](adaptiveSparkPlanExec, "finalPlanUpdate")
result
}

/**
* offset support was add since Spark-3.4(set SPARK-28330), to ensure backward compatibility with
* earlier versions of Spark, this function uses reflective calls to the "offset".
@@ -261,24 +229,6 @@
.getOrElse(0)
}

private def isCommandResultExec(sparkPlan: SparkPlan): Boolean = {
// scalastyle:off line.size.limit
// the CommandResultExec was introduced in SPARK-35378 (Spark 3.2), after SPARK-35378 the
// physical plan of runnable command is CommandResultExec.
// for instance:
// ```
// scala> spark.sql("show tables").queryExecution.executedPlan
// res0: org.apache.spark.sql.execution.SparkPlan =
// CommandResult <empty>, [namespace#0, tableName#1, isTemporary#2]
// +- ShowTables [namespace#0, tableName#1, isTemporary#2], V2SessionCatalog(spark_catalog), [default]
//
// scala > spark.sql("show tables").queryExecution.executedPlan.getClass
// res1: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.CommandResultExec
// ```
// scalastyle:on line.size.limit
sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
}

/**
* refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
* operation, so that we can track the arrow-based queries on the UI tab.
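
The helpers deleted above (`finalPhysicalPlan`, `withFinalPlanUpdate`, `isCommandResultExec`, and the `DynMethods`-based lookup of `rows`) existed only because `CommandResultExec` (SPARK-35378) and a direct call to `AdaptiveSparkPlanExec.finalPhysicalPlan` (SPARK-41914) could not be assumed while Spark 3.1 was still supported, so the code fell back to class-name checks and reflection. With Spark 3.2 as the new floor they reduce to direct calls; the sketch below is a simplified illustration under that assumption, not the exact Kyuubi code.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.execution.{CommandResultExec, SparkPlan}
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

    // Simplified sketch: once Spark 3.2 is the minimum runtime, the reflection
    // shims collapse into ordinary method calls and pattern matches.
    object DirectPlanAccess {
      // Previously invokeAs[SparkPlan](aqe, "getFinalPhysicalPlan") followed by
      // invokeAs[Unit](aqe, "finalPlanUpdate").
      def finalPlan(aqe: AdaptiveSparkPlanExec): SparkPlan = aqe.finalPhysicalPlan

      // Previously a getClass.getName string comparison plus a reflective call
      // to reach `rows`; a plain pattern match now suffices.
      def commandRows(plan: SparkPlan): Option[Seq[InternalRow]] = plan match {
        case command: CommandResultExec => Some(command.rows)
        case _ => None
      }
    }
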
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation

import java.lang.{Boolean => JBoolean}
import java.sql.Statement
import java.util.{Locale, Set => JSet}
import java.util.Locale

import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -43,7 +43,6 @@ import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine}
import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
import org.apache.kyuubi.operation.SparkDataTypeTests
import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils._

class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
with SparkMetricsTestUtils {
@@ -188,12 +187,9 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
returnSize.foreach { size =>
val df = spark.sql(s"select * from t_1 limit $size")
val headPlan = df.queryExecution.executedPlan.collectLeaves().head
if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec])
val finalPhysicalPlan =
SparkDatasetHelper.finalPhysicalPlan(headPlan.asInstanceOf[AdaptiveSparkPlanExec])
assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
}
assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec])
val finalPhysicalPlan = headPlan.asInstanceOf[AdaptiveSparkPlanExec].finalPhysicalPlan
assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
if (size > 1000) {
runAndCheck(df.queryExecution.executedPlan, 1000)
} else {
@@ -493,7 +489,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}
}
(keys, values).zipped.foreach { (k, v) =>
if (isStaticConfigKey(k)) {
if (SQLConf.isStaticConfigKey(k)) {
throw new KyuubiException(s"Cannot modify the value of a static config: $k")
}
conf.setConfString(k, v)
@@ -521,16 +517,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
}
}

/**
* This method provides a reflection-based implementation of [[SQLConf.isStaticConfigKey]] to
* adapt Spark 3.1
*
* TODO: Once we drop support for Spark 3.1, we can directly call
* [[SQLConf.isStaticConfigKey()]].
*/
private def isStaticConfigKey(key: String): Boolean =
getField[JSet[String]]((SQLConf.getClass, SQLConf), "staticConfKeys").contains(key)

// the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
// (since Spark 3.5)
private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")
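
The same cleanup applies to the static-config check in this suite: `SQLConf.isStaticConfigKey` is now called directly instead of reading `staticConfKeys` out of the `SQLConf` object via reflection. A minimal sketch of that guard is shown below; it is not the Kyuubi test code, and it throws a plain `IllegalArgumentException` where the suite throws `KyuubiException`.

    import org.apache.spark.sql.internal.SQLConf

    // Minimal sketch: reject session-level overrides of static SQL configs,
    // using the public SQLConf.isStaticConfigKey that the removed reflection
    // helper emulated for Spark 3.1.
    object StaticConfGuard {
      def applySessionConfs(conf: SQLConf, overrides: Map[String, String]): Unit =
        overrides.foreach { case (key, value) =>
          if (SQLConf.isStaticConfigKey(key)) {
            throw new IllegalArgumentException(
              s"Cannot modify the value of a static config: $key")
          }
          conf.setConfString(key, value)
        }
    }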
