Skip to content

Commit

Permalink
Added GC to components (apache#56)
Browse files — browse the repository at this point in the history
  • Loading branch information
foxish authored and ash211 committed Jan 27, 2017
1 parent 422dceb commit c3428f7
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 2 deletions.
2 changes: 1 addition & 1 deletion resource-managers/kubernetes/core/pom.xml
Expand Up @@ -29,7 +29,7 @@
<name>Spark Project Kubernetes</name>
<properties>
<sbt.project.name>kubernetes</sbt.project.name>
<kubernetes.client.version>1.4.17</kubernetes.client.version>
<kubernetes.client.version>1.4.34</kubernetes.client.version>
</properties>

<dependencies>
Expand Down
Expand Up @@ -123,6 +123,8 @@ private[spark] class Client(
.endSpec()
.done()
sparkConf.set("spark.kubernetes.driver.service.name", service.getMetadata.getName)
sparkConf.set("spark.kubernetes.driver.pod.name", kubernetesAppId)

sparkConf.setIfMissing("spark.driver.port", DEFAULT_DRIVER_PORT.toString)
sparkConf.setIfMissing("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT.toString)
val submitRequest = buildSubmissionRequest()
Expand All @@ -131,6 +133,23 @@ private[spark] class Client(

val podWatcher = new Watcher[Pod] {
override def eventReceived(action: Action, t: Pod): Unit = {
if (action == Action.ADDED) {
val ownerRefs = new ArrayBuffer[OwnerReference]
ownerRefs += new OwnerReferenceBuilder()
.withApiVersion(t.getApiVersion)
.withController(true)
.withKind(t.getKind)
.withName(t.getMetadata.getName)
.withUid(t.getMetadata.getUid)
.build()

secret.getMetadata().setOwnerReferences(ownerRefs.asJava)
kubernetesClient.secrets().createOrReplace(secret)

service.getMetadata().setOwnerReferences(ownerRefs.asJava)
kubernetesClient.services().createOrReplace(service)
}

if ((action == Action.ADDED || action == Action.MODIFIED)
&& t.getStatus.getPhase == "Running"
&& !submitCompletedFuture.isDone) {
Expand Down
Expand Up @@ -60,6 +60,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
.getOrElse(
throw new SparkException("Must specify the service name the driver is running with"))

// Name of the driver pod, propagated by the submission client via
// "spark.kubernetes.driver.pod.name"; required so executors can be
// owner-referenced to the driver for garbage collection.
private val kubernetesDriverPodName =
  conf.getOption("spark.kubernetes.driver.pod.name").getOrElse {
    throw new SparkException("Must specify the driver pod name")
  }

// Per-executor memory request; defaults to "1g" when "spark.executor.memory" is unset.
private val executorMemory = conf.getOption("spark.executor.memory").getOrElse("1g")
// The same value parsed into bytes (e.g. "1g" -> 1073741824) for container resource sizing.
private val executorMemoryBytes = Utils.byteStringAsBytes(executorMemory)

Expand All @@ -82,6 +87,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
// Kubernetes API client built from in-pod credentials (service account token),
// scoped to the configured master URL and namespace.
private val kubernetesClient = KubernetesClientBuilder
  .buildFromWithinPod(kubernetesMaster, kubernetesNamespace)

// Looks up this application's driver pod so executor pods can declare it as
// their owner reference (enabling Kubernetes garbage collection of executors
// when the driver pod is deleted). Fails fast with a SparkException if the
// pod cannot be fetched.
val driverPod = try {
  kubernetesClient.pods()
    .inNamespace(kubernetesNamespace)
    .withName(kubernetesDriverPodName)
    .get()
} catch {
  // Only wrap non-fatal errors; let fatal ones (OOM, interruption) propagate
  // unchanged instead of being re-thrown as a SparkException.
  case scala.util.control.NonFatal(e) =>
    logError("Executor cannot find driver pod.", e)
    throw new SparkException("Executor cannot find driver pod", e)
}

override val minRegisteredRatio =
if (conf.getOption("spark.scheduler.minRegisteredResourcesRatio").isEmpty) {
0.8
Expand Down Expand Up @@ -202,7 +216,15 @@ private[spark] class KubernetesClusterSchedulerBackend(
.withNewMetadata()
.withName(name)
.withLabels(selectors)
.endMetadata()
.withOwnerReferences()
.addNewOwnerReference()
.withController(true)
.withApiVersion(driverPod.getApiVersion)
.withKind(driverPod.getKind)
.withName(driverPod.getMetadata.getName)
.withUid(driverPod.getMetadata.getUid)
.endOwnerReference()
.endMetadata()
.withNewSpec()
.addNewContainer()
.withName(s"exec-${applicationId()}-container")
Expand Down

0 comments on commit c3428f7

Please sign in to comment.