Merge pull request apache#14 from ringtail/feature/spark-lint
add .scalafmt.conf and refactor code
ringtail committed Jun 5, 2020
2 parents 91006e7 + 96e06f5 commit bb81f3c
Showing 21 changed files with 447 additions and 720 deletions.
29 changes: 29 additions & 0 deletions .scalafmt.conf
@@ -0,0 +1,29 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

align = none
align.openParenDefnSite = false
align.openParenCallSite = false
align.tokens = []
optIn = {
configStyleArguments = false
}
danglingParentheses = false
docstrings = JavaDoc
maxColumn = 98
version = 2.5.3
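
These settings drive the reformatting in the Scala diff below: align = none with align.tokens = [] turns off vertical alignment, optIn.configStyleArguments = false together with danglingParentheses = false stops scalafmt from preserving one-argument-per-line call sites with a dangling closing parenthesis, docstrings = JavaDoc realigns scaladoc asterisks, and maxColumn = 98 lets the collapsed calls fit on a single line. As a rough before/after sketch (the call is taken from one of the hunks below; the exact output depends on scalafmt 2.5.3's remaining defaults), a call formerly written as

  KubernetesUtils.parsePrefixedKeyValuePairs(
    sparkConf,
    KUBERNETES_NODE_SELECTOR_PREFIX
  )

is collapsed under this configuration to

  KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)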

@@ -16,60 +16,59 @@
*/
package org.apache.spark.deploy.k8s

import scala.collection.mutable

import io.fabric8.kubernetes.api.model.{
LocalObjectReference,
LocalObjectReferenceBuilder,
Pod,
Toleration
}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.deploy.k8s.submit._
import org.apache.spark.deploy.k8s.submit.KubernetesClientApplication._
import org.apache.spark.internal.config.ConfigEntry

import scala.collection.mutable

private[spark] sealed trait KubernetesRoleSpecificConf

/*
* Structure containing metadata for Kubernetes logic that builds a Spark driver.
*/
private[spark] case class KubernetesDriverSpecificConf(
mainAppResource: Option[MainAppResource],
mainClass: String,
appName: String,
appArgs: Seq[String]
) extends KubernetesRoleSpecificConf
mainAppResource: Option[MainAppResource],
mainClass: String,
appName: String,
appArgs: Seq[String])
extends KubernetesRoleSpecificConf

/*
* Structure containing metadata for Kubernetes logic that builds a Spark executor.
*/
private[spark] case class KubernetesExecutorSpecificConf(executorId: String,
driverPod: Option[Pod])
private[spark] case class KubernetesExecutorSpecificConf(
executorId: String,
driverPod: Option[Pod])
extends KubernetesRoleSpecificConf

/**
* Structure containing metadata for Kubernetes logic to build Spark pods.
*/
* Structure containing metadata for Kubernetes logic to build Spark pods.
*/
private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
sparkConf: SparkConf,
roleSpecificConf: T,
appResourceNamePrefix: String,
appId: String,
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
roleVolumes: Iterable[
KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]
],
driverTolerations: Seq[Toleration],
executorTolerations: Seq[Toleration],
sparkFiles: Seq[String]
) {
sparkConf: SparkConf,
roleSpecificConf: T,
appResourceNamePrefix: String,
appId: String,
roleLabels: Map[String, String],
roleAnnotations: Map[String, String],
roleSecretNamesToMountPaths: Map[String, String],
roleSecretEnvNamesToKeyRefs: Map[String, String],
roleEnvs: Map[String, String],
roleVolumes: Iterable[KubernetesVolumeSpec[_ <: KubernetesVolumeSpecificConf]],
driverTolerations: Seq[Toleration],
executorTolerations: Seq[Toleration],
sparkFiles: Seq[String]) {

def namespace(): String = sparkConf.get(KUBERNETES_NAMESPACE)

@@ -109,10 +108,7 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
}

def nodeSelector(): Map[String, String] =
KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_NODE_SELECTOR_PREFIX
)
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_NODE_SELECTOR_PREFIX)

def get[T](config: ConfigEntry[T]): T = sparkConf.get(config)

@@ -125,16 +121,16 @@ private[spark] case class KubernetesConf[T <: KubernetesRoleSpecificConf](
}

private[spark] object KubernetesConf {

def createDriverConf(
sparkConf: SparkConf,
appName: String,
appResourceNamePrefix: String,
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String]
): KubernetesConf[KubernetesDriverSpecificConf] = {
sparkConf: SparkConf,
appName: String,
appResourceNamePrefix: String,
appId: String,
mainAppResource: Option[MainAppResource],
mainClass: String,
appArgs: Array[String],
maybePyFiles: Option[String]): KubernetesConf[KubernetesDriverSpecificConf] = {
val sparkConfWithMainAppJar = sparkConf.clone()
val additionalFiles = mutable.ArrayBuffer.empty[String]
mainAppResource.foreach {
@@ -155,52 +151,39 @@ private[spark] object KubernetesConf {
maybePyFiles.foreach { maybePyFiles =>
additionalFiles.appendAll(maybePyFiles.split(","))
}
sparkConfWithMainAppJar.set(
KUBERNETES_PYSPARK_MAIN_APP_RESOURCE,
res
)
sparkConfWithMainAppJar.set(KUBERNETES_PYSPARK_MAIN_APP_RESOURCE, res)
case RMainAppResource(res) =>
additionalFiles += res
sparkConfWithMainAppJar.set(KUBERNETES_R_MAIN_APP_RESOURCE, res)
}
sparkConfWithMainAppJar.setIfMissing(MEMORY_OVERHEAD_FACTOR, 0.4)
}

val driverCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_LABEL_PREFIX
)
val driverCustomLabels =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_LABEL_PREFIX)
require(
!driverCustomLabels.contains(SPARK_APP_ID_LABEL),
"Label with key " +
s"$SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations."
)
"operations.")
require(
!driverCustomLabels.contains(SPARK_ROLE_LABEL),
"Label with key " +
s"$SPARK_ROLE_LABEL is not allowed as it is reserved for Spark bookkeeping " +
"operations."
)
"operations.")
val driverLabels = driverCustomLabels ++ Map(
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE
)
val driverAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_ANNOTATION_PREFIX
)
SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
val driverAnnotations =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ANNOTATION_PREFIX)
val driverSecretNamesToMountPaths = KubernetesUtils
.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_SECRETS_PREFIX)
val driverSecretEnvNamesToKeyRefs =
KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX
)
val driverEnvs = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_ENV_PREFIX
)
KUBERNETES_DRIVER_SECRET_KEY_REF_PREFIX)
val driverEnvs =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_ENV_PREFIX)
val driverVolumes = KubernetesVolumeUtils
.parseVolumesWithPrefix(sparkConf, KUBERNETES_DRIVER_VOLUMES_PREFIX)
.map(_.get)
@@ -211,10 +194,8 @@ private[spark] object KubernetesConf {
.map(_.get)

// parse driver tolerations from sparkConf
val driverTolerationsMap = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_DRIVER_TOLERATION_PREFIX
)
val driverTolerationsMap =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_DRIVER_TOLERATION_PREFIX)

val driverTolerations =
KubernetesUtils.parseTolerations(driverTolerationsMap)
@@ -226,12 +207,7 @@ private[spark] object KubernetesConf {

KubernetesConf(
sparkConfWithMainAppJar,
KubernetesDriverSpecificConf(
mainAppResource,
mainClass,
appName,
appArgs
),
KubernetesDriverSpecificConf(mainAppResource, mainClass, appName, appArgs),
appResourceNamePrefix,
appId,
driverLabels,
@@ -242,51 +218,38 @@ private[spark] object KubernetesConf {
driverVolumes,
driverTolerations,
Seq.empty[Toleration],
sparkFiles
)
sparkFiles)
}

def createExecutorConf(
sparkConf: SparkConf,
executorId: String,
appId: String,
driverPod: Option[Pod]
): KubernetesConf[KubernetesExecutorSpecificConf] = {
val executorCustomLabels = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_LABEL_PREFIX
)
sparkConf: SparkConf,
executorId: String,
appId: String,
driverPod: Option[Pod]): KubernetesConf[KubernetesExecutorSpecificConf] = {
val executorCustomLabels =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_LABEL_PREFIX)
require(
!executorCustomLabels.contains(SPARK_APP_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark."
)
s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
require(
!executorCustomLabels.contains(SPARK_EXECUTOR_ID_LABEL),
s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
" Spark."
)
" Spark.")
require(
!executorCustomLabels.contains(SPARK_ROLE_LABEL),
s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark."
)
s"Custom executor labels cannot contain $SPARK_ROLE_LABEL as it is reserved for Spark.")
val executorLabels = Map(
SPARK_EXECUTOR_ID_LABEL -> executorId,
SPARK_APP_ID_LABEL -> appId,
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE
) ++
SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
executorCustomLabels
val executorAnnotations = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_ANNOTATION_PREFIX
)
val executorMountSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_SECRETS_PREFIX
)
val executorAnnotations =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_ANNOTATION_PREFIX)
val executorMountSecrets =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_SECRETS_PREFIX)
val executorEnvSecrets = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX
)
KUBERNETES_EXECUTOR_SECRET_KEY_REF_PREFIX)
val executorEnv = sparkConf.getExecutorEnv.toMap
val executorVolumes = KubernetesVolumeUtils
.parseVolumesWithPrefix(sparkConf, KUBERNETES_EXECUTOR_VOLUMES_PREFIX)
@@ -296,18 +259,16 @@ private[spark] object KubernetesConf {
// (not the one used by cluster mode inside the container)
val appResourceNamePrefix = {
if (sparkConf
.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key)
.isEmpty) {
.getOption(KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key)
.isEmpty) {
getResourceNamePrefix(getAppName(sparkConf))
} else {
sparkConf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
}
}

val executorTolerationsMap = KubernetesUtils.parsePrefixedKeyValuePairs(
sparkConf,
KUBERNETES_EXECUTOR_TOLERATION_PREFIX
)
val executorTolerationsMap =
KubernetesUtils.parsePrefixedKeyValuePairs(sparkConf, KUBERNETES_EXECUTOR_TOLERATION_PREFIX)

val executorTolerations =
KubernetesUtils.parseTolerations(executorTolerationsMap)
@@ -325,7 +286,6 @@ private[spark] object KubernetesConf {
executorVolumes,
Seq.empty[Toleration],
executorTolerations,
Seq.empty[String]
)
Seq.empty[String])
}
}
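
A note on the helper that appears throughout this file: KubernetesUtils.parsePrefixedKeyValuePairs is not part of this diff, but its assumed behaviour is to collect every SparkConf entry whose key starts with a given prefix and return the key remainders mapped to their values. A minimal sketch of that assumption:

  import org.apache.spark.SparkConf

  // Keep only entries whose key starts with `prefix`, with the prefix stripped off.
  def parsePrefixedKeyValuePairs(sparkConf: SparkConf, prefix: String): Map[String, String] =
    sparkConf.getAllWithPrefix(prefix).toMap

Under that assumption, and assuming KUBERNETES_DRIVER_LABEL_PREFIX resolves to something like "spark.kubernetes.driver.label." (the value is not shown in this diff), setting spark.kubernetes.driver.label.team=etl would surface as "team" -> "etl" in driverCustomLabels above. The new driverTolerations and executorTolerations fields are populated the same way from their toleration prefixes and then converted to fabric8 Toleration objects by KubernetesUtils.parseTolerations.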