Skip to content

Commit

Permalink
Wide refactoring of WebUI, UITab, and UIPage (see commit message)
Browse files Browse the repository at this point in the history
The biggest changes include
(1) Decoupling the SparkListener from any member of the hierarchy. This was
    previously arbitrarily tied to the UITab.
(2) Decoupling initializing a UITab from attaching it to a WebUI. This involves
    having each UITab initializing itself instead.
(3) Add an abstract parent for each UITab. This allows us to move the access
    of header tabs of the UI into the UITab abstract class itself.
(4) Abstract bind() logic into WebUI.
(5) Renaming UITab -> WebUITab, and UIPage -> WebUIPage.
  • Loading branch information
andrewor14 committed Apr 11, 2014
1 parent 6de06b0 commit 548c98c
Show file tree
Hide file tree
Showing 32 changed files with 164 additions and 215 deletions.
1 change: 0 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Expand Up @@ -212,7 +212,6 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.start()
ui.bind()

// Optionally log Spark events
Expand Down
Expand Up @@ -17,12 +17,9 @@

package org.apache.spark.deploy.history

import javax.servlet.http.HttpServletRequest

import scala.collection.mutable

import org.apache.hadoop.fs.{FileStatus, Path}
import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler._
Expand All @@ -45,15 +42,15 @@ import org.apache.spark.util.Utils
*/
class HistoryServer(
val baseLogDir: String,
securityManager: SecurityManager,
conf: SparkConf)
extends WebUI(new SecurityManager(conf)) with Logging {
extends WebUI(securityManager, HistoryServer.WEB_UI_PORT, conf) with Logging {

import HistoryServer._

private val fileSystem = Utils.getHadoopFileSystem(baseLogDir)
private val localHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(localHost)
private val port = WEB_UI_PORT

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L
Expand Down Expand Up @@ -90,30 +87,20 @@ class HistoryServer(
// A mapping of application ID to its history information, which includes the rendered UI
val appIdToInfo = mutable.HashMap[String, ApplicationHistoryInfo]()

initialize()

/**
* Start the history server.
* Initialize the history server.
*
* This starts a background thread that periodically synchronizes information displayed on
* this UI with the event logs in the provided base directory.
*/
def start() {
def initialize() {
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(STATIC_RESOURCE_DIR, "/static"))
logCheckingThread.start()
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, conf))
logInfo("Started HistoryServer at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to bind HistoryServer", e)
System.exit(1)
}
}

/**
* Check for any updates to event logs in the base directory. This is only effective once
* the server has been bound.
Expand Down Expand Up @@ -179,12 +166,11 @@ class HistoryServer(
val path = logDir.getPath
val appId = path.getName
val replayBus = new ReplayListenerBus(logInfo.logPaths, fileSystem, logInfo.compressionCodec)
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)
val appListener = new ApplicationEventListener
replayBus.addListener(appListener)
val ui = new SparkUI(conf, replayBus, appId, "/history/" + appId)

// Do not call ui.bind() to avoid creating a new server for each application
ui.start()
replayBus.replay()
if (appListener.applicationStarted) {
attachUI(ui)
Expand Down Expand Up @@ -267,9 +253,9 @@ object HistoryServer {

def main(argStrings: Array[String]) {
val args = new HistoryServerArguments(argStrings)
val server = new HistoryServer(args.logDir, conf)
val securityManager = new SecurityManager(conf)
val server = new HistoryServer(args.logDir, securityManager, conf)
server.bind()
server.start()

// Wait until the end of the world... or if the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
Expand Down
Expand Up @@ -21,9 +21,9 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class IndexPage(parent: HistoryServer) extends UIPage("") {
private[spark] class IndexPage(parent: HistoryServer) extends WebUIPage("") {

override def render(request: HttpServletRequest): Seq[Node] = {
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
Expand Down
Expand Up @@ -118,7 +118,6 @@ private[spark] class Master(
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
Expand Down Expand Up @@ -670,7 +669,6 @@ private[spark] class Master(
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(
new SparkConf, replayBus, appName + " (completed)", "/history/" + app.id)
ui.start()
replayBus.replay()
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
Expand Down
Expand Up @@ -28,11 +28,11 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI)
extends UIPage("app", includeJson = true) {
extends WebUIPage("app", includeJson = true) {

private val master = parent.masterActorRef
private val timeout = parent.timeout
Expand Down
Expand Up @@ -28,10 +28,10 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includeJson = true) {
private val master = parent.masterActorRef
private val timeout = parent.timeout

Expand Down
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{SparkUI, WebUI}
Expand All @@ -30,34 +28,22 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr) with Logging {
extends WebUI(master.securityMgr, requestedPort, master.conf) with Logging {

private val host = Utils.localHostName()
private val port = requestedPort
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

initialize()

/** Initialize all components of the server. */
def start() {
def initialize() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Master web UI", e)
System.exit(1)
}
}

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
Expand Down
Expand Up @@ -130,7 +130,6 @@ private[spark] class Worker(
createWorkDir()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
webUi.bind()
registerWithMaster()

Expand Down
Expand Up @@ -28,10 +28,10 @@ import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) {
private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includeJson = true) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
Expand Down
Expand Up @@ -22,10 +22,10 @@ import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") {
private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
private val worker = parent.worker
private val workDir = parent.workDir

Expand Down
Expand Up @@ -20,26 +20,29 @@ package org.apache.spark.deploy.worker.ui
import java.io.File
import javax.servlet.http.HttpServletRequest

import org.apache.spark.Logging
import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.worker.Worker
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}
import org.apache.spark.util.AkkaUtils

/**
* Web UI server for the standalone worker.
*/
private[spark]
class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
extends WebUI(worker.securityMgr) with Logging {
class WorkerWebUI(
val worker: Worker,
val workDir: File,
port: Option[Int] = None)
extends WebUI(worker.securityMgr, WorkerWebUI.getUIPort(port, worker.conf), worker.conf)
with Logging {

private val host = Utils.localHostName()
private val port = requestedPort.getOrElse(
worker.conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
val timeout = AkkaUtils.askTimeout(worker.conf)

initialize()

/** Initialize all components of the server. */
def start() {
def initialize() {
val logPage = new LogPage(this)
attachPage(logPage)
attachPage(new IndexPage(this))
Expand All @@ -48,21 +51,13 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
(request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
worker.metricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, worker.conf))
logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Worker web UI", e)
System.exit(1)
}
}
}

private[spark] object WorkerWebUI {
val DEFAULT_PORT = 8081
val STATIC_RESOURCE_BASE = SparkUI.STATIC_RESOURCE_DIR

def getUIPort(requestedPort: Option[Int], conf: SparkConf): Int = {
requestedPort.getOrElse(conf.getInt("worker.ui.port", WorkerWebUI.DEFAULT_PORT))
}
}
45 changes: 17 additions & 28 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Expand Up @@ -25,19 +25,19 @@ import org.apache.spark.ui.env.EnvironmentTab
import org.apache.spark.ui.exec.ExecutorsTab
import org.apache.spark.ui.jobs.JobProgressTab
import org.apache.spark.ui.storage.BlockManagerTab
import org.apache.spark.util.Utils

/**
* Top level user interface for Spark.
*/
private[spark] class SparkUI(
val sc: SparkContext,
conf: SparkConf,
val conf: SparkConf,
val securityManager: SecurityManager,
val listenerBus: SparkListenerBus,
var appName: String,
val basePath: String = "")
extends WebUI(securityManager, basePath) with Logging {
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath)
with Logging {

def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
Expand All @@ -46,21 +46,14 @@ private[spark] class SparkUI(
// If SparkContext is not provided, assume the associated application is not live
val live = sc != null

private val bindHost = Utils.localHostName()
private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost)
private val port = conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)

// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener
listenerBus.addListener(storageStatusListener)

/** Set the app name for this UI. */
def setAppName(name: String) {
appName = name
}
initialize()

/** Initialize all components of the server. */
def start() {
def initialize() {
listenerBus.addListener(storageStatusListener)
attachTab(new JobProgressTab(this))
attachTab(new BlockManagerTab(this))
attachTab(new EnvironmentTab(this))
Expand All @@ -72,22 +65,14 @@ private[spark] class SparkUI(
}
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, sc.conf))
logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Spark web UI", e)
System.exit(1)
}
/** Set the app name for this UI. */
def setAppName(name: String) {
appName = name
}

/** Attach a tab to this UI, along with its corresponding listener if it exists. */
override def attachTab(tab: UITab) {
super.attachTab(tab)
tab.listener.foreach(listenerBus.addListener)
/** Register the given listener with the listener bus. */
def registerListener(listener: SparkListener) {
listenerBus.addListener(listener)
}

/** Stop the server behind this web interface. Only valid after bind(). */
Expand All @@ -96,10 +81,14 @@ private[spark] class SparkUI(
logInfo("Stopped Spark web UI at %s".format(appUIAddress))
}

private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort
private[spark] def appUIAddress = "http://" + publicHostName + ":" + boundPort
}

private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"

def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
}
}

0 comments on commit 548c98c

Please sign in to comment.