From 96e06f5e31550a91ef47a7ca66a09ef1509f6f9a Mon Sep 17 00:00:00 2001 From: ringtail Date: Tue, 2 Jun 2020 17:21:58 +0800 Subject: [PATCH] add .scalafmt.conf and refactor code --- .scalafmt.conf | 29 +++ .../spark/deploy/k8s/KubernetesConf.scala | 184 +++++++----------- .../spark/deploy/k8s/KubernetesUtils.scala | 32 ++- .../k8s/features/BasicDriverFeatureStep.scala | 40 ++-- .../features/BasicExecutorFeatureStep.scala | 51 +++-- .../cluster/k8s/ExecutorPodsAllocator.scala | 80 +++----- .../KubernetesClusterSchedulerBackend.scala | 29 +-- .../BasicDriverFeatureStepSuite.scala | 69 +++---- .../BasicExecutorFeatureStepSuite.scala | 69 ++----- ...ubernetesCredentialsFeatureStepSuite.scala | 98 +++------- .../DriverServiceFeatureStepSuite.scala | 87 +++------ .../features/EnvSecretsFeatureStepSuite.scala | 10 +- .../KubernetesFeaturesTestUtils.scala | 21 +- .../features/LocalDirsFeatureStepSuite.scala | 29 ++- .../MountSecretsFeatureStepSuite.scala | 16 +- .../MountVolumesFeatureStepSuite.scala | 71 ++----- .../bindings/JavaDriverFeatureStepSuite.scala | 23 +-- .../PythonDriverFeatureStepSuite.scala | 29 +-- .../bindings/RDriverFeatureStepSuite.scala | 19 +- .../submit/KubernetesDriverBuilderSuite.scala | 112 ++++------- .../k8s/KubernetesExecutorBuilderSuite.scala | 69 +++---- 21 files changed, 447 insertions(+), 720 deletions(-) create mode 100644 .scalafmt.conf diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000000000..8ffdf3c58c199 --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +align = none +align.openParenDefnSite = false +align.openParenCallSite = false +align.tokens = [] +optIn = { + configStyleArguments = false +} +danglingParentheses = false +docstrings = JavaDoc +maxColumn = 98 +version = 2.5.3 + diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala index be0e1b14b6877..ecd5b880b4dc8 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesConf.scala @@ -16,60 +16,59 @@ */ package org.apache.spark.deploy.k8s +import scala.collection.mutable + import io.fabric8.kubernetes.api.model.{ LocalObjectReference, LocalObjectReferenceBuilder, Pod, Toleration } + import org.apache.spark.SparkConf import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.deploy.k8s.submit._ +import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._ import org.apache.spark.internal.config.ConfigEntry -import scala.collection.mutable - private[spark] sealed trait KubernetesRoleSpecificConf /* * Structure containing metadata for Kubernetes logic that builds a Spark driver. */ private[spark] case class KubernetesDriverSpecificConf( - mainAppResource: Option[MainAppResource], - mainClass: String, - appName: String, - appArgs: Seq[String] -) extends KubernetesRoleSpecificConf + mainAppResource: Option[MainAppResource], + mainClass: String, + appName: String, + appArgs: Seq[String]) + extends KubernetesRoleSpecificConf /* * Structure containing metadata for Kubernetes logic that builds a Spark executor. */ -private[spark] case class KubernetesExecutorSpecificConf(executorId: String, - driverPod: Option[Pod]) +private[spark] case class KubernetesExecutorSpecificConf( + executorId: String, + driverPod: Option[Pod]) extends KubernetesRoleSpecificConf /** - * Structure containing metadata for Kubernetes logic to build Spark pods. - */ + * Structure containing metadata for Kubernetes logic to build Spark pods. + */ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( - sparkConf: SparkConf, - roleSpecificConf: T, - appResourceNamePrefix: String, - appId: String, - roleLabels: Map[String, String], - roleAnnotations: Map[String, String], - roleSecretNamesToMountPaths: Map[String, String], - roleSecretEnvNamesToKeyRefs: Map[String, String], - roleEnvs: Map[String, String], - roleVolumes: Iterable[ - KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf] - ], - driverTolerations: Seq[Toleration], - executorTolerations: Seq[Toleration], - sparkFiles: Seq[String] -) { + sparkConf: SparkConf, + roleSpecificConf: T, + appResourceNamePrefix: String, + appId: String, + roleLabels: Map[String, String], + roleAnnotations: Map[String, String], + roleSecretNamesToMountPaths: Map[String, String], + roleSecretEnvNamesToKeyRefs: Map[String, String], + roleEnvs: Map[String, String], + roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]], + driverTolerations: Seq[Toleration], + executorTolerations: Seq[Toleration], + sparkFiles: Seq[String]) { def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE) @@ -109,10 +108,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( } def nodeSelector(): Map[String, String] = - KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_NODE_SELECTOR_PREFIX - ) + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX) def get[T](config: ConfigEntry[T]): T = sparkConf.get(config) @@ -125,16 +121,16 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf]( } private[spark] object KubernetesConf { + def createDriverConf( - sparkConf: SparkConf, - appName: String, - appResourceNamePrefix: String, - appId: String, - mainAppResource: Option[MainAppResource], - mainClass: String, - appArgs: Array[String], - maybePyFiles: Option[String] - ): KubernetesConf[KubernetesDriverSpecificConf] = { + sparkConf: SparkConf, + appName: String, + appResourceNamePrefix: String, + appId: String, + mainAppResource: Option[MainAppResource], + mainClass: String, + appArgs: Array[String], + maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = { val sparkConfWithMainAppJar = sparkConf.clone() val additionalFiles = mutable.ArrayBuffer.empty[String] mainAppResource.foreach { @@ -155,10 +151,7 @@ private[spark] object KubernetesConf { maybePyFiles.foreach { maybePyFiles => additionalFiles.appendAll(maybePyFiles.split(",")) } - sparkConfWithMainAppJar.set( - KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, - res - ) + sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res) case RMainAppResource(res) => additionalFiles += res sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res) @@ -166,41 +159,31 @@ private[spark] object KubernetesConf { sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4) } - val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_DRIVER_LABEL_PREFIX - ) + val driverCustomLabels = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX) require( !driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " + s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " + - "operations." - ) + "operations.") require( !driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " + s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " + - "operations." - ) + "operations.") val driverLabels = driverCustomLabels ++ Map( SPARK_APP_ID_LABEL -> appId, - SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE - ) - val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_DRIVER_ANNOTATION_PREFIX - ) + SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE) + val driverAnnotations = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX) val driverSecretNamesToMountPaths = KubernetesUtils .parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX) val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, - KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX - ) - val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_DRIVER_ENV_PREFIX - ) + KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX) + val driverEnvs = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX) val driverVolumes = KubernetesVolumeUtils .parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX) .map(_.get) @@ -211,10 +194,8 @@ private[spark] object KubernetesConf { .map(_.get) // parse driver tolerations from sparkConf - val driverTolerationsMap = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_DRIVER_TOLERATION_PREFIX - ) + val driverTolerationsMap = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_TOLERATION_PREFIX) val driverTolerations = KubernetesUtils.parseTolerations(driverTolerationsMap) @@ -226,12 +207,7 @@ private[spark] object KubernetesConf { KubernetesConf( sparkConfWithMainAppJar, - KubernetesDriverSpecificConf( - mainAppResource, - mainClass, - appName, - appArgs - ), + KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs), appResourceNamePrefix, appId, driverLabels, @@ -242,51 +218,38 @@ private[spark] object KubernetesConf { driverVolumes, driverTolerations, Seq.empty[Toleration], - sparkFiles - ) + sparkFiles) } def createExecutorConf( - sparkConf: SparkConf, - executorId: String, - appId: String, - driverPod: Option[Pod] - ): KubernetesConf[KubernetesExecutorSpecificConf] = { - val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_EXECUTOR_LABEL_PREFIX - ) + sparkConf: SparkConf, + executorId: String, + appId: String, + driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = { + val executorCustomLabels = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX) require( !executorCustomLabels.contains(SPARK_APP_ID_LABEL), - s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark." - ) + s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.") require( !executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL), s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" + - " Spark." - ) + " Spark.") require( !executorCustomLabels.contains(SPARK_ROLE_LABEL), - s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark." - ) + s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.") val executorLabels = Map( SPARK_EXECUTOR_ID_LABEL -> executorId, SPARK_APP_ID_LABEL -> appId, - SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE - ) ++ + SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++ executorCustomLabels - val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_EXECUTOR_ANNOTATION_PREFIX - ) - val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_EXECUTOR_SECRETS_PREFIX - ) + val executorAnnotations = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX) + val executorMountSecrets = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX) val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs( sparkConf, - KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX - ) + KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX) val executorEnv = sparkConf.getExecutorEnv.toMap val executorVolumes = KubernetesVolumeUtils .parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX) @@ -296,18 +259,16 @@ private[spark] object KubernetesConf { // (not the one used by cluster mode inside the container) val appResourceNamePrefix = { if (sparkConf - .getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key) - .isEmpty) { + .getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key) + .isEmpty) { getResourceNamePrefix(getAppName(sparkConf)) } else { sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX) } } - val executorTolerationsMap = KubernetesUtils.parsePrefixedKeyValuePairs( - sparkConf, - KUBERNETES_EXECUTOR_TOLERATION_PREFIX - ) + val executorTolerationsMap = + KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_TOLERATION_PREFIX) val executorTolerations = KubernetesUtils.parseTolerations(executorTolerationsMap) @@ -325,7 +286,6 @@ private[spark] object KubernetesConf { executorVolumes, Seq.empty[Toleration], executorTolerations, - Seq.empty[String] - ) + Seq.empty[String]) } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala index cd36d8e30f1bf..d648829ef0b59 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala @@ -17,37 +17,35 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.Toleration + import org.apache.spark.SparkConf import org.apache.spark.util.Utils private[spark] object KubernetesUtils { /** - * Extract and parse Spark configuration properties with a given name prefix and - * return the result as a Map. Keys must not have more than one value. - * - * @param sparkConf Spark configuration - * @param prefix the given property name prefix - * @return a Map storing the configuration property keys and values - */ - def parsePrefixedKeyValuePairs(sparkConf: SparkConf, - prefix: String): Map[String, String] = { + * Extract and parse Spark configuration properties with a given name prefix and + * return the result as a Map. Keys must not have more than one value. + * + * @param sparkConf Spark configuration + * @param prefix the given property name prefix + * @return a Map storing the configuration property keys and values + */ + def parsePrefixedKeyValuePairs(sparkConf: SparkConf, prefix: String): Map[String, String] = { sparkConf.getAllWithPrefix(prefix).toMap } - def requireNandDefined(opt1: Option[_], - opt2: Option[_], - errMessage: String): Unit = { + def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = { opt1.foreach { _ => require(opt2.isEmpty, errMessage) } } /** - * For the given collection of file URIs, resolves them as follows: - * - File URIs with scheme local:// resolve to just the path of the URI. - * - Otherwise, the URIs are returned as-is. - */ + * For the given collection of file URIs, resolves them as follows: + * - File URIs with scheme local:// resolve to just the path of the URI. + * - Otherwise, the URIs are returned as-is. + */ def resolveFileUrisAndPath(fileUris: Iterable[String]): Iterable[String] = { fileUris.map { uri => resolveFileUri(uri) @@ -59,7 +57,7 @@ private[spark] object KubernetesUtils { val fileScheme = Option(fileUri.getScheme).getOrElse("file") fileScheme match { case "local" => fileUri.getPath - case _ => uri + case _ => uri } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala index d878e9bc0f607..18d904b94834a 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala @@ -16,20 +16,20 @@ */ package org.apache.spark.deploy.k8s.features +import scala.collection.JavaConverters._ +import scala.collection.mutable + import io.fabric8.kubernetes.api.model._ + import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s._ import org.apache.spark.internal.config._ import org.apache.spark.ui.SparkUI -import scala.collection.JavaConverters._ -import scala.collection.mutable - -private[spark] class BasicDriverFeatureStep( - conf: KubernetesConf[KubernetesDriverSpecificConf] - ) extends KubernetesFeatureConfigStep { +private[spark] class BasicDriverFeatureStep(conf: KubernetesConf[KubernetesDriverSpecificConf]) + extends KubernetesFeatureConfigStep { private val driverPodName = conf .get(KUBERNETES_DRIVER_POD_NAME) @@ -37,27 +37,24 @@ private[spark] class BasicDriverFeatureStep( private val driverContainerImage = conf .get(DRIVER_CONTAINER_IMAGE) - .getOrElse( - throw new SparkException("Must specify the driver container image") - ) + .getOrElse(throw new SparkException("Must specify the driver container image")) // CPU settings private val driverCpuCores = conf.get("spark.driver.cores", "1") private val driverLimitCores = conf.get(KUBERNETES_DRIVER_LIMIT_CORES) // node name of driver - private val driverNodeName = conf.get(KUBERNETES_DRIVER_NODE_NAME).getOrElse("") + private val driverNodeName = + conf.get(KUBERNETES_DRIVER_NODE_NAME).getOrElse("") // Memory settings private val driverMemoryMiB = conf.get(DRIVER_MEMORY) + private val memoryOverheadMiB = conf .get(DRIVER_MEMORY_OVERHEAD) .getOrElse( - math.max( - (conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB - ) - ) + math + .max((conf.get(MEMORY_OVERHEAD_FACTOR) * driverMemoryMiB).toInt, MEMORY_OVERHEAD_MIN_MIB)) private val driverMemoryWithOverheadMiB = driverMemoryMiB + memoryOverheadMiB override def configurePod(pod: SparkPod): SparkPod = { @@ -106,11 +103,9 @@ private[spark] class BasicDriverFeatureStep( .addAllToEnv(driverCustomEnvs.asJava) .addNewEnv() .withName(ENV_DRIVER_BIND_ADDRESS) - .withValueFrom( - new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build() - ) + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) .endEnv() .withNewResources() .addToRequests("cpu", driverCpuQuantity) @@ -143,8 +138,7 @@ private[spark] class BasicDriverFeatureStep( KUBERNETES_DRIVER_POD_NAME.key -> driverPodName, "spark.app.id" -> conf.appId, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> conf.appResourceNamePrefix, - KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true" - ) + KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true") val resolvedSparkJars = KubernetesUtils.resolveFileUrisAndPath(conf.sparkJars()) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala index 622183582bfe6..7645fd103d988 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStep.scala @@ -16,29 +16,30 @@ */ package org.apache.spark.deploy.k8s.features +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model._ + import org.apache.spark.SparkException +import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s._ import org.apache.spark.internal.config._ import org.apache.spark.rpc.RpcEndpointAddress import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils -import scala.collection.JavaConverters._ - private[spark] class BasicExecutorFeatureStep( - kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf] - ) extends KubernetesFeatureConfigStep { + kubernetesConf: KubernetesConf[KubernetesExecutorSpecificConf]) + extends KubernetesFeatureConfigStep { // Consider moving some of these fields to KubernetesConf or KubernetesExecutorSpecificConf private val executorExtraClasspath = kubernetesConf.get(EXECUTOR_CLASS_PATH) + private val executorContainerImage = kubernetesConf .get(EXECUTOR_CONTAINER_IMAGE) - .getOrElse( - throw new SparkException("Must specify the executor container image") - ) + .getOrElse(throw new SparkException("Must specify the executor container image")) + private val blockManagerPort = kubernetesConf.sparkConf .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT) @@ -47,9 +48,9 @@ private[spark] class BasicExecutorFeatureStep( private val driverUrl = RpcEndpointAddress( kubernetesConf.get("spark.driver.host"), kubernetesConf.sparkConf.getInt("spark.driver.port", DEFAULT_DRIVER_PORT), - CoarseGrainedSchedulerBackend.ENDPOINT_NAME - ).toString + CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString private val executorMemoryMiB = kubernetesConf.get(EXECUTOR_MEMORY) + private val executorMemoryString = kubernetesConf.get(EXECUTOR_MEMORY.key, EXECUTOR_MEMORY.defaultValueString) @@ -58,10 +59,9 @@ private[spark] class BasicExecutorFeatureStep( .getOrElse( math.max( (kubernetesConf.get(MEMORY_OVERHEAD_FACTOR) * executorMemoryMiB).toInt, - MEMORY_OVERHEAD_MIN_MIB - ) - ) + MEMORY_OVERHEAD_MIN_MIB)) private val executorMemoryWithOverhead = executorMemoryMiB + memoryOverheadMiB + private val executorMemoryTotal = kubernetesConf.sparkConf .getOption(APP_RESOURCE_TYPE.key) .map { res => @@ -79,12 +79,14 @@ private[spark] class BasicExecutorFeatureStep( private val executorCores = kubernetesConf.sparkConf.getInt("spark.executor.cores", 1) + private val executorCoresRequest = if (kubernetesConf.sparkConf.contains(KUBERNETES_EXECUTOR_REQUEST_CORES)) { kubernetesConf.get(KUBERNETES_EXECUTOR_REQUEST_CORES).get } else { executorCores.toString } + private val executorLimitCores = kubernetesConf.get(KUBERNETES_EXECUTOR_LIMIT_CORES) @@ -116,8 +118,7 @@ private[spark] class BasicExecutorFeatureStep( val subsOpts = Utils.substituteAppNExecIds( opts, kubernetesConf.appId, - kubernetesConf.roleSpecificConf.executorId - ) + kubernetesConf.roleSpecificConf.executorId) val delimitedOpts = Utils.splitCommandString(subsOpts) delimitedOpts.zipWithIndex.map { case (opt, index) => @@ -135,25 +136,20 @@ private[spark] class BasicExecutorFeatureStep( (ENV_APPLICATION_ID, kubernetesConf.appId), // This is to set the SPARK_CONF_DIR to be /opt/spark/conf (ENV_SPARK_CONF_DIR, SPARK_CONF_DIR_INTERNAL), - (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId) - ) ++ + (ENV_EXECUTOR_ID, kubernetesConf.roleSpecificConf.executorId)) ++ kubernetesConf.roleEnvs) .map( env => new EnvVarBuilder() .withName(env._1) .withValue(env._2) - .build() - ) ++ Seq( + .build()) ++ Seq( new EnvVarBuilder() .withName(ENV_EXECUTOR_POD_IP) - .withValueFrom( - new EnvVarSourceBuilder() - .withNewFieldRef("v1", "status.podIP") - .build() - ) - .build() - ) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq + .withValueFrom(new EnvVarSourceBuilder() + .withNewFieldRef("v1", "status.podIP") + .build()) + .build()) ++ executorExtraJavaOptionsEnv ++ executorExtraClasspathEnv.toSeq val requiredPorts = Seq((BLOCK_MANAGER_PORT_NAME, blockManagerPort)) .map { case (name, port) => @@ -197,8 +193,7 @@ private[spark] class BasicExecutorFeatureStep( .withKind(pod.getKind) .withName(pod.getMetadata.getName) .withUid(pod.getMetadata.getUid) - .build() - ) + .build()) val executorPod = new PodBuilder(pod.pod) .editOrNewMetadata() .withName(name) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index 51fe426963869..35fba4b992f35 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -18,24 +18,25 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} +import scala.collection.mutable + import io.fabric8.kubernetes.api.model.PodBuilder import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.{SparkConf, SparkException} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.KubernetesConf import org.apache.spark.internal.Logging import org.apache.spark.util.{Clock, Utils} -import org.apache.spark.{SparkConf, SparkException} - -import scala.collection.mutable private[spark] class ExecutorPodsAllocator( - conf: SparkConf, - executorBuilder: KubernetesExecutorBuilder, - kubernetesClient: KubernetesClient, - snapshotsStore: ExecutorPodsSnapshotsStore, - clock: Clock - ) extends Logging { + conf: SparkConf, + executorBuilder: KubernetesExecutorBuilder, + kubernetesClient: KubernetesClient, + snapshotsStore: ExecutorPodsSnapshotsStore, + clock: Clock) + extends Logging { private val EXECUTOR_ID_COUNTER = new AtomicLong(0L) @@ -61,14 +62,9 @@ private[spark] class ExecutorPodsAllocator( kubernetesClient .pods() .withName(name) - .get() - ).getOrElse( - throw new SparkException( - s"No pod was found named $kubernetesDriverPodName in the cluster in the " + - s"namespace $namespace (this was supposed to be the driver pod.)." - ) - ) - ) + .get()).getOrElse(throw new SparkException( + s"No pod was found named $kubernetesDriverPodName in the cluster in the " + + s"namespace $namespace (this was supposed to be the driver pod.)."))) // Executor IDs that have been requested from Kubernetes but have not been detected in any // snapshot yet. Mapped to the timestamp when they were created. @@ -86,8 +82,9 @@ private[spark] class ExecutorPodsAllocator( def setMinRegisteredExecutorsRatio(ratio: Double): Unit = minRegisteredExecutorsRatio = ratio - private def onNewSnapshots(applicationId: String, - snapshots: Seq[ExecutorPodsSnapshot]): Unit = { + private def onNewSnapshots( + applicationId: String, + snapshots: Seq[ExecutorPodsSnapshot]): Unit = { newlyCreatedExecutors --= snapshots.flatMap(_.executorPods.keys) // For all executors we've created against the API but have not seen in a snapshot // yet - check the current time. If the current time has exceeded some threshold, @@ -103,8 +100,7 @@ private[spark] class ExecutorPodsAllocator( s"Executor with id $execId was not detected in the Kubernetes" + s" cluster after $podCreationTimeout milliseconds despite the fact that a" + " previous allocation attempt tried to create it. The executor may have been" + - " deleted but the application missed the deletion event." - ) + " deleted but the application missed the deletion event.") Utils.tryLogNonFatalError { kubernetesClient .pods() @@ -117,8 +113,7 @@ private[spark] class ExecutorPodsAllocator( } else { logDebug( s"Executor with id $execId was not found in the Kubernetes cluster since it" + - s" was created ${currentTime - timeCreated} milliseconds ago." - ) + s" was created ${currentTime - timeCreated} milliseconds ago.") } } @@ -144,8 +139,7 @@ private[spark] class ExecutorPodsAllocator( logDebug( s"Currently have $currentRunningExecutors running executors and" + s" $currentPendingExecutors pending executors. $newlyCreatedExecutors executors" + - s" have been requested but are pending appearance in the cluster." - ) + s" have been requested but are pending appearance in the cluster.") // if (newlyCreatedExecutors.isEmpty // && currentPendingExecutors == 0 @@ -153,21 +147,16 @@ private[spark] class ExecutorPodsAllocator( if (newlyCreatedExecutors.isEmpty && currentRunningExecutors >= currentRequestExecutors * minRegisteredExecutorsRatio && currentRequestExecutors < totalExpectedExecutors.get()) { - val numExecutorsToAllocate = math.min( - currentTotalExpectedExecutors - currentRequestExecutors, - podAllocationSize - ) - logInfo( - s"Going to request $numExecutorsToAllocate executors from Kubernetes." - ) + val numExecutorsToAllocate = + math.min(currentTotalExpectedExecutors - currentRequestExecutors, podAllocationSize) + logInfo(s"Going to request $numExecutorsToAllocate executors from Kubernetes.") for (_ <- 0 until numExecutorsToAllocate) { val newExecutorId = EXECUTOR_ID_COUNTER.incrementAndGet() val executorConf = KubernetesConf.createExecutorConf( conf, newExecutorId.toString, applicationId, - driverPod - ) + driverPod) val executorPod = executorBuilder.buildFromFeatures(executorConf) val podWithAttachedContainer = new PodBuilder(executorPod.pod) .editOrNewSpec() @@ -177,27 +166,20 @@ private[spark] class ExecutorPodsAllocator( kubernetesClient.pods().create(podWithAttachedContainer) newlyCreatedExecutors(newExecutorId) = clock.getTimeMillis() - logInfo( - s"Requested executor async with id $newExecutorId from Kubernetes." - ) - logDebug( - s"Requested executor with id $newExecutorId from Kubernetes." - ) + logInfo(s"Requested executor async with id $newExecutorId from Kubernetes.") + logDebug(s"Requested executor with id $newExecutorId from Kubernetes.") } } else if (currentRunningExecutors >= currentTotalExpectedExecutors) { // TODO handle edge cases if we end up with more running executors than expected. logDebug( "Current number of running executors is equal to the number of requested" + - " executors. Not scaling up further." - ) + " executors. Not scaling up further.") } else if (newlyCreatedExecutors.nonEmpty || currentPendingExecutors != 0) { - logDebug( - s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" + - s" executors to begin running before requesting for more executors. # of executors in" + - s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" + - s" created but we have not observed as being present in the cluster yet:" + - s" ${newlyCreatedExecutors.size}." - ) + logDebug(s"Still waiting for ${newlyCreatedExecutors.size + currentPendingExecutors}" + + s" executors to begin running before requesting for more executors. # of executors in" + + s" pending status in the cluster: $currentPendingExecutors. # of executors that we have" + + s" created but we have not observed as being present in the cluster yet:" + + s" ${newlyCreatedExecutors.size}.") } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index d58b561945b8b..fce20f535af03 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -18,27 +18,28 @@ package org.apache.spark.scheduler.cluster.k8s import java.util.concurrent.ExecutorService +import scala.concurrent.{ExecutionContext, Future} + import io.fabric8.kubernetes.client.KubernetesClient + import org.apache.spark.deploy.k8s.Config.KUBERNETES_ALLOCATION_BATCH_SIZE import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.rpc.{RpcAddress, RpcEnv} -import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl} +import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, SchedulerBackendUtils} import org.apache.spark.util.{ThreadUtils, Utils} -import scala.concurrent.{ExecutionContext, Future} - private[spark] class KubernetesClusterSchedulerBackend( - scheduler: TaskSchedulerImpl, - rpcEnv: RpcEnv, - kubernetesClient: KubernetesClient, - requestExecutorsService: ExecutorService, - snapshotsStore: ExecutorPodsSnapshotsStore, - podAllocator: ExecutorPodsAllocator, - lifecycleEventHandler: ExecutorPodsLifecycleManager, - watchEvents: ExecutorPodsWatchSnapshotSource, - pollEvents: ExecutorPodsPollingSnapshotSource) - extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { + scheduler: TaskSchedulerImpl, + rpcEnv: RpcEnv, + kubernetesClient: KubernetesClient, + requestExecutorsService: ExecutorService, + snapshotsStore: ExecutorPodsSnapshotsStore, + podAllocator: ExecutorPodsAllocator, + lifecycleEventHandler: ExecutorPodsLifecycleManager, + watchEvents: ExecutorPodsWatchSnapshotSource, + pollEvents: ExecutorPodsPollingSnapshotSource) + extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) { private val podAllocationSize = conf.get(KUBERNETES_ALLOCATION_BATCH_SIZE) @@ -144,7 +145,7 @@ private[spark] class KubernetesClusterSchedulerBackend( } private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends DriverEndpoint(rpcEnv, sparkProperties) { + extends DriverEndpoint(rpcEnv, sparkProperties) { override def onDisconnected(rpcAddress: RpcAddress): Unit = { // Don't do anything besides disabling the executor - allow the Kubernetes API events to diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala index 7bc8eb320d906..5bed12adc22c9 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStepSuite.scala @@ -16,27 +16,21 @@ */ package org.apache.spark.deploy.k8s.features +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.{ ContainerPort, ContainerPortBuilder, LocalObjectReferenceBuilder, Toleration } + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ -import org.apache.spark.deploy.k8s.submit.{ - JavaMainAppResource, - PythonMainAppResource -} -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesDriverSpecificConf, - SparkPod -} +import org.apache.spark.deploy.k8s.submit.{JavaMainAppResource, PythonMainAppResource} import org.apache.spark.ui.SparkUI -import org.apache.spark.{SparkConf, SparkFunSuite} - -import scala.collection.JavaConverters._ class BasicDriverFeatureStepSuite extends SparkFunSuite { @@ -50,20 +44,19 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"") private val CUSTOM_ANNOTATION_KEY = "customAnnotation" private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue" - private val DRIVER_ANNOTATIONS = Map( - CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE - ) + private val DRIVER_ANNOTATIONS = Map(CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE) private val DRIVER_CUSTOM_ENV1 = "customDriverEnv1" private val DRIVER_CUSTOM_ENV2 = "customDriverEnv2" - private val DRIVER_ENVS = Map( - DRIVER_CUSTOM_ENV1 -> DRIVER_CUSTOM_ENV1, - DRIVER_CUSTOM_ENV2 -> DRIVER_CUSTOM_ENV2 - ) + + private val DRIVER_ENVS = + Map(DRIVER_CUSTOM_ENV1 -> DRIVER_CUSTOM_ENV1, DRIVER_CUSTOM_ENV2 -> DRIVER_CUSTOM_ENV2) private val TEST_IMAGE_PULL_SECRETS = Seq("my-secret-1", "my-secret-2") + private val TEST_IMAGE_PULL_SECRET_OBJECTS = TEST_IMAGE_PULL_SECRETS.map { secret => new LocalObjectReferenceBuilder().withName(secret).build() } + private val emptyDriverSpecificConf = KubernetesDriverSpecificConf(None, APP_NAME, MAIN_CLASS, APP_ARGS) @@ -89,8 +82,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val featureStep = new BasicDriverFeatureStep(kubernetesConf) val basePod = SparkPod.initialPod() @@ -98,15 +90,12 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert(configuredPod.container.getName === DRIVER_CONTAINER_NAME) assert(configuredPod.container.getImage === "spark-driver:latest") - assert( - configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY - ) + assert(configuredPod.container.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY) val expectedPortNames = Set( containerPort(DRIVER_PORT_NAME, DEFAULT_DRIVER_PORT), containerPort(BLOCK_MANAGER_PORT_NAME, DEFAULT_BLOCKMANAGER_PORT), - containerPort(UI_PORT_NAME, SparkUI.DEFAULT_PORT) - ) + containerPort(UI_PORT_NAME, SparkUI.DEFAULT_PORT)) val foundPortNames = configuredPod.container.getPorts.asScala.toSet assert(expectedPortNames === foundPortNames) @@ -119,17 +108,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { assert( configuredPod.pod.getSpec().getImagePullSecrets.asScala === - TEST_IMAGE_PULL_SECRET_OBJECTS - ) + TEST_IMAGE_PULL_SECRET_OBJECTS) assert( configuredPod.container.getEnv.asScala.exists( envVar => envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) && envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") && - envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP") - ) - ) + envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP"))) val resourceRequirements = configuredPod.container.getResources val requests = resourceRequirements.getRequests.asScala @@ -148,8 +134,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod", "spark.app.id" -> APP_ID, KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, - "spark.kubernetes.submitInDriver" -> "true" - ) + "spark.kubernetes.submitInDriver" -> "true") assert(featureStep.getAdditionalPodSystemProperties() === expectedSparkConf) } @@ -166,8 +151,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Some(JavaMainAppResource("")), APP_NAME, PY_MAIN_CLASS, - APP_ARGS - ), + APP_ARGS), RESOURCE_NAME_PREFIX, APP_ID, DRIVER_LABELS, @@ -178,16 +162,14 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val pythonKubernetesConf = KubernetesConf( pythonSparkConf, KubernetesDriverSpecificConf( Some(PythonMainAppResource("")), APP_NAME, PY_MAIN_CLASS, - APP_ARGS - ), + APP_ARGS), RESOURCE_NAME_PREFIX, APP_ID, DRIVER_LABELS, @@ -198,8 +180,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val javaFeatureStep = new BasicDriverFeatureStep(javaKubernetesConf) val pythonFeatureStep = new BasicDriverFeatureStep(pythonKubernetesConf) val basePod = SparkPod.initialPod() @@ -230,8 +211,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - allFiles - ) + allFiles) val step = new BasicDriverFeatureStep(kubernetesConf) val additionalProperties = step.getAdditionalPodSystemProperties() @@ -241,8 +221,7 @@ class BasicDriverFeatureStepSuite extends SparkFunSuite { KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX, "spark.kubernetes.submitInDriver" -> "true", "spark.jars" -> "/opt/spark/jar1.jar,hdfs:///opt/spark/jar2.jar", - "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt" - ) + "spark.files" -> "https://localhost:9000/file1.txt,/opt/spark/file2.txt") assert(additionalProperties === expectedSparkConf) } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala index e56e13a1f9280..9217beb776b30 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/BasicExecutorFeatureStepSuite.scala @@ -23,11 +23,7 @@ import org.mockito.MockitoAnnotations import org.scalatest.{BeforeAndAfter, BeforeAndAfterEach} import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesExecutorSpecificConf, - SparkPod -} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesExecutorSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.rpc.RpcEndpointAddress @@ -41,11 +37,11 @@ class BasicExecutorFeatureStepSuite private val APP_ID = "app-id" private val DRIVER_HOSTNAME = "localhost" private val DRIVER_PORT = 7098 + private val DRIVER_ADDRESS = RpcEndpointAddress( DRIVER_HOSTNAME, DRIVER_PORT.toInt, - CoarseGrainedSchedulerBackend.ENDPOINT_NAME - ) + CoarseGrainedSchedulerBackend.ENDPOINT_NAME) private val DRIVER_POD_NAME = "driver-pod" private val DRIVER_POD_UID = "driver-uid" @@ -54,10 +50,12 @@ class BasicExecutorFeatureStepSuite private val LABELS = Map("label1key" -> "label1value") private val ANNOTATIONS = Map("annotation1key" -> "annotation1value") private val TEST_IMAGE_PULL_SECRETS = Seq("my-1secret-1", "my-secret-2") + private val TEST_IMAGE_PULL_SECRET_OBJECTS = TEST_IMAGE_PULL_SECRETS.map { secret => new LocalObjectReferenceBuilder().withName(secret).build() } + private val DRIVER_POD = new PodBuilder() .withNewMetadata() .withName(DRIVER_POD_NAME) @@ -100,17 +98,13 @@ class BasicExecutorFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) val executor = step.configurePod(SparkPod.initialPod()) // The executor pod name and default labels. assert(executor.pod.getMetadata.getName === s"$RESOURCE_NAME_PREFIX-exec-1") assert(executor.pod.getMetadata.getLabels.asScala === LABELS) - assert( - executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS - ) + assert(executor.pod.getSpec.getImagePullSecrets.asScala === TEST_IMAGE_PULL_SECRET_OBJECTS) // There is exactly 1 container with no volume mounts and default memory limits. // Default memory limit is 1024M + 384M (minimum overhead constant). @@ -120,8 +114,7 @@ class BasicExecutorFeatureStepSuite assert( executor.container.getResources.getLimits .get("memory") - .getAmount === "1408Mi" - ) + .getAmount === "1408Mi") // The pod has no node selector, volumes. assert(executor.pod.getSpec.getNodeSelector.isEmpty) @@ -150,22 +143,17 @@ class BasicExecutorFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) assert( step .configurePod(SparkPod.initialPod()) .pod .getSpec .getHostname - .length === 63 - ) + .length === 63) } - test( - "classpath and extra java options get translated into environment variables" - ) { + test("classpath and extra java options get translated into environment variables") { val conf = baseConf.clone() conf.set(org.apache.spark.internal.config.EXECUTOR_JAVA_OPTIONS, "foo=bar") conf.set(org.apache.spark.internal.config.EXECUTOR_CLASS_PATH, "bar=baz") @@ -184,19 +172,12 @@ class BasicExecutorFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) val executor = step.configurePod(SparkPod.initialPod()) checkEnv( executor, - Map( - "SPARK_JAVA_OPT_0" -> "foo=bar", - ENV_CLASSPATH -> "bar=baz", - "qux" -> "quux" - ) - ) + Map("SPARK_JAVA_OPT_0" -> "foo=bar", ENV_CLASSPATH -> "bar=baz", "qux" -> "quux")) checkOwnerReferences(executor.pod, DRIVER_POD_UID) } @@ -219,33 +200,24 @@ class BasicExecutorFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) val executor = step.configurePod(SparkPod.initialPod()) // This is checking that basic executor + executorMemory = 1408 + 42 = 1450 assert( executor.container.getResources.getRequests .get("memory") - .getAmount === "1450Mi" - ) + .getAmount === "1450Mi") } // There is always exactly one controller reference, and it points to the driver pod. - private def checkOwnerReferences(executor: Pod, - driverPodUid: String): Unit = { + private def checkOwnerReferences(executor: Pod, driverPodUid: String): Unit = { assert(executor.getMetadata.getOwnerReferences.size() === 1) - assert( - executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid - ) - assert( - executor.getMetadata.getOwnerReferences.get(0).getController === true - ) + assert(executor.getMetadata.getOwnerReferences.get(0).getUid === driverPodUid) + assert(executor.getMetadata.getOwnerReferences.get(0).getController === true) } // Check that the expected environment variables are present. - private def checkEnv(executorPod: SparkPod, - additionalEnvVars: Map[String, String]): Unit = { + private def checkEnv(executorPod: SparkPod, additionalEnvVars: Map[String, String]): Unit = { val defaultEnvs = Map( ENV_EXECUTOR_ID -> "1", ENV_DRIVER_URL -> DRIVER_ADDRESS.toString, @@ -253,8 +225,7 @@ class BasicExecutorFeatureStepSuite ENV_EXECUTOR_MEMORY -> "1g", ENV_APPLICATION_ID -> APP_ID, ENV_SPARK_CONF_DIR -> SPARK_CONF_DIR_INTERNAL, - ENV_EXECUTOR_POD_IP -> null - ) ++ additionalEnvVars + ENV_EXECUTOR_POD_IP -> null) ++ additionalEnvVars assert(executorPod.container.getEnv.size() === defaultEnvs.size) val mapEnvs = executorPod.container.getEnv.asScala.map { x => diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala index f313d2454d8f2..0ced1bf6574f1 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverKubernetesCredentialsFeatureStepSuite.scala @@ -18,32 +18,21 @@ package org.apache.spark.deploy.k8s.features import java.io.File +import scala.collection.JavaConverters._ + import com.google.common.base.Charsets import com.google.common.io.{BaseEncoding, Files} -import io.fabric8.kubernetes.api.model.{ - ContainerBuilder, - HasMetadata, - PodBuilder, - Secret, - Toleration -} +import io.fabric8.kubernetes.api.model.{Secret, Toleration} import org.mockito.{Mock, MockitoAnnotations} import org.scalatest.BeforeAndAfter -import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesDriverSpecificConf, - SparkPod -} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.util.Utils -class DriverKubernetesCredentialsFeatureStepSuite - extends SparkFunSuite - with BeforeAndAfter { +class DriverKubernetesCredentialsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val KUBERNETES_RESOURCE_NAME_PREFIX = "spark" private val APP_ID = "k8s-app" @@ -76,14 +65,12 @@ class DriverKubernetesCredentialsFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert( kubernetesCredentialsStep - .configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD - ) + .configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) assert(kubernetesCredentialsStep.getAdditionalPodSystemProperties().isEmpty) assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty) } @@ -92,20 +79,16 @@ class DriverKubernetesCredentialsFeatureStepSuite val submissionSparkConf = new SparkConf(false) .set( s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$OAUTH_TOKEN_FILE_CONF_SUFFIX", - "/mnt/secrets/my-token.txt" - ) + "/mnt/secrets/my-token.txt") .set( s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", - "/mnt/secrets/my-key.pem" - ) + "/mnt/secrets/my-key.pem") .set( s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", - "/mnt/secrets/my-cert.pem" - ) + "/mnt/secrets/my-cert.pem") .set( s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", - "/mnt/secrets/my-ca.pem" - ) + "/mnt/secrets/my-ca.pem") val kubernetesConf = KubernetesConf( submissionSparkConf, driverSpecificConf, @@ -119,15 +102,13 @@ class DriverKubernetesCredentialsFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) assert( kubernetesCredentialsStep - .configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD - ) + .configurePod(BASE_DRIVER_POD) === BASE_DRIVER_POD) assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().isEmpty) val resolvedProperties = kubernetesCredentialsStep.getAdditionalPodSystemProperties() @@ -142,22 +123,16 @@ class DriverKubernetesCredentialsFeatureStepSuite val clientKeyFile = writeCredentials("key.pem", "key") val clientCertFile = writeCredentials("cert.pem", "cert") val submissionSparkConf = new SparkConf(false) - .set( - s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX", - "token" - ) + .set(s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$OAUTH_TOKEN_CONF_SUFFIX", "token") .set( s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_KEY_FILE_CONF_SUFFIX", - clientKeyFile.getAbsolutePath - ) + clientKeyFile.getAbsolutePath) .set( s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX", - clientCertFile.getAbsolutePath - ) + clientCertFile.getAbsolutePath) .set( s"$KUBERNETES_AUTH_DRIVER_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX", - caCertFile.getAbsolutePath - ) + caCertFile.getAbsolutePath) val kubernetesConf = KubernetesConf( submissionSparkConf, driverSpecificConf, @@ -171,8 +146,7 @@ class DriverKubernetesCredentialsFeatureStepSuite Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val kubernetesCredentialsStep = new DriverKubernetesCredentialsFeatureStep(kubernetesConf) val resolvedProperties = @@ -186,55 +160,39 @@ class DriverKubernetesCredentialsFeatureStepSuite s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CLIENT_CERT_FILE_CONF_SUFFIX" -> DRIVER_CREDENTIALS_CLIENT_CERT_PATH, s"$KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX.$CA_CERT_FILE_CONF_SUFFIX" -> - DRIVER_CREDENTIALS_CA_CERT_PATH - ) + DRIVER_CREDENTIALS_CA_CERT_PATH) assert(resolvedProperties === expectedSparkConf) - assert( - kubernetesCredentialsStep.getAdditionalKubernetesResources().size === 1 - ) + assert(kubernetesCredentialsStep.getAdditionalKubernetesResources().size === 1) val credentialsSecret = kubernetesCredentialsStep .getAdditionalKubernetesResources() .head .asInstanceOf[Secret] assert( credentialsSecret.getMetadata.getName === - s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials" - ) + s"$KUBERNETES_RESOURCE_NAME_PREFIX-kubernetes-credentials") val decodedSecretData = credentialsSecret.getData.asScala.map { data => - ( - data._1, - new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8) - ) + (data._1, new String(BaseEncoding.base64().decode(data._2), Charsets.UTF_8)) } val expectedSecretData = Map( DRIVER_CREDENTIALS_CA_CERT_SECRET_NAME -> "ca-cert", DRIVER_CREDENTIALS_OAUTH_TOKEN_SECRET_NAME -> "token", DRIVER_CREDENTIALS_CLIENT_KEY_SECRET_NAME -> "key", - DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert" - ) + DRIVER_CREDENTIALS_CLIENT_CERT_SECRET_NAME -> "cert") assert(decodedSecretData === expectedSecretData) val driverPod = kubernetesCredentialsStep.configurePod(BASE_DRIVER_POD) val driverPodVolumes = driverPod.pod.getSpec.getVolumes.asScala assert(driverPodVolumes.size === 1) - assert( - driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME - ) + assert(driverPodVolumes.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) assert(driverPodVolumes.head.getSecret != null) assert( - driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName - ) + driverPodVolumes.head.getSecret.getSecretName === credentialsSecret.getMetadata.getName) val driverContainerVolumeMount = driverPod.container.getVolumeMounts.asScala assert(driverContainerVolumeMount.size === 1) - assert( - driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME - ) - assert( - driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR - ) + assert(driverContainerVolumeMount.head.getName === DRIVER_CREDENTIALS_SECRET_VOLUME_NAME) + assert(driverContainerVolumeMount.head.getMountPath === DRIVER_CREDENTIALS_SECRETS_BASE_DIR) } - private def writeCredentials(credentialsFileName: String, - credentialsContents: String): File = { + private def writeCredentials(credentialsFileName: String, credentialsContents: String): File = { val credentialsFile = new File(credentialsTempDirectory, credentialsFileName) Files.write(credentialsContents, credentialsFile, Charsets.UTF_8) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala index e7ed14e835b38..50105544a306e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/DriverServiceFeatureStepSuite.scala @@ -16,18 +16,15 @@ */ package org.apache.spark.deploy.k8s.features +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.{Service, Toleration} import org.mockito.{Mock, MockitoAnnotations} import org.mockito.Mockito.when import org.scalatest.BeforeAndAfter -import scala.collection.JavaConverters._ import org.apache.spark.{SparkConf, SparkFunSuite} -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesDriverSpecificConf, - SparkPod -} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.util.Clock @@ -41,6 +38,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val LONG_RESOURCE_NAME_PREFIX = "a" * (DriverServiceFeatureStep.MAX_SERVICE_NAME_LENGTH - DriverServiceFeatureStep.DRIVER_SVC_POSTFIX.length + 1) + private val DRIVER_LABELS = Map("label1key" -> "label1value", "label2key" -> "label2value") @@ -72,20 +70,16 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) assert( configurationStep.configurePod(SparkPod.initialPod()) === SparkPod - .initialPod() - ) + .initialPod()) assert(configurationStep.getAdditionalKubernetesResources().size === 1) assert( configurationStep .getAdditionalKubernetesResources() .head - .isInstanceOf[Service] - ) + .isInstanceOf[Service]) val driverService = configurationStep .getAdditionalKubernetesResources() .head @@ -94,8 +88,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { 9000, 8080, s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", - driverService - ) + driverService) } test("Hostname and ports are set according to the service name.") { @@ -116,9 +109,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX + DriverServiceFeatureStep.DRIVER_SVC_POSTFIX val expectedHostName = s"$expectedServiceName.my-namespace.svc" @@ -141,9 +132,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) - ) + Seq.empty[String])) val resolvedService = configurationStep .getAdditionalKubernetesResources() .head @@ -152,18 +141,12 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { DEFAULT_DRIVER_PORT, DEFAULT_BLOCKMANAGER_PORT, s"$SHORT_RESOURCE_NAME_PREFIX${DriverServiceFeatureStep.DRIVER_SVC_POSTFIX}", - resolvedService - ) + resolvedService) val additionalProps = configurationStep.getAdditionalPodSystemProperties() + assert(additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString) assert( - additionalProps("spark.driver.port") === DEFAULT_DRIVER_PORT.toString - ) - assert( - additionalProps( - org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key - ) - === DEFAULT_BLOCKMANAGER_PORT.toString - ) + additionalProps(org.apache.spark.internal.config.DRIVER_BLOCK_MANAGER_PORT.key) + === DEFAULT_BLOCKMANAGER_PORT.toString) } test("Long prefixes should switch to using a generated name.") { @@ -182,10 +165,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ), - clock - ) + Seq.empty[String]), + clock) val driverService = configurationStep .getAdditionalKubernetesResources() .head @@ -215,10 +196,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ), - clock - ) + Seq.empty[String]), + clock) fail("The driver bind address should not be allowed.") } catch { case e: Throwable => @@ -226,8 +205,7 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { e.getMessage === s"requirement failed: ${DriverServiceFeatureStep.DRIVER_BIND_ADDRESS_KEY} is" + " not supported in Kubernetes mode, as the driver's bind address is managed" + - " and set to the driver pod's IP address." - ) + " and set to the driver pod's IP address.") } sparkConf.remove(org.apache.spark.internal.config.DRIVER_BIND_ADDRESS) sparkConf.set(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS, "host") @@ -246,10 +224,8 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ), - clock - ) + Seq.empty[String]), + clock) fail("The driver host address should not be allowed.") } catch { case e: Throwable => @@ -257,15 +233,15 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { e.getMessage === s"requirement failed: ${DriverServiceFeatureStep.DRIVER_HOST_KEY} is" + " not supported in Kubernetes mode, as the driver's hostname will be managed via" + - " a Kubernetes service." - ) + " a Kubernetes service.") } } - private def verifyService(driverPort: Int, - blockManagerPort: Int, - expectedServiceName: String, - service: Service): Unit = { + private def verifyService( + driverPort: Int, + blockManagerPort: Int, + expectedServiceName: String, + service: Service): Unit = { assert(service.getMetadata.getName === expectedServiceName) assert(service.getSpec.getClusterIP === "None") assert(service.getSpec.getSelector.asScala === DRIVER_LABELS) @@ -279,10 +255,11 @@ class DriverServiceFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { assert(driverServicePorts(1).getTargetPort.getIntVal === blockManagerPort) } - private def verifySparkConfHostNames(driverSparkConf: Map[String, String], - expectedHostName: String): Unit = { + private def verifySparkConfHostNames( + driverSparkConf: Map[String, String], + expectedHostName: String): Unit = { assert( - driverSparkConf(org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName - ) + driverSparkConf( + org.apache.spark.internal.config.DRIVER_HOST_ADDRESS.key) === expectedHostName) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala index b82dd42f24661..df3aa20b03729 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/EnvSecretsFeatureStepSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{PodBuilder, Toleration} + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ @@ -32,8 +33,7 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite { val baseDriverPod = SparkPod.initialPod() val envVarsToKeys = Map( ENV_NAME_BAR -> s"${KEY_REF_NAME_BAR}:${KEY_REF_KEY_BAR}", - ENV_NAME_FOO -> s"${KEY_REF_NAME_FOO}:${KEY_REF_KEY_FOO}" - ) + ENV_NAME_FOO -> s"${KEY_REF_NAME_FOO}:${KEY_REF_KEY_FOO}") val sparkConf = new SparkConf(false) val kubernetesConf = KubernetesConf( sparkConf, @@ -48,8 +48,7 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val step = new EnvSecretsFeatureStep(kubernetesConf) val driverContainerWithEnvSecrets = @@ -61,8 +60,7 @@ class EnvSecretsFeatureStepSuite extends SparkFunSuite { expectedVars.foreach { envName => assert( KubernetesFeaturesTestUtils - .containerHasEnvVar(driverContainerWithEnvSecrets, envName) - ) + .containerHasEnvVar(driverContainerWithEnvSecrets, envName)) } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala index f90380e30e52a..0a44a3436f710 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/KubernetesFeaturesTestUtils.scala @@ -29,10 +29,10 @@ import org.apache.spark.deploy.k8s.SparkPod object KubernetesFeaturesTestUtils { def getMockConfigStepForStepType[T <: KubernetesFeatureConfigStep]( - stepType: String, stepClass: Class[T]): T = { + stepType: String, + stepClass: Class[T]): T = { val mockStep = mock(stepClass) - when(mockStep.getAdditionalKubernetesResources()).thenReturn( - getSecretsForStepType(stepType)) + when(mockStep.getAdditionalKubernetesResources()).thenReturn(getSecretsForStepType(stepType)) when(mockStep.getAdditionalPodSystemProperties()) .thenReturn(Map(stepType -> stepType)) @@ -51,13 +51,14 @@ object KubernetesFeaturesTestUtils { mockStep } - def getSecretsForStepType[T <: KubernetesFeatureConfigStep](stepType: String) - : Seq[HasMetadata] = { - Seq(new SecretBuilder() - .withNewMetadata() - .withName(stepType) - .endMetadata() - .build()) + def getSecretsForStepType[T <: KubernetesFeatureConfigStep]( + stepType: String): Seq[HasMetadata] = { + Seq( + new SecretBuilder() + .withNewMetadata() + .withName(stepType) + .endMetadata() + .build()) } def containerHasEnvVar(container: Container, envVarName: String): Boolean = { diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala index aedc5305d6c4b..f5ced82224d4a 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/LocalDirsFeatureStepSuite.scala @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.api.model.{ } import org.mockito.Mockito import org.scalatest.BeforeAndAfter + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{ KubernetesConf, @@ -35,6 +36,7 @@ import org.apache.spark.deploy.k8s.{ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { private val defaultLocalDir = "/var/data/default-local-dir" private var sparkConf: SparkConf = _ + private var kubernetesConf: KubernetesConf[_ <: KubernetesRoleSpecificConf] = _ @@ -54,8 +56,7 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) } test("Resolve to default local dir if neither env nor configuration are set") { @@ -71,24 +72,21 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .withName(s"spark-local-dir-1") .withNewEmptyDir() .endEmptyDir() - .build() - ) + .build()) assert(configuredPod.container.getVolumeMounts.size === 1) assert( configuredPod.container.getVolumeMounts.get(0) === new VolumeMountBuilder() .withName(s"spark-local-dir-1") .withMountPath(defaultLocalDir) - .build() - ) + .build()) assert(configuredPod.container.getEnv.size === 1) assert( configuredPod.container.getEnv.get(0) === new EnvVarBuilder() .withName("SPARK_LOCAL_DIRS") .withValue(defaultLocalDir) - .build() - ) + .build()) } test("Use configured local dirs split on comma if provided.") { @@ -106,38 +104,33 @@ class LocalDirsFeatureStepSuite extends SparkFunSuite with BeforeAndAfter { .withName(s"spark-local-dir-1") .withNewEmptyDir() .endEmptyDir() - .build() - ) + .build()) assert( configuredPod.pod.getSpec.getVolumes.get(1) === new VolumeBuilder() .withName(s"spark-local-dir-2") .withNewEmptyDir() .endEmptyDir() - .build() - ) + .build()) assert(configuredPod.container.getVolumeMounts.size === 2) assert( configuredPod.container.getVolumeMounts.get(0) === new VolumeMountBuilder() .withName(s"spark-local-dir-1") .withMountPath("/var/data/my-local-dir-1") - .build() - ) + .build()) assert( configuredPod.container.getVolumeMounts.get(1) === new VolumeMountBuilder() .withName(s"spark-local-dir-2") .withMountPath("/var/data/my-local-dir-2") - .build() - ) + .build()) assert(configuredPod.container.getEnv.size === 1) assert( configuredPod.container.getEnv.get(0) === new EnvVarBuilder() .withName("SPARK_LOCAL_DIRS") .withValue("/var/data/my-local-dir-1,/var/data/my-local-dir-2") - .build() - ) + .build()) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala index b44c38aee963b..557cd4c4ab449 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountSecretsFeatureStepSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.{PodBuilder, Toleration} + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.{ KubernetesConf, @@ -49,8 +50,7 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) val step = new MountSecretsFeatureStep(kubernetesConf) val driverPodWithSecretsMounted = step.configurePod(baseDriverPod).pod @@ -58,18 +58,12 @@ class MountSecretsFeatureStepSuite extends SparkFunSuite { step.configurePod(baseDriverPod).container Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName => - assert( - SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName) - ) + assert(SecretVolumeUtils.podHasVolume(driverPodWithSecretsMounted, volumeName)) } Seq(s"$SECRET_FOO-volume", s"$SECRET_BAR-volume").foreach { volumeName => assert( - SecretVolumeUtils.containerHasVolume( - driverContainerWithSecretsMounted, - volumeName, - SECRET_MOUNT_PATH - ) - ) + SecretVolumeUtils + .containerHasVolume(driverContainerWithSecretsMounted, volumeName, SECRET_MOUNT_PATH)) } } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala index be3e8a4aa086f..dbfb2bf928fa0 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/MountVolumesFeatureStepSuite.scala @@ -17,15 +17,16 @@ package org.apache.spark.deploy.k8s.features import io.fabric8.kubernetes.api.model.Toleration + import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ class MountVolumesFeatureStepSuite extends SparkFunSuite { private val sparkConf = new SparkConf(false) + private val emptyKubernetesConf = KubernetesConf( sparkConf = sparkConf, - roleSpecificConf = - KubernetesDriverSpecificConf(None, "app-name", "main", Seq.empty), + roleSpecificConf = KubernetesDriverSpecificConf(None, "app-name", "main", Seq.empty), appResourceNamePrefix = "resource", appId = "app-id", roleLabels = Map.empty, @@ -36,16 +37,14 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { roleVolumes = Nil, driverTolerations = Seq.empty[Toleration], executorTolerations = Seq.empty[Toleration], - sparkFiles = Nil - ) + sparkFiles = Nil) test("Mounts hostPath volumes") { val volumeConf = KubernetesVolumeSpec( "testVolume", "/tmp", false, - KubernetesHostPathVolumeConf("/hostPath/tmp") - ) + KubernetesHostPathVolumeConf("/hostPath/tmp")) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -56,25 +55,16 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { configuredPod.pod.getSpec.getVolumes .get(0) .getHostPath - .getPath === "/hostPath/tmp" - ) + .getPath === "/hostPath/tmp") assert(configuredPod.container.getVolumeMounts.size() === 1) - assert( - configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp" - ) - assert( - configuredPod.container.getVolumeMounts.get(0).getName === "testVolume" - ) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) } test("Mounts pesistentVolumeClaims") { - val volumeConf = KubernetesVolumeSpec( - "testVolume", - "/tmp", - true, - KubernetesPVCVolumeConf("pvcClaim") - ) + val volumeConf = + KubernetesVolumeSpec("testVolume", "/tmp", true, KubernetesPVCVolumeConf("pvcClaim")) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -85,12 +75,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { configuredPod.pod.getSpec.getVolumes.get(0).getPersistentVolumeClaim assert(pvcClaim.getClaimName === "pvcClaim") assert(configuredPod.container.getVolumeMounts.size() === 1) - assert( - configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp" - ) - assert( - configuredPod.container.getVolumeMounts.get(0).getName === "testVolume" - ) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === true) } @@ -100,8 +86,7 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { "testVolume", "/tmp", false, - KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G")) - ) + KubernetesEmptyDirVolumeConf(Some("Memory"), Some("6G"))) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -112,22 +97,14 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(emptyDir.getMedium === "Memory") assert(emptyDir.getSizeLimit.getAmount === "6G") assert(configuredPod.container.getVolumeMounts.size() === 1) - assert( - configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp" - ) - assert( - configuredPod.container.getVolumeMounts.get(0).getName === "testVolume" - ) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) } test("Mounts emptyDir with no options") { - val volumeConf = KubernetesVolumeSpec( - "testVolume", - "/tmp", - false, - KubernetesEmptyDirVolumeConf(None, None) - ) + val volumeConf = + KubernetesVolumeSpec("testVolume", "/tmp", false, KubernetesEmptyDirVolumeConf(None, None)) val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumeConf :: Nil) val step = new MountVolumesFeatureStep(kubernetesConf) @@ -138,12 +115,8 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { assert(emptyDir.getMedium === "") assert(emptyDir.getSizeLimit.getAmount === null) assert(configuredPod.container.getVolumeMounts.size() === 1) - assert( - configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp" - ) - assert( - configuredPod.container.getVolumeMounts.get(0).getName === "testVolume" - ) + assert(configuredPod.container.getVolumeMounts.get(0).getMountPath === "/tmp") + assert(configuredPod.container.getVolumeMounts.get(0).getName === "testVolume") assert(configuredPod.container.getVolumeMounts.get(0).getReadOnly === false) } @@ -152,14 +125,12 @@ class MountVolumesFeatureStepSuite extends SparkFunSuite { "hpVolume", "/tmp", false, - KubernetesHostPathVolumeConf("/hostPath/tmp") - ) + KubernetesHostPathVolumeConf("/hostPath/tmp")) val pvcVolumeConf = KubernetesVolumeSpec( "checkpointVolume", "/checkpoints", true, - KubernetesPVCVolumeConf("pvcClaim") - ) + KubernetesPVCVolumeConf("pvcClaim")) val volumesConf = hpVolumeConf :: pvcVolumeConf :: Nil val kubernetesConf = emptyKubernetesConf.copy(roleVolumes = volumesConf) val step = new MountVolumesFeatureStep(kubernetesConf) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala index fc0e5be067349..020a4d900fb0e 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/JavaDriverFeatureStepSuite.scala @@ -16,17 +16,14 @@ */ package org.apache.spark.deploy.k8s.features.bindings +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.Toleration + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.PythonMainAppResource -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesDriverSpecificConf, - SparkPod -} -import org.apache.spark.{SparkConf, SparkFunSuite} - -import scala.collection.JavaConverters._ class JavaDriverFeatureStepSuite extends SparkFunSuite { @@ -39,8 +36,7 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite { Some(PythonMainAppResource("local:///main.jar")), "test-class", "java-runner", - Seq("5 7") - ), + Seq("5 7")), appResourceNamePrefix = "", appId = "", roleLabels = Map.empty, @@ -51,8 +47,7 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite { roleVolumes = Nil, driverTolerations = Seq.empty[Toleration], executorTolerations = Seq.empty[Toleration], - sparkFiles = Seq.empty[String] - ) + sparkFiles = Seq.empty[String]) val step = new JavaDriverFeatureStep(kubernetesConf) val driverPod = step.configurePod(baseDriverPod).pod @@ -67,8 +62,6 @@ class JavaDriverFeatureStepSuite extends SparkFunSuite { "--class", "test-class", "spark-internal", - "5 7" - ) - ) + "5 7")) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala index 2ee1ed90d6e8e..0c743686c1091 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/PythonDriverFeatureStepSuite.scala @@ -16,18 +16,15 @@ */ package org.apache.spark.deploy.k8s.features.bindings +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.Toleration + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.PythonMainAppResource -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesDriverSpecificConf, - SparkPod -} -import org.apache.spark.{SparkConf, SparkFunSuite} - -import scala.collection.JavaConverters._ class PythonDriverFeatureStepSuite extends SparkFunSuite { @@ -49,8 +46,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { Some(PythonMainAppResource("local:///main.py")), "test-app", "python-runner", - Seq("5", "7", "9") - ), + Seq("5", "7", "9")), appResourceNamePrefix = "", appId = "", roleLabels = Map.empty, @@ -61,8 +57,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { roleVolumes = Nil, driverTolerations = Seq.empty[Toleration], executorTolerations = Seq.empty[Toleration], - sparkFiles = Seq.empty[String] - ) + sparkFiles = Seq.empty[String]) val step = new PythonDriverFeatureStep(kubernetesConf) val driverPod = step.configurePod(baseDriverPod).pod @@ -88,8 +83,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { Some(PythonMainAppResource("local:///main.py")), "test-class-py", "python-runner", - Seq.empty[String] - ), + Seq.empty[String]), appResourceNamePrefix = "", appId = "", roleLabels = Map.empty, @@ -100,8 +94,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { roleVolumes = Nil, driverTolerations = Seq.empty[Toleration], executorTolerations = Seq.empty[Toleration], - sparkFiles = Seq.empty[String] - ) + sparkFiles = Seq.empty[String]) val step = new PythonDriverFeatureStep(kubernetesConf) val driverContainerwithPySpark = step.configurePod(baseDriverPod).container val args = driverContainerwithPySpark.getArgs.asScala @@ -112,9 +105,7 @@ class PythonDriverFeatureStepSuite extends SparkFunSuite { "--properties-file", SPARK_CONF_PATH, "--class", - "test-class-py" - ) - ) + "test-class-py")) val envs = driverContainerwithPySpark.getEnv.asScala .map(env => (env.getName, env.getValue)) .toMap diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala index 524d1ea0697c6..b7d1f0385e478 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/features/bindings/RDriverFeatureStepSuite.scala @@ -16,18 +16,15 @@ */ package org.apache.spark.deploy.k8s.features.bindings +import scala.collection.JavaConverters._ + import io.fabric8.kubernetes.api.model.Toleration + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.{KubernetesConf, KubernetesDriverSpecificConf, SparkPod} import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants._ import org.apache.spark.deploy.k8s.submit.RMainAppResource -import org.apache.spark.deploy.k8s.{ - KubernetesConf, - KubernetesDriverSpecificConf, - SparkPod -} -import org.apache.spark.{SparkConf, SparkFunSuite} - -import scala.collection.JavaConverters._ class RDriverFeatureStepSuite extends SparkFunSuite { @@ -43,8 +40,7 @@ class RDriverFeatureStepSuite extends SparkFunSuite { Some(RMainAppResource(mainResource)), "test-app", "r-runner", - Seq("5", "7", "9") - ), + Seq("5", "7", "9")), appResourceNamePrefix = "", appId = "", roleLabels = Map.empty, @@ -55,8 +51,7 @@ class RDriverFeatureStepSuite extends SparkFunSuite { roleVolumes = Seq.empty, driverTolerations = Seq.empty[Toleration], executorTolerations = Seq.empty[Toleration], - sparkFiles = Seq.empty[String] - ) + sparkFiles = Seq.empty[String]) val step = new RDriverFeatureStep(kubernetesConf) val driverContainerwithR = step.configurePod(baseDriverPod).container diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala index 5c1905bbb957c..489e1f72ab411 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilderSuite.scala @@ -17,12 +17,9 @@ package org.apache.spark.deploy.k8s.submit import io.fabric8.kubernetes.api.model.Toleration + +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ -import org.apache.spark.deploy.k8s.features.bindings.{ - JavaDriverFeatureStep, - PythonDriverFeatureStep, - RDriverFeatureStep -} import org.apache.spark.deploy.k8s.features.{ BasicDriverFeatureStep, DriverKubernetesCredentialsFeatureStep, @@ -33,7 +30,11 @@ import org.apache.spark.deploy.k8s.features.{ MountSecretsFeatureStep, _ } -import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.features.bindings.{ + JavaDriverFeatureStep, + PythonDriverFeatureStep, + RDriverFeatureStep +} class KubernetesDriverBuilderSuite extends SparkFunSuite { @@ -51,61 +52,51 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, - classOf[BasicDriverFeatureStep] - ) + classOf[BasicDriverFeatureStep]) private val credentialsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( CREDENTIALS_STEP_TYPE, - classOf[DriverKubernetesCredentialsFeatureStep] - ) + classOf[DriverKubernetesCredentialsFeatureStep]) private val serviceStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( SERVICE_STEP_TYPE, - classOf[DriverServiceFeatureStep] - ) + classOf[DriverServiceFeatureStep]) private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( LOCAL_DIRS_STEP_TYPE, - classOf[LocalDirsFeatureStep] - ) + classOf[LocalDirsFeatureStep]) private val secretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( SECRETS_STEP_TYPE, - classOf[MountSecretsFeatureStep] - ) + classOf[MountSecretsFeatureStep]) private val javaStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( JAVA_STEP_TYPE, - classOf[JavaDriverFeatureStep] - ) + classOf[JavaDriverFeatureStep]) private val pythonStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( PYSPARK_STEP_TYPE, - classOf[PythonDriverFeatureStep] - ) + classOf[PythonDriverFeatureStep]) private val rStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( R_STEP_TYPE, - classOf[RDriverFeatureStep] - ) + classOf[RDriverFeatureStep]) private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, - classOf[EnvSecretsFeatureStep] - ) + classOf[EnvSecretsFeatureStep]) private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, - classOf[MountVolumesFeatureStep] - ) + classOf[MountVolumesFeatureStep]) private val builderUnderTest: KubernetesDriverBuilder = new KubernetesDriverBuilder( @@ -118,8 +109,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { _ => mountVolumesStep, _ => pythonStep, _ => rStep, - _ => javaStep - ) + _ => javaStep) test("Apply fundamental steps all the time.") { val conf = KubernetesConf( @@ -128,8 +118,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Some(JavaMainAppResource("example.jar")), "test-app", "main", - Seq.empty - ), + Seq.empty), "prefix", "appId", Map.empty, @@ -140,16 +129,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE - ) + JAVA_STEP_TYPE) } test("Apply secrets step if secrets are present.") { @@ -166,8 +153,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -176,8 +162,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, ENV_SECRETS_STEP_TYPE, - JAVA_STEP_TYPE - ) + JAVA_STEP_TYPE) } test("Apply Java step if main resource is none.") { @@ -194,16 +179,14 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - JAVA_STEP_TYPE - ) + JAVA_STEP_TYPE) } test("Apply Python step if main resource is python.") { @@ -213,8 +196,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Some(PythonMainAppResource("example.py")), "test-app", "main", - Seq.empty - ), + Seq.empty), "prefix", "appId", Map.empty, @@ -225,25 +207,19 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - PYSPARK_STEP_TYPE - ) + PYSPARK_STEP_TYPE) } test("Apply volumes step if mounts are present.") { - val volumeSpec = KubernetesVolumeSpec( - "volume", - "/tmp", - false, - KubernetesHostPathVolumeConf("/path") - ) + val volumeSpec = + KubernetesVolumeSpec("volume", "/tmp", false, KubernetesHostPathVolumeConf("/path")) val conf = KubernetesConf( new SparkConf(false), KubernetesDriverSpecificConf(None, "test-app", "main", Seq.empty), @@ -257,8 +233,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { volumeSpec :: Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, @@ -266,8 +241,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, MOUNT_VOLUMES_STEP_TYPE, - JAVA_STEP_TYPE - ) + JAVA_STEP_TYPE) } test("Apply R step if main resource is R.") { @@ -277,8 +251,7 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Some(RMainAppResource("example.R")), "test-app", "main", - Seq.empty - ), + Seq.empty), "prefix", "appId", Map.empty, @@ -289,30 +262,25 @@ class KubernetesDriverBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, CREDENTIALS_STEP_TYPE, SERVICE_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - R_STEP_TYPE - ) + R_STEP_TYPE) } - private def validateStepTypesApplied(resolvedSpec: KubernetesDriverSpec, - stepTypes: String*): Unit = { + private def validateStepTypesApplied( + resolvedSpec: KubernetesDriverSpec, + stepTypes: String*): Unit = { assert(resolvedSpec.systemProperties.size === stepTypes.size) stepTypes.foreach { stepType => - assert( - resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType - ) + assert(resolvedSpec.pod.pod.getMetadata.getLabels.get(stepType) === stepType) assert( resolvedSpec.driverKubernetesResources.containsSlice( - KubernetesFeaturesTestUtils.getSecretsForStepType(stepType) - ) - ) + KubernetesFeaturesTestUtils.getSecretsForStepType(stepType))) assert(resolvedSpec.systemProperties(stepType) === stepType) } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala index 95d4abbac9366..e9c4c401e7403 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilderSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark.scheduler.cluster.k8s import io.fabric8.kubernetes.api.model.{PodBuilder, Toleration} + +import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s._ import org.apache.spark.deploy.k8s.features._ -import org.apache.spark.{SparkConf, SparkFunSuite} class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val BASIC_STEP_TYPE = "basic" @@ -31,44 +32,39 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { private val basicFeatureStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( BASIC_STEP_TYPE, - classOf[BasicExecutorFeatureStep] - ) + classOf[BasicExecutorFeatureStep]) + private val mountSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( SECRETS_STEP_TYPE, - classOf[MountSecretsFeatureStep] - ) + classOf[MountSecretsFeatureStep]) + private val envSecretsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( ENV_SECRETS_STEP_TYPE, - classOf[EnvSecretsFeatureStep] - ) + classOf[EnvSecretsFeatureStep]) + private val localDirsStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( LOCAL_DIRS_STEP_TYPE, - classOf[LocalDirsFeatureStep] - ) + classOf[LocalDirsFeatureStep]) + private val mountVolumesStep = KubernetesFeaturesTestUtils.getMockConfigStepForStepType( MOUNT_VOLUMES_STEP_TYPE, - classOf[MountVolumesFeatureStep] - ) + classOf[MountVolumesFeatureStep]) private val builderUnderTest = new KubernetesExecutorBuilder( _ => basicFeatureStep, _ => mountSecretsStep, _ => envSecretsStep, _ => localDirsStep, - _ => mountVolumesStep - ) + _ => mountVolumesStep) test("Basic steps are consistently applied.") { val conf = KubernetesConf( new SparkConf(false), - KubernetesExecutorSpecificConf( - "executor-id", - Some(new PodBuilder().build()) - ), + KubernetesExecutorSpecificConf("executor-id", Some(new PodBuilder().build())), "prefix", "appId", Map.empty, @@ -79,22 +75,17 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, - LOCAL_DIRS_STEP_TYPE - ) + LOCAL_DIRS_STEP_TYPE) } test("Apply secrets step if secrets are present.") { val conf = KubernetesConf( new SparkConf(false), - KubernetesExecutorSpecificConf( - "executor-id", - Some(new PodBuilder().build()) - ), + KubernetesExecutorSpecificConf("executor-id", Some(new PodBuilder().build())), "prefix", "appId", Map.empty, @@ -105,30 +96,21 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, SECRETS_STEP_TYPE, - ENV_SECRETS_STEP_TYPE - ) + ENV_SECRETS_STEP_TYPE) } test("Apply volumes step if mounts are present.") { - val volumeSpec = KubernetesVolumeSpec( - "volume", - "/tmp", - false, - KubernetesHostPathVolumeConf("/checkpoint") - ) + val volumeSpec = + KubernetesVolumeSpec("volume", "/tmp", false, KubernetesHostPathVolumeConf("/checkpoint")) val conf = KubernetesConf( new SparkConf(false), - KubernetesExecutorSpecificConf( - "executor-id", - Some(new PodBuilder().build()) - ), + KubernetesExecutorSpecificConf("executor-id", Some(new PodBuilder().build())), "prefix", "appId", Map.empty, @@ -139,18 +121,15 @@ class KubernetesExecutorBuilderSuite extends SparkFunSuite { volumeSpec :: Nil, Seq.empty[Toleration], Seq.empty[Toleration], - Seq.empty[String] - ) + Seq.empty[String]) validateStepTypesApplied( builderUnderTest.buildFromFeatures(conf), BASIC_STEP_TYPE, LOCAL_DIRS_STEP_TYPE, - MOUNT_VOLUMES_STEP_TYPE - ) + MOUNT_VOLUMES_STEP_TYPE) } - private def validateStepTypesApplied(resolvedPod: SparkPod, - stepTypes: String*): Unit = { + private def validateStepTypesApplied(resolvedPod: SparkPod, stepTypes: String*): Unit = { assert(resolvedPod.pod.getMetadata.getLabels.size === stepTypes.size) stepTypes.foreach { stepType => assert(resolvedPod.pod.getMetadata.getLabels.get(stepType) === stepType)