Skip to content

Commit

Permalink
Limit number of live applications + add configurability
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Apr 7, 2014
1 parent a3598de commit 248cb3d
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 26 deletions.
8 changes: 6 additions & 2 deletions bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ DEFAULT_MEM=${SPARK_MEM:-512m}

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.akka.logLifecycleEvents=true"

# Add java opts and memory settings for master, worker, executors, and repl.
# Add java opts and memory settings for master, worker, history server, executors, and repl.
case "$1" in
# Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
# Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
'org.apache.spark.deploy.master.Master')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_MASTER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
Expand All @@ -58,6 +58,10 @@ case "$1" in
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_WORKER_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;
'org.apache.spark.deploy.history.HistoryServer')
OUR_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS $SPARK_HISTORY_OPTS"
OUR_JAVA_MEM=${SPARK_DAEMON_MEMORY:-$DEFAULT_MEM}
;;

# Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
'org.apache.spark.executor.CoarseGrainedExecutorBackend')
Expand Down
7 changes: 5 additions & 2 deletions bin/spark-class2.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,17 @@ if "x%OUR_JAVA_MEM%"=="x" set OUR_JAVA_MEM=512m

set SPARK_DAEMON_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% -Dspark.akka.logLifecycleEvents=true

rem Add java opts and memory settings for master, worker, executors, and repl.
rem Master and Worker use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
rem Add java opts and memory settings for master, worker, history server, executors, and repl.
rem Master, Worker, and HistoryServer use SPARK_DAEMON_JAVA_OPTS (and specific opts) + SPARK_DAEMON_MEMORY.
if "%1"=="org.apache.spark.deploy.master.Master" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_MASTER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.worker.Worker" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_WORKER_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%
) else if "%1"=="org.apache.spark.deploy.history.HistoryServer" (
set OUR_JAVA_OPTS=%SPARK_DAEMON_JAVA_OPTS% %SPARK_HISTORY_OPTS%
if not "x%SPARK_DAEMON_MEMORY%"=="x" set OUR_JAVA_MEM=%SPARK_DAEMON_MEMORY%

rem Executors use SPARK_JAVA_OPTS + SPARK_EXECUTOR_MEMORY.
) else if "%1"=="org.apache.spark.executor.CoarseGrainedExecutorBackend" (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,28 +47,36 @@ import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}
* @param baseLogDir The base directory in which event logs are found
* @param requestedPort The requested port to which this server is to be bound
*/
class HistoryServer(val baseLogDir: String, requestedPort: Int)
class HistoryServer(
val baseLogDir: String,
requestedPort: Int,
conf: SparkConf)
extends SparkUIContainer("History Server") with Logging {

import HistoryServer._

private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = requestedPort
private val conf = new SparkConf
private val securityManager = new SecurityManager(conf)
private val indexPage = new IndexPage(this)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheck = -1L

private val handlers = Seq[ServletContextHandler](
createStaticHandler(HistoryServer.STATIC_RESOURCE_DIR, "/static"),
createStaticHandler(STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), securityMgr = securityManager)
)

// A mapping of application ID to its history information, which includes the rendered UI
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()

// A set of recently removed applications that the server should avoid re-rendering
val appIdBlacklist = mutable.HashSet[String]()

/** Bind to the HTTP server behind this web interface */
override def bind() {
try {
Expand Down Expand Up @@ -99,22 +107,28 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
val logStatus = fileSystem.listStatus(new Path(baseLogDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

// Forget about any SparkUIs that can no longer be found
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
appIdToInfo.foreach { case (appId, info) =>
if (!appIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
appIdBlacklist.clear()
}
}
appIdBlacklist.retain(appIds.contains)

// Render SparkUI for any new completed applications
logDirs.foreach { dir =>
val path = dir.getPath.toString
val appId = getAppId(path)
val lastUpdated = getModificationTime(dir)
if (!appIdToInfo.contains(appId)) {
if (!appIdToInfo.contains(appId) && !appIdBlacklist.contains(appId)) {
maybeRenderUI(appId, path, lastUpdated)
}
}

// Remove any outdated SparkUIs
val appIds = logDirs.map { dir => getAppId(dir.getPath.toString) }
appIdToInfo.foreach { case (appId, info) =>
if (!appIds.contains(appId)) {
detachUI(info.ui)
appIdToInfo.remove(appId)
// If the cap is reached, remove the least recently updated application
if (appIdToInfo.size > RETAINED_APPLICATIONS) {
removeOldestApp()
}
}
}
Expand Down Expand Up @@ -144,14 +158,17 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
val success = replayBus.replay()
if (success) {
if (success && appListener.applicationStarted) {
attachUI(ui)
val appName = if (appListener.applicationStarted) appListener.appName else appId
val appName = appListener.appName
ui.setAppName("%s (finished)".format(appName))
val startTime = appListener.startTime
val endTime = appListener.endTime
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
appIdToInfo(appId) = info
} else {
logWarning("Reconstructing application UI was unsuccessful. Either no event logs were" +
"found or the event signaling application start is missing: %s".format(logPath))
}
} else {
logWarning("Skipping incomplete application: %s".format(logPath))
Expand All @@ -168,7 +185,10 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
def getAppId(logPath: String): String = {
  // The application ID is the final "/"-separated component of the log path
  val pathComponents = logPath.split("/")
  pathComponents.last
}

/** Return the address of this server. */
def getAddress = "http://" + publicHost + ":" + boundPort
def getAddress: String = "http://" + publicHost + ":" + boundPort

/**
 * Return the total number of application logs known to this server: those that currently
 * have history information registered plus those that have been blacklisted.
 */
def getTotalApplications: Int = {
  val registered = appIdToInfo.size
  val blacklisted = appIdBlacklist.size
  registered + blacklisted
}

/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
Expand All @@ -180,12 +200,26 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
}
}

/**
 * Remove the least recently updated application and detach its associated UI. As an
 * optimization, add the application to a blacklist to avoid re-rendering it the next time.
 *
 * This is a no-op when no applications are currently tracked.
 */
private def removeOldestApp() {
  // minBy throws UnsupportedOperationException on an empty collection, so guard against
  // being called when nothing is tracked (e.g. a negative retention cap with no applications)
  if (appIdToInfo.nonEmpty) {
    val (oldestId, oldestInfo) = appIdToInfo.minBy { case (_, info) => info.lastUpdated }
    appIdToInfo.remove(oldestId)
    detachUI(oldestInfo.ui)
    // Remember the evicted application so the next log check does not render it again
    appIdBlacklist.add(oldestId)
  }
}

/** Return whether the last log check has happened sufficiently long ago. */
private def logCheckReady: Boolean = {
System.currentTimeMillis - lastLogCheck > HistoryServer.UPDATE_INTERVAL_SECONDS * 1000
System.currentTimeMillis - lastLogCheck > UPDATE_INTERVAL_SECONDS * 1000
}
}


/**
* The recommended way of starting and stopping a HistoryServer is through the scripts
* start-history-server.sh and stop-history-server.sh. The path to a base log directory
Expand All @@ -197,14 +231,19 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
* This launches the HistoryServer as a Spark daemon.
*/
object HistoryServer {
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR
private val conf = new SparkConf

// Minimum interval between each check for logs, which requires a disk access
val UPDATE_INTERVAL_SECONDS = 5
// Minimum interval, in seconds, between checks for logs; each check requires a disk access
private val UPDATE_INTERVAL_SECONDS = conf.getInt("spark.history.updateInterval", 5)

// How many applications to retain
private val RETAINED_APPLICATIONS = conf.getInt("spark.deploy.retainedApplications", 20)

private val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR

def main(argStrings: Array[String]) {
val args = new HistoryServerArguments(argStrings)
val server = new HistoryServer(args.logDir, args.port)
val server = new HistoryServer(args.logDir, args.port, conf)
server.bind()

// Wait until the end of the world... or if the HistoryServer process is manually stopped
Expand All @@ -213,6 +252,7 @@ object HistoryServer {
}
}


private[spark] case class ApplicationHistoryInfo(
name: String,
startTime: Long,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,10 @@ private[spark] class IndexPage(parent: HistoryServer) {
<div class="span12">
<ul class="unstyled">
<li><strong>Event Log Location: </strong> {parent.baseLogDir}</li>
<br></br>
<h4>Finished Applications</h4> {appTable}
<h4>
Showing {parent.appIdToInfo.size}/{parent.getTotalApplications} Finished Applications
</h4>
{appTable}
</ul>
</div>
</div>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -670,6 +670,7 @@ private[spark] class Master(

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
replayBus.start()
val success = replayBus.replay()
if (success) Some(ui) else None
}
Expand Down

0 comments on commit 248cb3d

Please sign in to comment.