Skip to content

Commit

Permalink
[SPARK-25876][K8S] Simplify kubernetes configuration types.
Browse files Browse the repository at this point in the history
There are a few issues with the current configuration types used in
the kubernetes backend:

- they use type parameters for role-specific specialization, which makes
  type signatures really noisy throughout the code base.

- they break encapsulation by forcing the code that creates the config
  object to remove the configuration from SparkConf before creating the
  k8s-specific wrapper.

- they don't provide an easy way for tests to have default values for
  fields they do not use.

This change fixes those problems by:

- creating a base config type with role-specific specialization using
  inheritance

- encapsulating the logic of parsing SparkConf into k8s-specific views
  inside the k8s config classes

- providing some helper code for tests to easily override just the part
  of the configs they want.

Most of the change relates to the above, especially cleaning up the
tests. While doing that, I also made some smaller changes elsewhere:

- removed unnecessary type parameters in KubernetesVolumeSpec

- simplified the error detection logic in KubernetesVolumeUtils; all
  the call sites would just throw the first exception collected by
  that class, since they all called "get" on the "Try" object. Now
  the unnecessary wrapping is gone and the exception is just thrown
  where it occurs.

- removed a lot of unnecessary mocking from tests.

- changed the kerberos-related code so that less logic needs to live
  in the driver builder. In spirit it should be part of the upcoming
  work in this series of cleanups, but it made parts of this change
  simpler.

Tested with existing unit tests and integration tests.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#22959 from vanzin/SPARK-25876.
  • Loading branch information
Marcelo Vanzin authored and jackylee-ch committed Feb 18, 2019
1 parent 81bddf9 commit 5d05f59
Show file tree
Hide file tree
Showing 40 changed files with 777 additions and 1,512 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ private[spark] object Config extends Logging {
.doc("Comma separated list of the Kubernetes secrets used " +
"to access private image registries.")
.stringConf
.createOptional
.toSequence
.createWithDefault(Nil)

val KUBERNETES_AUTH_DRIVER_CONF_PREFIX =
"spark.kubernetes.authenticate.driver"
Expand Down Expand Up @@ -112,16 +113,16 @@ private[spark] object Config extends Logging {
.stringConf
.createOptional

val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
// For testing only.
val KUBERNETES_DRIVER_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.driver.resourceNamePrefix")
.internal()
.stringConf
.createWithDefault("spark")
.createOptional

val KUBERNETES_PYSPARK_PY_FILES =
ConfigBuilder("spark.kubernetes.python.pyFiles")
.doc("The PyFiles that are distributed via client arguments")
val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
.doc("Prefix to use in front of the executor pod names.")
.internal()
.stringConf
.createOptional
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,93 +16,53 @@
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable
import java.util.Locale

import io.fabric8.kubernetes.api.model.{LocalObjectReference, LocalObjectReferenceBuilder, Pod}
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.security.KubernetesHadoopDelegationTokenManager
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.util.Utils


private[spark] sealed trait KubernetesRoleSpecificConf

/*
* Structure containing metadata for Kubernetes logic that builds a Spark driver.
*/
private[spark] case class KubernetesDriverSpecificConf(
mainAppResource: MainAppResource,
mainClass: String,
appName: String,
appArgs: Seq[String],
pyFiles: Seq[String] = Nil) extends KubernetesRoleSpecificConf {

require(mainAppResource != null, "Main resource must be provided.")

}

/*
* Structure containing metadata for Kubernetes logic that builds a Spark executor.
*/
private[spark] case class KubernetesExecutorSpecificConf(
executorId: String,
driverPod: Option[Pod])
extends KubernetesRoleSpecificConf

/*
* Structure containing metadata for HADOOP_CONF_DIR customization
*/
private[spark] case class HadoopConfSpec(
hadoopConfDir: Option[String],
hadoopConfigMapName: Option[String])

/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
*/
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
sparkConf: SparkConf,
roleSpecificConf: T,
appResourceNamePrefix: String,
appId: String,
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
hadoopConfSpec: Option[HadoopConfSpec]) {
private[spark] abstract class KubernetesConf(val sparkConf: SparkConf) {

def hadoopConfigMapName: String = s"$appResourceNamePrefix-hadoop-config"
val resourceNamePrefix: String
def labels: Map[String, String]
def environment: Map[String, String]
def annotations: Map[String, String]
def secretEnvNamesToKeyRefs: Map[String, String]
def secretNamesToMountPaths: Map[String, String]
def volumes: Seq[KubernetesVolumeSpec]

def krbConfigMapName: String = s"$appResourceNamePrefix-krb5-file"
def appName: String = get("spark.app.name", "spark")

def tokenManager(conf: SparkConf, hConf: Configuration): KubernetesHadoopDelegationTokenManager =
new KubernetesHadoopDelegationTokenManager(conf, hConf)
def hadoopConfigMapName: String = s"$resourceNamePrefix-hadoop-config"

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)
def krbConfigMapName: String = s"$resourceNamePrefix-krb5-file"

def imagePullPolicy(): String = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
def namespace: String = get(KUBERNETES_NAMESPACE)

def imagePullSecrets(): Seq[LocalObjectReference] = {
def imagePullPolicy: String = get(CONTAINER_IMAGE_PULL_POLICY)

def imagePullSecrets: Seq[LocalObjectReference] = {
sparkConf
.get(IMAGE_PULL_SECRETS)
.map(_.split(","))
.getOrElse(Array.empty[String])
.map(_.trim)
.map { secret =>
new LocalObjectReferenceBuilder().withName(secret).build()
}
}

def nodeSelector(): Map[String, String] =
def nodeSelector: Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

def contains(config: ConfigEntry[_]): Boolean = sparkConf.contains(config)

def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)

def get(conf: String): String = sparkConf.get(conf)
Expand All @@ -112,125 +72,139 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
def getOption(key: String): Option[String] = sparkConf.getOption(key)
}

private[spark] class KubernetesDriverConf(
sparkConf: SparkConf,
val appId: String,
val mainAppResource: MainAppResource,
val mainClass: String,
val appArgs: Array[String],
val pyFiles: Seq[String])
extends KubernetesConf(sparkConf) {

override val resourceNamePrefix: String = {
val custom = if (Utils.isTesting) get(KUBERNETES_DRIVER_POD_NAME_PREFIX) else None
custom.getOrElse(KubernetesConf.getResourceNamePrefix(appName))
}

override def labels: Map[String, String] = {
val presetLabels = Map(
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)

presetLabels.keys.foreach { key =>
require(
!driverCustomLabels.contains(key),
s"Label with key $key is not allowed as it is reserved for Spark bookkeeping operations.")
}

driverCustomLabels ++ presetLabels
}

override def environment: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
}

override def annotations: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
}

override def secretNamesToMountPaths: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
}

override def secretEnvNamesToKeyRefs: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
}

override def volumes: Seq[KubernetesVolumeSpec] = {
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
}
}

private[spark] class KubernetesExecutorConf(
sparkConf: SparkConf,
val appId: String,
val executorId: String,
val driverPod: Option[Pod])
extends KubernetesConf(sparkConf) {

override val resourceNamePrefix: String = {
get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX).getOrElse(
KubernetesConf.getResourceNamePrefix(appName))
}

override def labels: Map[String, String] = {
val presetLabels = Map(
SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)

val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)

presetLabels.keys.foreach { key =>
require(
!executorCustomLabels.contains(key),
s"Custom executor labels cannot contain $key as it is reserved for Spark.")
}

executorCustomLabels ++ presetLabels
}

override def environment: Map[String, String] = sparkConf.getExecutorEnv.toMap

override def annotations: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
}

override def secretNamesToMountPaths: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
}

override def secretEnvNamesToKeyRefs: Map[String, String] = {
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
}

override def volumes: Seq[KubernetesVolumeSpec] = {
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
}

}

private[spark] object KubernetesConf {
def createDriverConf(
sparkConf: SparkConf,
appName: String,
appResourceNamePrefix: String,
appId: String,
mainAppResource: MainAppResource,
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String],
hadoopConfDir: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
require(!driverCustomLabels.contains(SPARK_APP_ID_LABEL), "Label with key " +
s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
require(!driverCustomLabels.contains(SPARK_ROLE_LABEL), "Label with key " +
s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations.")
val driverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
val driverSecretEnvNamesToKeyRefs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
val driverVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX).map(_.get)
// Also parse executor volumes in order to verify configuration
// before the driver pod is created
KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

val hadoopConfigMapName = sparkConf.get(KUBERNETES_HADOOP_CONF_CONFIG_MAP)
KubernetesUtils.requireNandDefined(
hadoopConfDir,
hadoopConfigMapName,
"Do not specify both the `HADOOP_CONF_DIR` in your ENV and the ConfigMap " +
"as the creation of an additional ConfigMap, when one is already specified is extraneous" )
val hadoopConfSpec =
if (hadoopConfDir.isDefined || hadoopConfigMapName.isDefined) {
Some(HadoopConfSpec(hadoopConfDir, hadoopConfigMapName))
} else {
None
}
val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
maybePyFiles: Option[String]): KubernetesDriverConf = {
// Parse executor volumes in order to verify configuration before the driver pod is created.
KubernetesVolumeUtils.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)


KubernetesConf(
sparkConf.clone(),
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs, pyFiles),
appResourceNamePrefix,
appId,
driverLabels,
driverAnnotations,
driverSecretNamesToMountPaths,
driverSecretEnvNamesToKeyRefs,
driverEnvs,
driverVolumes,
hadoopConfSpec)
val pyFiles = maybePyFiles.map(Utils.stringToSeq).getOrElse(Nil)
new KubernetesDriverConf(sparkConf.clone(), appId, mainAppResource, mainClass, appArgs,
pyFiles)
}

def createExecutorConf(
sparkConf: SparkConf,
executorId: String,
appId: String,
driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
require(
!executorCustomLabels.contains(SPARK_APP_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
require(
!executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
" Spark.")
require(
!executorCustomLabels.contains(SPARK_ROLE_LABEL),
s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
val executorLabels = Map(
SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf, KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap
val executorVolumes = KubernetesVolumeUtils.parseVolumesWithPrefix(
sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX).map(_.get)

// If no prefix is defined then we are in pure client mode
// (not the one used by cluster mode inside the container)
val appResourceNamePrefix = {
if (sparkConf.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key).isEmpty) {
getResourceNamePrefix(getAppName(sparkConf))
} else {
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
}
}
driverPod: Option[Pod]): KubernetesExecutorConf = {
new KubernetesExecutorConf(sparkConf.clone(), appId, executorId, driverPod)
}

KubernetesConf(
sparkConf.clone(),
KubernetesExecutorSpecificConf(executorId, driverPod),
appResourceNamePrefix,
appId,
executorLabels,
executorAnnotations,
executorMountSecrets,
executorEnvSecrets,
executorEnv,
executorVolumes,
None)
def getResourceNamePrefix(appName: String): String = {
val launchTime = System.currentTimeMillis()
s"$appName-$launchTime"
.trim
.toLowerCase(Locale.ROOT)
.replaceAll("\\s+", "-")
.replaceAll("\\.", "-")
.replaceAll("[^a-z0-9\\-]", "")
.replaceAll("-+", "-")
}
}
Loading

0 comments on commit 5d05f59

Please sign in to comment.