[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
This change adds some new utility code to handle shutdown hooks in
Spark. The main goal is to take advantage of Hadoop 2.x's API for
shutdown hooks, which allows Spark to register a hook that will
run before the one that cleans up HDFS clients, and thus avoids
some races that would cause exceptions to show up and other issues
such as failure to properly close event logs.

Unfortunately, Hadoop 1.x does not have such APIs, so in that case
correctness is still left to chance.
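
For reference, the registration that the new utility code performs through reflection is equivalent to the following direct use of Hadoop 2.x's ShutdownHookManager. This is only an illustrative sketch (the wrapper object and method name are hypothetical); the actual patch uses reflection so it still compiles against Hadoop 1.x, and only the Hadoop API call and the +30 offset come from the change itself:

```scala
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.util.ShutdownHookManager

// Hypothetical helper showing what the reflective install() path does on Hadoop 2.x.
object DirectHadoopRegistration {
  def register(runAllSparkHooks: Runnable): Unit = {
    // In Hadoop's ShutdownHookManager, higher priorities run earlier, so +30 puts
    // Spark's hooks ahead of the FileSystem hook that closes cached HDFS clients.
    ShutdownHookManager.get().addShutdownHook(
      runAllSparkHooks, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
  }
}
```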

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes apache#5560 from vanzin/SPARK-6014 and squashes the following commits:

edfafb1 [Marcelo Vanzin] Better scaladoc.
fcaeedd [Marcelo Vanzin] Merge branch 'master' into SPARK-6014
e7039dc [Marcelo Vanzin] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.
Marcelo Vanzin authored and nemccarthy committed Jun 19, 2015
1 parent 0f2663f commit 0999d00
Showing 9 changed files with 195 additions and 114 deletions.
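
The new registration API lives in `Utils` (full diff below). As a rough usage sketch — the package, object name, and printed messages are hypothetical; only `addShutdownHook`, `removeShutdownHook`, and `DEFAULT_SHUTDOWN_PRIORITY` come from this commit, and the caller must sit under `org.apache.spark` because `Utils` is `private[spark]`:

```scala
package org.apache.spark.example // hypothetical; needed so private[spark] Utils is visible

import org.apache.spark.util.Utils

object ShutdownHookExample {
  def main(args: Array[String]): Unit = {
    // Register a hook at the default priority (100); the returned handle can unregister it.
    val handle = Utils.addShutdownHook { () =>
      println("default-priority hook")
    }

    // Hooks can also carry an explicit priority; higher-priority hooks run first.
    Utils.addShutdownHook(Utils.DEFAULT_SHUTDOWN_PRIORITY + 10,
      () => println("runs before the default-priority hook"))

    // Hooks that are no longer needed can be removed before shutdown.
    Utils.removeShutdownHook(handle)
  }
}
```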
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.SignalLogger
import org.apache.spark.util.{SignalLogger, Utils}

/**
* A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()

Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
override def run(): Unit = server.stop()
})
Utils.addShutdownHook { () => server.stop() }

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
@@ -28,6 +28,7 @@ import com.google.common.io.Files
import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
import org.apache.spark.util.Utils
import org.apache.spark.util.logging.FileAppender

/**
@@ -61,20 +62,15 @@ private[deploy] class ExecutorRunner(

// NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
// make sense to remove this in the future.
private var shutdownHook: Thread = null
private var shutdownHook: AnyRef = null

private[worker] def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
}

/**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
workerThread = null
state = ExecutorState.KILLED
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
Utils.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkConf
}
}

private def addShutdownHook(): Thread = {
val shutdownHook = new Thread("delete Spark local dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
DiskBlockManager.this.doStop()
}
private def addShutdownHook(): AnyRef = {
Utils.addShutdownHook { () =>
logDebug("Shutdown hook called")
DiskBlockManager.this.doStop()
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
shutdownHook
}

/** Cleanup local dirs and stop shuffle sender. */
private[spark] def stop() {
// Remove the shutdown hook. It causes memory leaks if we leave it around.
try {
Runtime.getRuntime.removeShutdownHook(shutdownHook)
} catch {
case e: IllegalStateException => None
}
Utils.removeShutdownHook(shutdownHook)
doStop()
}

@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(

private def addShutdownHook() {
tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
Utils.addShutdownHook { () =>
logDebug("Shutdown hook called")
tachyonDirs.foreach { tachyonDir =>
try {
if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
Utils.deleteRecursively(tachyonDir, client)
}
} catch {
case e: Exception =>
logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
}
client.close()
}
})
client.close()
}
}
}
136 changes: 118 additions & 18 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
import java.lang.management.ManagementFactory
import java.net._
import java.nio.ByteBuffer
import java.util.{Properties, Locale, Random, UUID}
import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
import java.util.concurrent._
import javax.net.ssl.HttpsURLConnection

@@ -30,7 +30,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import scala.util.Try
import scala.util.{Failure, Success, Try}
import scala.util.control.{ControlThrowable, NonFatal}

import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
private[spark] object Utils extends Logging {
val random = new Random()

val DEFAULT_SHUTDOWN_PRIORITY = 100

private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
@volatile private var localRootDirs: Array[String] = null


private val shutdownHooks = new SparkShutdownHookManager()
shutdownHooks.install()

/** Serialize an object using Java serialization */
def serialize[T](o: T): Array[Byte] = {
val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()

// Add a shutdown hook to delete the temp dirs when the JVM exits
Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
override def run(): Unit = Utils.logUncaughtExceptions {
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
addShutdownHook { () =>
logDebug("Shutdown hook called")
shutdownDeletePaths.foreach { dirPath =>
try {
Utils.deleteRecursively(new File(dirPath))
} catch {
case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
}
}
})
}

// Register the path to be deleted via shutdown hook
def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@
}
Utils.setupSecureURLConnection(uc, securityMgr)

val timeoutMs =
conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
uc.setConnectTimeout(timeoutMs)
uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging {
/**
* Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
* default UncaughtExceptionHandler
*
* NOTE: This method is to be called by the spark-started JVM process.
*/
def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@
}

/**
* Execute a block of code that evaluates to Unit, stop SparkContext if there is any uncaught
* exception
*
* NOTE: This method is to be called by the driver-side components to avoid stopping the
* user-started JVM process completely; in contrast, tryOrExit is to be called in the
* spark-started JVM process.
*/
def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,102 @@ private[spark] object Utils extends Logging {
.getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
}

/**
* Adds a shutdown hook with default priority.
*
* @param hook The code to run during shutdown.
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(hook: () => Unit): AnyRef = {
addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
}

/**
* Adds a shutdown hook with the given priority. Hooks with higher priority values run
* first.
*
* @param hook The code to run during shutdown.
* @return A handle that can be used to unregister the shutdown hook.
*/
def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
shutdownHooks.add(priority, hook)
}

/**
* Remove a previously installed shutdown hook.
*
* @param ref A handle returned by `addShutdownHook`.
* @return Whether the hook was removed.
*/
def removeShutdownHook(ref: AnyRef): Boolean = {
shutdownHooks.remove(ref)
}

}

private [util] class SparkShutdownHookManager {

private val hooks = new PriorityQueue[SparkShutdownHook]()
private var shuttingDown = false

/**
* Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
* have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
* the best.
*/
def install(): Unit = {
val hookTask = new Runnable() {
override def run(): Unit = runAll()
}
Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
case Success(shmClass) =>
val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
.asInstanceOf[Int]
val shm = shmClass.getMethod("get").invoke(null)
shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
.invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))

case Failure(_) =>
Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
}
}

def runAll(): Unit = synchronized {
shuttingDown = true
while (!hooks.isEmpty()) {
Utils.logUncaughtExceptions(hooks.poll().run())
}
}

def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
checkState()
val hookRef = new SparkShutdownHook(priority, hook)
hooks.add(hookRef)
hookRef
}

def remove(ref: AnyRef): Boolean = synchronized {
checkState()
hooks.remove(ref)
}

private def checkState(): Unit = {
if (shuttingDown) {
throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
}
}

}

private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
extends Comparable[SparkShutdownHook] {

override def compareTo(other: SparkShutdownHook): Int = {
other.priority - priority
}

def run(): Unit = hook()

}

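A note on ordering: `SparkShutdownHook.compareTo` returns `other.priority - priority`, so in the min-heap `java.util.PriorityQueue` the hook with the highest priority is polled, and therefore run, first. A self-contained re-creation of just that ordering logic (not Spark code; the class and names are illustrative):

```scala
import java.util.PriorityQueue

// Standalone demo of the comparator used by SparkShutdownHook above.
class DemoHook(val priority: Int, val name: String) extends Comparable[DemoHook] {
  override def compareTo(other: DemoHook): Int = other.priority - priority
}

object OrderingDemo {
  def main(args: Array[String]): Unit = {
    val hooks = new PriorityQueue[DemoHook]()
    hooks.add(new DemoHook(100, "default"))
    hooks.add(new DemoHook(130, "high"))
    hooks.add(new DemoHook(50, "low"))
    // Prints: high, default, low — higher priorities are polled (and run) first.
    while (!hooks.isEmpty) println(hooks.poll().name)
  }
}
```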