SPARK-2645: Fix for SparkContext stop behavior
rekhajoshm committed Jun 25, 2015
1 parent b566b66 commit 380c5b0
Showing 1 changed file with 39 additions and 37 deletions.
core/src/main/scala/org/apache/spark/SparkEnv.scala: 39 additions & 37 deletions
@@ -45,6 +45,8 @@ import org.apache.spark.storage._
 import org.apache.spark.unsafe.memory.{ExecutorMemoryManager, MemoryAllocator}
 import org.apache.spark.util.{RpcUtils, Utils}
 
+import scala.util.control.NonFatal
+
 /**
  * :: DeveloperApi ::
  * Holds all the runtime environment objects for a running Spark instance (either master or worker),
@@ -91,46 +93,46 @@ class SparkEnv (
 
   private[spark] def stop() {
 
-    if(isStopped) return
-
-    isStopped = true
-    try {
-      pythonWorkers.foreach { case (key, worker) => worker.stop()}
-      Option(httpFileServer).foreach(_.stop())
-      mapOutputTracker.stop()
-      shuffleManager.stop()
-      broadcastManager.stop()
-      blockManager.stop()
-      blockManager.master.stop()
-      metricsSystem.stop()
-      outputCommitCoordinator.stop()
-      rpcEnv.shutdown()
-    } catch {
-      case e: Exception =>
-        logInfo("Exception while SparkEnv stop", e)
-    }
+    if(!isStopped) {
+      isStopped = true
+      try {
+        pythonWorkers.foreach { case (key, worker) => worker.stop()}
+        Option(httpFileServer).foreach(_.stop())
+        mapOutputTracker.stop()
+        shuffleManager.stop()
+        broadcastManager.stop()
+        blockManager.stop()
+        blockManager.master.stop()
+        metricsSystem.stop()
+        outputCommitCoordinator.stop()
+        rpcEnv.shutdown()
+      } catch {
+        case NonFatal(e) =>
+          logInfo("Exception while SparkEnv stop", e)
+      }
 
-    // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
-    // down, but let's call it anyway in case it gets fixed in a later release
-    // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
-    // actorSystem.awaitTermination()
-
-    // Note that blockTransferService is stopped by BlockManager since it is started by it.
-
-    // If we only stop sc, but the driver process still run as a services then we need to delete
-    // the tmp dir, if not, it will create too many tmp dirs.
-    // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
-    // current working dir in executor which we do not need to delete.
-    driverTmpDirToDelete match {
-      case Some(path) => {
-        try {
-          Utils.deleteRecursively(new File(path))
-        } catch {
-          case e: Exception =>
-            logWarning(s"Exception while deleting Spark temp dir: $path", e)
-        }
-      }
-      case None => // We just need to delete tmp dir created by driver, so do nothing on executor
-    }
+      // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
+      // down, but let's call it anyway in case it gets fixed in a later release
+      // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
+      // actorSystem.awaitTermination()
+
+      // Note that blockTransferService is stopped by BlockManager since it is started by it.
+
+      // If we only stop sc, but the driver process still run as a services then we need to delete
+      // the tmp dir, if not, it will create too many tmp dirs.
+      // We only need to delete the tmp dir create by driver, because sparkFilesDir is point to the
+      // current working dir in executor which we do not need to delete.
+      driverTmpDirToDelete match {
+        case Some(path) => {
+          try {
+            Utils.deleteRecursively(new File(path))
+          } catch {
+            case e: Exception =>
+              logWarning(s"Exception while deleting Spark temp dir: $path", e)
+          }
+        }
+        case None => // We just need to delete tmp dir created by driver, so do nothing on executor
+      }
+    }
   }
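Beyond the re-indentation, the change is twofold: the body of stop() now runs only when the environment has not already been stopped, and cleanup failures are caught with scala.util.control.NonFatal instead of case e: Exception. NonFatal matches every Throwable except deliberately fatal ones (VirtualMachineError, ThreadDeath, InterruptedException, LinkageError, ControlThrowable), so routine shutdown noise is logged while interrupts and VM errors still propagate. A minimal sketch of that matching behavior, using illustrative names that are not part of SparkEnv:

    import scala.util.control.NonFatal

    object NonFatalDemo {
      // Runs `body`, logging non-fatal failures and rethrowing fatal ones,
      // mirroring the catch clause this commit introduces in SparkEnv.stop().
      def stopQuietly(body: => Unit): Unit =
        try body catch {
          case NonFatal(e) => println(s"Suppressed during stop: $e")
        }

      def main(args: Array[String]): Unit = {
        // An ordinary exception is swallowed and logged, as in stop().
        stopQuietly { throw new IllegalStateException("already stopped") }

        // An interrupt is not matched by NonFatal, so it escapes; the old
        // `case e: Exception` clause would have swallowed it.
        try stopQuietly { throw new InterruptedException("shutdown requested") }
        catch { case e: InterruptedException => println(s"Propagated: $e") }
      }
    }

The old clause also missed Errors entirely, whereas NonFatal treats non-fatal ones such as AssertionError as catchable, so the new clause is at once broader for recoverable failures and narrower for fatal ones.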

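The if(!isStopped) guard is what makes stop() safe to call more than once: a second call finds the flag already set and skips the whole cleanup sequence. Since isStopped is a plain flag, the check and the set are separate steps; as a sketch of the same idempotency idea made race-free under concurrent callers (a hypothetical StoppableService, not code from this commit), an AtomicBoolean can combine them:

    import java.util.concurrent.atomic.AtomicBoolean

    // Hypothetical service sketching an idempotent stop(); SparkEnv itself
    // guards with a plain `isStopped` flag, as in the diff above.
    class StoppableService {
      private val stopped = new AtomicBoolean(false)

      def stop(): Unit = {
        // compareAndSet flips false -> true exactly once, so concurrent or
        // repeated calls after the first become no-ops.
        if (stopped.compareAndSet(false, true)) {
          println("releasing resources")
        }
      }
    }

    object StopDemo {
      def main(args: Array[String]): Unit = {
        val s = new StoppableService
        s.stop() // runs cleanup once
        s.stop() // no-op, like a second SparkEnv.stop() after this commit
      }
    }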
