Skip to content

Commit

Permalink
Minor changes
Browse files Browse the repository at this point in the history
Rename ApplicationListener to ApplicationEventListener, and stop
ReplaySparkListener in Master after use.
  • Loading branch information
andrewor14 committed Mar 24, 2014
1 parent a9eae7e commit 1b2f391
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import org.apache.spark.deploy.SparkUIContainer
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.Utils
import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}
import org.apache.spark.scheduler.{ApplicationEventListener, ReplayListenerBus}

/**
* A web server that renders SparkUIs of finished applications.
Expand Down Expand Up @@ -131,7 +131,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int)
// If the application completion file is found
if (replayBus.isApplicationComplete) {
val ui = new SparkUI(replayBus, appId, "/history/%s".format(appId))
val appListener = new ApplicationListener
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)

// Do not call ui.bind() to avoid creating a new server for each application
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,7 @@ private[spark] class Master(
// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
val success = replayBus.replay()
replayBus.stop()
if (success) Some(ui) else None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ package org.apache.spark.scheduler
* SparkListenerApplicationEnd will be received. Otherwise, only the latest event
* of each type will take effect.
*/
private[spark] class ApplicationListener extends SparkListener {
private[spark] class ApplicationEventListener extends SparkListener {
var appName = "<Not Started>"
var startTime = -1L
var endTime = -1L
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}

/**
* An EventBus that replays logged events from persisted storage.
* A SparkListenerBus that replays logged events from persisted storage.
*
* This class expects files to be appropriately prefixed as specified in EventLoggingListener.
* There exists a one-to-one mapping between ReplayListenerBus and event logging applications.
Expand Down Expand Up @@ -64,10 +64,9 @@ private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus
conf.set("spark.io.compression.codec", codec)
CompressionCodec.createCodec(conf)
}
applicationComplete =
filePaths.exists { file =>
EventLoggingListener.isApplicationCompleteFile(file.getName)
}
applicationComplete = filePaths.exists { file =>
EventLoggingListener.isApplicationCompleteFile(file.getName)
}
started = true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome()
val eventLogDir = sc.eventLogger.map { logger => Some(logger.logDir) }.getOrElse(None)
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, eventLogDir)
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))

client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ private[spark] class SparkUI(
// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener

def setAppName(name: String) = appName = name
def setAppName(name: String) {
appName = name
}

/** Initialize all components of the server */
def start() {
Expand Down

0 comments on commit 1b2f391

Please sign in to comment.