From 7d57444044296773a27fe4141520291f96b2137f Mon Sep 17 00:00:00 2001
From: Andrew Or
Date: Wed, 2 Apr 2014 16:57:07 -0700
Subject: [PATCH 1/6] Refactoring the UI interface to add flexibility

This commit introduces three abstract classes: WebUI, UITab, and UIPage.
A WebUI sits at the top of the hierarchy and holds a collection of tabs
and standalone pages; each tab in turn holds its own collection of pages.
When a UITab is attached to a WebUI, the WebUI creates a handler for each
of the tab's pages; similarly, when a UIPage is attached directly to a
WebUI, a handler is created for that page. The server in the WebUI is
then ready to be bound to a host and a port.

This commit also breaks a couple of unnecessarily large files into
smaller ones by moving certain classes to their own files.
---
 .../scala/org/apache/spark/SparkContext.scala |   2 +-
 .../apache/spark/deploy/master/Master.scala   |   1 +
 .../deploy/master/ui/ApplicationPage.scala    |  16 +-
 .../spark/deploy/master/ui/IndexPage.scala    |  25 ++-
 .../spark/deploy/master/ui/MasterWebUI.scala  |  50 ++----
 .../apache/spark/deploy/worker/Worker.scala   |   3 +-
 .../spark/deploy/worker/ui/IndexPage.scala    |   8 +-
 .../spark/deploy/worker/ui/LogPage.scala      | 147 ++++++++++++++++
 .../spark/deploy/worker/ui/WorkerWebUI.scala  | 160 ++----------------
 .../scala/org/apache/spark/ui/SparkUI.scala   |  95 ++++-------
 .../scala/org/apache/spark/ui/UIUtils.scala   |  30 +++-
 .../scala/org/apache/spark/ui/WebUI.scala     | 110 +++++++++---
 .../apache/spark/ui/env/EnvironmentTab.scala  |  56 ++++++
 .../{EnvironmentUI.scala => IndexPage.scala}  |  43 +----
 .../apache/spark/ui/exec/ExecutorsTab.scala   |  92 ++++++++++
 .../{ExecutorsUI.scala => IndexPage.scala}    |  90 ++--------
 .../apache/spark/ui/jobs/ExecutorTable.scala  |   7 +-
 .../org/apache/spark/ui/jobs/IndexPage.scala  |  12 +-
 ...bProgressUI.scala => JobProgressTab.scala} |  37 ++--
 .../org/apache/spark/ui/jobs/PoolPage.scala   |   8 +-
 .../org/apache/spark/ui/jobs/PoolTable.scala  |   7 +-
 .../org/apache/spark/ui/jobs/StagePage.scala  |  34 ++--
 .../org/apache/spark/ui/jobs/StageTable.scala |   8 +-
 ...kManagerUI.scala => BlockManagerTab.scala} |  27 +--
 .../apache/spark/ui/storage/IndexPage.scala   |   8 +-
 .../org/apache/spark/ui/storage/RDDPage.scala |   8 +-
 26 files changed, 583 insertions(+), 501 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
 create mode 100644 core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
 rename core/src/main/scala/org/apache/spark/ui/env/{EnvironmentUI.scala => IndexPage.scala} (62%)
 create mode 100644 core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
 rename core/src/main/scala/org/apache/spark/ui/exec/{ExecutorsUI.scala => IndexPage.scala} (60%)
 rename core/src/main/scala/org/apache/spark/ui/jobs/{JobProgressUI.scala => JobProgressTab.scala} (50%)
 rename core/src/main/scala/org/apache/spark/ui/storage/{BlockManagerUI.scala => BlockManagerTab.scala} (78%)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index b23accbbb9410..495e9e485193c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -158,8 +158,8 @@ class SparkContext(
 
   // Initialize the Spark UI, registering all associated listeners
   private[spark] val ui = new SparkUI(this)
-  ui.bind()
   ui.start()
+  ui.bind()
 
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
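To make the new hierarchy concrete before the per-file changes below, here is a minimal sketch of how the three classes compose, using only the signatures this patch adds in WebUI.scala. HelloPage, DemoTab, and DemoUI are hypothetical names for illustration, not part of the patch:

    import javax.servlet.http.HttpServletRequest
    import scala.xml.Node
    import org.apache.spark.{SecurityManager, SparkConf}
    import org.apache.spark.ui.{UIPage, UITab, WebUI}
    import org.apache.spark.ui.JettyUtils._

    // A leaf page. With includeJson = true, the parent WebUI serves render()
    // at the page's prefix and renderJson() at <prefix>/json.
    class HelloPage extends UIPage("hello", includeJson = true) {
      override def render(request: HttpServletRequest): Seq[Node] = <h1>Hello</h1>
    }

    // A tab groups pages under a common prefix; UITab.attachPage rewrites the
    // page's prefix, so this page ends up served at /demo/hello.
    class DemoTab extends UITab("demo") {
      def start() {
        attachPage(new HelloPage)
      }
    }

    // A concrete WebUI attaches tabs and standalone pages, then binds the server.
    class DemoUI(conf: SparkConf, securityManager: SecurityManager)
      extends WebUI(securityManager) {

      def start() {
        attachTab(new DemoTab)     // creates one handler per page in the tab
        attachPage(new HelloPage)  // or attach a page directly, served at /hello
      }

      def bind() {
        serverInfo = Some(startJettyServer("0.0.0.0", 8080, handlers, conf))
      }
    }
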
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 95bd62e88db2b..4c12ab192079f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -85,6 +85,7 @@ private[spark] class Master(
   val masterSource = new MasterSource(this)
 
   val webUi = new MasterWebUI(this, webUiPort)
+  webUi.start()
 
   val masterPublicAddress = {
     val envVar = System.getenv("SPARK_PUBLIC_DNS")
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index cb092cb5d576b..24282048b842e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -28,15 +28,17 @@ import org.json4s.JValue
 import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.ExecutorInfo
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class ApplicationPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class ApplicationPage(parent: MasterWebUI)
+  extends UIPage("app", includeJson = true) {
+
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
   /** Executor details for a particular application */
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
@@ -47,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
   }
 
   /** Executor details for a particular application */
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     val appId = request.getParameter("appId")
     val stateFuture = (master ?
RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
@@ -96,7 +98,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
   }
 
-  def executorRow(executor: ExecutorInfo): Seq[Node] = {
+  private def executorRow(executor: ExecutorInfo): Seq[Node] = {
     <tr>
       <td>{executor.id}</td>
       <td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 8c1d6c7cce450..f011c830a02da 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -25,24 +25,24 @@ import scala.xml.Node
 import akka.pattern.ask
 import org.json4s.JValue
 
-import org.apache.spark.deploy.{JsonProtocol}
+import org.apache.spark.deploy.JsonProtocol
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
 import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{UIPage, UIUtils}
 import org.apache.spark.util.Utils
 
-private[spark] class IndexPage(parent: MasterWebUI) {
-  val master = parent.masterActorRef
-  val timeout = parent.timeout
+private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
+  private val master = parent.masterActorRef
+  private val timeout = parent.timeout
 
-  def renderJson(request: HttpServletRequest): JValue = {
+  override def renderJson(request: HttpServletRequest): JValue = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
     JsonProtocol.writeMasterState(state)
   }
 
   /** Index view listing applications and executors */
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
     val state = Await.result(stateFuture, timeout)
@@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
   }
 
-  def workerRow(worker: WorkerInfo): Seq[Node] = {
+  private def workerRow(worker: WorkerInfo): Seq[Node] = {
     <tr>
       <td>
         <a href={worker.webUiAddress}>{worker.id}</a>
@@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
     </tr>
   }
 
-
-  def appRow(app: ApplicationInfo): Seq[Node] = {
+  private def appRow(app: ApplicationInfo): Seq[Node] = {
     <tr>
       <td>
         <a href={"app?appId=" + app.id}>{app.id}</a>
@@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
         {Utils.megabytesToString(app.desc.memoryPerSlave)}
       </td>
-      <td>{WebUI.formatDate(app.submitDate)}</td>
+      <td>{UIUtils.formatDate(app.submitDate)}</td>
       <td>{app.desc.user}</td>
       <td>{app.state.toString}</td>
-      <td>{WebUI.formatDuration(app.duration)}</td>
+      <td>{UIUtils.formatDuration(app.duration)}</td>
     </tr>
   }
 
-  def driverRow(driver: DriverInfo): Seq[Node] = {
+  private def driverRow(driver: DriverInfo): Seq[Node] = {
     <tr>
       <td>{driver.id}</td>
       <td>{driver.submitDate}</td>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
index bd75b2dfd0e07..fbcc76b3cc150 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala
@@ -17,13 +17,9 @@
 
 package org.apache.spark.deploy.master.ui
 
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.ui.{ServerInfo, SparkUI}
+import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -31,50 +27,39 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  * Web UI server for the standalone master.
*/ private[spark] -class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { - val masterActorRef = master.self - val timeout = AkkaUtils.askTimeout(master.conf) +class MasterWebUI(val master: Master, requestedPort: Int) + extends WebUI(master.securityMgr) with Logging { private val host = Utils.localHostName() private val port = requestedPort - private val applicationPage = new ApplicationPage(this) - private val indexPage = new IndexPage(this) - private var serverInfo: Option[ServerInfo] = None + val masterActorRef = master.self + val timeout = AkkaUtils.askTimeout(master.conf) - private val handlers: Seq[ServletContextHandler] = { - master.masterMetricsSystem.getServletHandlers ++ - master.applicationMetricsSystem.getServletHandlers ++ - Seq[ServletContextHandler]( - createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"), - createServletHandler("/app/json", - (request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr), - createServletHandler("/app", - (request: HttpServletRequest) => applicationPage.render(request), master.securityMgr), - createServletHandler("/json", - (request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr), - createServletHandler("/", - (request: HttpServletRequest) => indexPage.render(request), master.securityMgr) - ) + def start() { + attachPage(new ApplicationPage(this)) + attachPage(new IndexPage(this)) + attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static")) + master.masterMetricsSystem.getServletHandlers.foreach(attachHandler) + master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler) } + /** Bind to the HTTP server behind this web interface. */ def bind() { try { serverInfo = Some(startJettyServer(host, port, handlers, master.conf)) logInfo("Started Master web UI at http://%s:%d".format(host, boundPort)) } catch { case e: Exception => - logError("Failed to create Master JettyUtils", e) + logError("Failed to create Master web UI", e) System.exit(1) } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - /** Attach a reconstructed UI to this Master UI. Only valid after bind(). 
*/ def attachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { + for (handler <- ui.getHandlers) { rootHandler.addHandler(handler) if (!handler.isStarted) { handler.start() @@ -86,18 +71,13 @@ class MasterWebUI(val master: Master, requestedPort: Int) extends Logging { def detachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.handlers) { + for (handler <- ui.getHandlers) { if (handler.isStarted) { handler.stop() } rootHandler.removeHandler(handler) } } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!") - serverInfo.get.server.stop() - } } private[spark] object MasterWebUI { diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 8a71ddda4cb5e..97b5a37f1439c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -123,8 +123,9 @@ private[spark] class Worker( logInfo("Spark home: " + sparkHome) createWorkDir() webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) - context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) + webUi.start() webUi.bind() + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) registerWithMaster() metricsSystem.registerSource(workerSource) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala index 85200ab0e102d..bf7d552101484 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala @@ -28,21 +28,21 @@ import org.apache.spark.deploy.JsonProtocol import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse} import org.apache.spark.deploy.master.DriverState import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner} -import org.apache.spark.ui.UIUtils +import org.apache.spark.ui.{UIPage, UIUtils} import org.apache.spark.util.Utils -private[spark] class IndexPage(parent: WorkerWebUI) { +private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) { val workerActor = parent.worker.self val worker = parent.worker val timeout = parent.timeout - def renderJson(request: HttpServletRequest): JValue = { + override def renderJson(request: HttpServletRequest): JValue = { val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) JsonProtocol.writeWorkerState(workerState) } - def render(request: HttpServletRequest): Seq[Node] = { + override def render(request: HttpServletRequest): Seq[Node] = { val stateFuture = (workerActor ? 
RequestWorkerState)(timeout).mapTo[WorkerStateResponse] val workerState = Await.result(stateFuture, timeout) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala new file mode 100644 index 0000000000000..f57900c99ce3d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.worker.ui + +import java.io.File +import javax.servlet.http.HttpServletRequest + +import scala.xml.Node + +import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.util.Utils + +private[spark] class LogPage(parent: WorkerWebUI) extends UIPage("logPage") { + private val worker = parent.worker + private val workDir = parent.workDir + + def renderLog(request: HttpServletRequest): String = { + val defaultBytes = 100 * 1024 + + val appId = Option(request.getParameter("appId")) + val executorId = Option(request.getParameter("executorId")) + val driverId = Option(request.getParameter("driverId")) + val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) + + val path = (appId, executorId, driverId) match { + case (Some(a), Some(e), None) => + s"${workDir.getPath}/$appId/$executorId/$logType" + case (None, None, Some(d)) => + s"${workDir.getPath}/$driverId/$logType" + case _ => + throw new Exception("Request must specify either application or driver identifiers") + } + + val (startByte, endByte) = getByteRange(path, offset, byteLength) + val file = new File(path) + val logLength = file.length + + val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n" + pre + Utils.offsetBytes(path, startByte, endByte) + } + + override def render(request: HttpServletRequest): Seq[Node] = { + val defaultBytes = 100 * 1024 + val appId = Option(request.getParameter("appId")) + val executorId = Option(request.getParameter("executorId")) + val driverId = Option(request.getParameter("driverId")) + val logType = request.getParameter("logType") + val offset = Option(request.getParameter("offset")).map(_.toLong) + val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes) + + val (path, params) = (appId, executorId, driverId) match { + case (Some(a), Some(e), None) => + (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e") + case (None, None, Some(d)) => + (s"${workDir.getPath}/$d/$logType", s"driverId=$d") + case _ => + throw new Exception("Request must specify either application or driver identifiers") + } + + val (startByte, endByte) = 
getByteRange(path, offset, byteLength)
+    val file = new File(path)
+    val logLength = file.length
+    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
+    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
+    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
+
+    val backButton =
+      if (startByte > 0) {
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
+          <button type="button" class="btn btn-default">
+            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Previous 0 B
+        </button>
+      }
+
+    val nextButton =
+      if (endByte < logLength) {
+        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
+          .format(params, logType, endByte, byteLength)}>
+          <button type="button" class="btn btn-default">
+            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
+          </button>
+        </a>
+      }
+      else {
+        <button type="button" class="btn btn-default" disabled="disabled">
+          Next 0 B
+        </button>
+      }
+
+    val content =
+      <html>
+        <body>
+          {linkToMaster}
+          <div>
+            <div style="float:left; margin-right:10px">{backButton}</div>
+            <div style="float:left;">{range}</div>
+            <div style="float:right; margin-left:10px">{nextButton}</div>
+          </div>
+          <br />
+          <div style="height:500px; overflow:auto; padding:5px;">
+            <pre>{logText}</pre>
+          </div>
+        </body>
+      </html>
+    UIUtils.basicSparkPage(content, logType + " log page for " + appId)
+  }
+
+  /** Determine the byte range for a log or log page. */
+  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
+    val defaultBytes = 100 * 1024
+    val maxBytes = 1024 * 1024
+    val file = new File(path)
+    val logLength = file.length()
+    val getOffset = offset.getOrElse(logLength - defaultBytes)
+    val startByte =
+      if (getOffset < 0) 0L
+      else if (getOffset > logLength) logLength
+      else getOffset
+    val logPageLength = math.min(byteLength, maxBytes)
+    val endByte = math.min(startByte + logPageLength, logLength)
+    (startByte, endByte)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index de76a5d5eb7bc..e0a60121cb65a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -20,11 +20,9 @@ package org.apache.spark.deploy.worker.ui
 import java.io.File
 import javax.servlet.http.HttpServletRequest
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
 import org.apache.spark.Logging
 import org.apache.spark.deploy.worker.Worker
-import org.apache.spark.ui.{JettyUtils, ServerInfo, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, WebUI}
 import org.apache.spark.ui.JettyUtils._
 import org.apache.spark.util.{AkkaUtils, Utils}
 
@@ -33,164 +31,34 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 private[spark]
 class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[Int] = None)
-  extends Logging {
-
-  val timeout = AkkaUtils.askTimeout(worker.conf)
+  extends WebUI(worker.securityMgr) with Logging {
 
   private val host = Utils.localHostName()
   private val port = requestedPort.getOrElse(
     worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt)
-  private val indexPage = new IndexPage(this)
-  private var serverInfo: Option[ServerInfo] = None
+  val timeout = AkkaUtils.askTimeout(worker.conf)
 
-  private val handlers: Seq[ServletContextHandler] = {
-    worker.metricsSystem.getServletHandlers ++
-    Seq[ServletContextHandler](
-      createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"),
-      createServletHandler("/log",
-        (request: HttpServletRequest) => log(request), worker.securityMgr),
-      createServletHandler("/logPage",
-        (request: HttpServletRequest) => logPage(request), worker.securityMgr),
-      createServletHandler("/json",
-        (request: HttpServletRequest) => indexPage.renderJson(request), worker.securityMgr),
-      createServletHandler("/",
-        (request: HttpServletRequest) => indexPage.render(request), worker.securityMgr)
-    )
+  def start() {
+    val logPage = new LogPage(this)
+    attachPage(logPage)
+    attachPage(new IndexPage(this))
+    attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
+    attachHandler(createServletHandler("/log",
+      (request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
+    worker.metricsSystem.getServletHandlers.foreach(attachHandler)
   }
 
+  /** Bind to the HTTP server behind this web interface. */
   def bind() {
     try {
-      serverInfo = Some(JettyUtils.startJettyServer(host, port, handlers, worker.conf))
+      serverInfo = Some(startJettyServer(host, port, handlers, worker.conf))
       logInfo("Started Worker web UI at http://%s:%d".format(host, boundPort))
     } catch {
       case e: Exception =>
-        logError("Failed to create Worker JettyUtils", e)
+        logError("Failed to create Worker web UI", e)
         System.exit(1)
     }
   }
-
-  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
-
-  private def log(request: HttpServletRequest): String = {
-    val defaultBytes = 100 * 1024
-
-    val appId = Option(request.getParameter("appId"))
-    val executorId = Option(request.getParameter("executorId"))
-    val driverId = Option(request.getParameter("driverId"))
-    val logType = request.getParameter("logType")
-    val offset = Option(request.getParameter("offset")).map(_.toLong)
-    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-
-    val path = (appId, executorId, driverId) match {
-      case (Some(a), Some(e), None) =>
-        s"${workDir.getPath}/$appId/$executorId/$logType"
-      case (None, None, Some(d)) =>
-        s"${workDir.getPath}/$driverId/$logType"
-      case _ =>
-        throw new Exception("Request must specify either application or driver identifiers")
-    }
-
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-
-    val pre = s"==== Bytes $startByte-$endByte of $logLength of $path ====\n"
-    pre + Utils.offsetBytes(path, startByte, endByte)
-  }
-
-  private def logPage(request: HttpServletRequest): Seq[scala.xml.Node] = {
-    val defaultBytes = 100 * 1024
-    val appId = Option(request.getParameter("appId"))
-    val executorId = Option(request.getParameter("executorId"))
-    val driverId = Option(request.getParameter("driverId"))
-    val logType = request.getParameter("logType")
-    val offset = Option(request.getParameter("offset")).map(_.toLong)
-    val byteLength = Option(request.getParameter("byteLength")).map(_.toInt).getOrElse(defaultBytes)
-
-    val (path, params) = (appId, executorId, driverId) match {
-      case (Some(a), Some(e), None) =>
-        (s"${workDir.getPath}/$a/$e/$logType", s"appId=$a&executorId=$e")
-      case (None, None, Some(d)) =>
-        (s"${workDir.getPath}/$d/$logType", s"driverId=$d")
-      case _ =>
-        throw new Exception("Request must specify either application or driver identifiers")
-    }
-
-    val (startByte, endByte) = getByteRange(path, offset, byteLength)
-    val file = new File(path)
-    val logLength = file.length
-    val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
-    val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
-    val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
-
-    val backButton =
-      if (startByte > 0) {
-        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
-          .format(params, logType, math.max(startByte - byteLength, 0), byteLength)}>
-          <button type="button" class="btn btn-default">
-            Previous {Utils.bytesToString(math.min(byteLength, startByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Previous 0 B
-        </button>
-      }
-
-    val nextButton =
-      if (endByte < logLength) {
-        <a href={"?%s&logType=%s&offset=%s&byteLength=%s"
-          .format(params, logType, endByte, byteLength)}>
-          <button type="button" class="btn btn-default">
-            Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
-          </button>
-        </a>
-      }
-      else {
-        <button type="button" class="btn btn-default" disabled="disabled">
-          Next 0 B
-        </button>
-      }
-
-    val content =
-      <html>
-        <body>
-          {linkToMaster}
-          <div>
-            <div style="float:left; margin-right:10px">{backButton}</div>
-            <div style="float:left;">{range}</div>
-            <div style="float:right; margin-left:10px">{nextButton}</div>
-          </div>
-          <br />
-          <div style="height:500px; overflow:auto; padding:5px;">
-            <pre>{logText}</pre>
-          </div>
-        </body>
-      </html>
-    UIUtils.basicSparkPage(content, logType + " log page for " + appId)
-  }
-
-  /** Determine the byte range for a log or log page. */
-  private def getByteRange(path: String, offset: Option[Long], byteLength: Int): (Long, Long) = {
-    val defaultBytes = 100 * 1024
-    val maxBytes = 1024 * 1024
-    val file = new File(path)
-    val logLength = file.length()
-    val getOffset = offset.getOrElse(logLength - defaultBytes)
-    val startByte =
-      if (getOffset < 0) 0L
-      else if (getOffset > logLength) logLength
-      else getOffset
-    val logPageLength = math.min(byteLength, maxBytes)
-    val endByte = math.min(startByte + logPageLength, logLength)
-    (startByte, endByte)
-  }
-
-  def stop() {
-    assert(serverInfo.isDefined, "Attempted to stop a Worker UI that was not bound to a server!")
-    serverInfo.get.server.stop()
-  }
 }
 
 private[spark] object WorkerWebUI {
diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
index ef1ad872c8ef7..d8ea1b13362e3 100644
--- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala
@@ -17,103 +17,78 @@
 
 package org.apache.spark.ui
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.StorageStatusListener
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.env.EnvironmentUI
-import org.apache.spark.ui.exec.ExecutorsUI
-import org.apache.spark.ui.jobs.JobProgressUI
-import org.apache.spark.ui.storage.BlockManagerUI
+import org.apache.spark.ui.env.EnvironmentTab
+import org.apache.spark.ui.exec.ExecutorsTab
+import org.apache.spark.ui.jobs.JobProgressTab
+import org.apache.spark.ui.storage.BlockManagerTab
 import org.apache.spark.util.Utils
 
-/** Top level user interface for Spark */
+/**
+ * Top level user interface for Spark.
+ */ private[spark] class SparkUI( val sc: SparkContext, conf: SparkConf, + val securityManager: SecurityManager, val listenerBus: SparkListenerBus, val appName: String, val basePath: String = "") - extends Logging { + extends WebUI(securityManager, basePath) with Logging { - def this(sc: SparkContext) = this(sc, sc.conf, sc.listenerBus, sc.appName) + def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName) def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) = - this(null, conf, listenerBus, appName, basePath) + this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath) // If SparkContext is not provided, assume the associated application is not live val live = sc != null - val securityManager = if (live) sc.env.securityManager else new SecurityManager(conf) - private val bindHost = Utils.localHostName() private val publicHost = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(bindHost) private val port = conf.get("spark.ui.port", SparkUI.DEFAULT_PORT).toInt - private var serverInfo: Option[ServerInfo] = None - private val storage = new BlockManagerUI(this) - private val jobs = new JobProgressUI(this) - private val env = new EnvironmentUI(this) - private val exec = new ExecutorsUI(this) + // Maintain executor storage status through Spark events + val storageStatusListener = new StorageStatusListener - val handlers: Seq[ServletContextHandler] = { - val metricsServletHandlers = if (live) { - SparkEnv.get.metricsSystem.getServletHandlers - } else { - Array[ServletContextHandler]() + /** Initialize all components of the server. Must be called before bind(). */ + def start() { + attachTab(new BlockManagerTab(this)) + attachTab(new JobProgressTab(this)) + attachTab(new EnvironmentTab(this)) + attachTab(new ExecutorsTab(this)) + attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) + attachHandler(createRedirectHandler("/", "/stages", basePath)) + if (live) { + sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } - storage.getHandlers ++ - jobs.getHandlers ++ - env.getHandlers ++ - exec.getHandlers ++ - metricsServletHandlers ++ - Seq[ServletContextHandler] ( - createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static"), - createRedirectHandler("/", "/stages", basePath) - ) + // Storage status listener must receive events first, as other listeners depend on its state + listenerBus.addListener(storageStatusListener) + getListeners.foreach(listenerBus.addListener) } - // Maintain executor storage status through Spark events - val storageStatusListener = new StorageStatusListener - - /** Bind the HTTP server which backs this web interface */ + /** Bind to the HTTP server behind this web interface. 
*/ def bind() { + assert(!handlers.isEmpty, "SparkUI has not started yet!") try { serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) - logInfo("Started Spark Web UI at http://%s:%d".format(publicHost, boundPort)) + logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort)) } catch { case e: Exception => - logError("Failed to create Spark JettyUtils", e) + logError("Failed to create Spark web UI", e) System.exit(1) } } - def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1) - - /** Initialize all components of the server */ - def start() { - storage.start() - jobs.start() - env.start() - exec.start() - - // Storage status listener must receive events first, as other listeners depend on its state - listenerBus.addListener(storageStatusListener) - listenerBus.addListener(storage.listener) - listenerBus.addListener(jobs.listener) - listenerBus.addListener(env.listener) - listenerBus.addListener(exec.listener) - } - - def stop() { - assert(serverInfo.isDefined, "Attempted to stop a SparkUI that was not bound to a server!") - serverInfo.get.server.stop() - logInfo("Stopped Spark Web UI at %s".format(appUIAddress)) + /** Stop the server behind this web interface. Only valid after bind(). */ + override def stop() { + super.stop() + logInfo("Stopped Spark web UI at %s".format(appUIAddress)) } private[spark] def appUIAddress = "http://" + publicHost + ":" + boundPort - } private[spark] object SparkUI { diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index a487924effbff..de4216849dc7d 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -17,6 +17,9 @@ package org.apache.spark.ui +import java.text.SimpleDateFormat +import java.util.Date + import scala.xml.Node /** Utility functions for generating XML pages with spark content. */ @@ -24,9 +27,32 @@ private[spark] object UIUtils { import Page._ + // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. + private val dateFormat = new ThreadLocal[SimpleDateFormat]() { + override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + } + + def formatDate(date: Date): String = dateFormat.get.format(date) + + def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp)) + + def formatDuration(milliseconds: Long): String = { + val seconds = milliseconds.toDouble / 1000 + if (seconds < 60) { + return "%.0f s".format(seconds) + } + val minutes = seconds / 60 + if (minutes < 10) { + return "%.1f min".format(minutes) + } else if (minutes < 60) { + return "%.0f min".format(minutes) + } + val hours = minutes / 60 + "%.1f h".format(hours) + } + // Yarn has to go through a proxy so the base uri is provided and has to be on all links - private[spark] val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")). 
-    getOrElse("")
+  val uiRoot : String = Option(System.getenv("APPLICATION_WEB_PROXY_BASE")).getOrElse("")
 
   def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
 
diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
index a7b872f3445a4..6f7385086b534 100644
--- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala
@@ -17,34 +17,100 @@
 
 package org.apache.spark.ui
 
-import java.text.SimpleDateFormat
-import java.util.Date
+import javax.servlet.http.HttpServletRequest
+
+import scala.collection.mutable.ArrayBuffer
+import scala.xml.Node
+
+import org.eclipse.jetty.servlet.ServletContextHandler
+import org.json4s.JsonAST.{JNothing, JValue}
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.scheduler.SparkListener
+import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.util.Utils
 
 /**
- * Utilities used throughout the web UI.
+ * The top level component of the UI hierarchy that contains the server.
+ *
+ * Each WebUI represents a collection of tabs, each of which in turn represents a collection of
+ * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly.
+ * All tabs and pages must be attached before the server binds.
  */
-private[spark] object WebUI {
-  // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
-  private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
-    override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")
+private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") {
+  protected val tabs = ArrayBuffer[UITab]()
+  protected val handlers = ArrayBuffer[ServletContextHandler]()
+  protected var serverInfo: Option[ServerInfo] = None
+
+  /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */
+  def attachTab(tab: UITab) {
+    tab.start()
+    tab.pages.foreach(attachPage)
+    tabs += tab
   }
 
-  def formatDate(date: Date): String = dateFormat.get.format(date)
+  /** Attach a page to this UI. Only valid before bind(). */
+  def attachPage(page: UIPage) {
+    val pagePath = "/" + page.prefix
+    attachHandler(createServletHandler(pagePath,
+      (request: HttpServletRequest) => page.render(request), securityManager, basePath))
+    if (page.includeJson) {
+      attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
+        (request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
+    }
+  }
 
-  def formatDate(timestamp: Long): String = dateFormat.get.format(new Date(timestamp))
+  /** Attach a handler to this UI. Only valid before bind(). */
+  def attachHandler(handler: ServletContextHandler) {
+    handlers += handler
+  }
 
-  def formatDuration(milliseconds: Long): String = {
-    val seconds = milliseconds.toDouble / 1000
-    if (seconds < 60) {
-      return "%.0f s".format(seconds)
-    }
-    val minutes = seconds / 60
-    if (minutes < 10) {
-      return "%.1f min".format(minutes)
-    } else if (minutes < 60) {
-      return "%.0f min".format(minutes)
-    }
-    val hours = minutes / 60
-    return "%.1f h".format(hours)
+  /** Return a list of listeners attached to this UI. */
+  def getListeners = tabs.flatMap(_.listener)
+
+  /** Return a list of handlers attached to this UI. */
+  def getHandlers = handlers.toSeq
+
+  /**
+   * Bind to the HTTP server behind this web interface.
+   * Overridden implementation should set serverInfo.
+   */
+  def bind()
+
+  /** Return the actual port to which this server is bound. Only valid after bind(). */
+  def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)
+
+  /** Stop the server behind this web interface. Only valid after bind(). */
+  def stop() {
+    assert(serverInfo.isDefined,
+      "Attempted to stop %s before binding to a server!".format(Utils.getFormattedClassName(this)))
+    serverInfo.get.server.stop()
+  }
 }
+
+/**
+ * A tab that represents a collection of pages and a unit of listening for Spark events.
+ * Associating each tab with a listener is arbitrary and need not be the case.
+ */
+private[spark] abstract class UITab(val prefix: String) {
+  val pages = ArrayBuffer[UIPage]()
+  var listener: Option[SparkListener] = None
+
+  /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */
+  def attachPage(page: UIPage) {
+    page.prefix = (prefix + "/" + page.prefix).stripSuffix("/")
+    pages += page
+  }
+
+  def start()
+}
+
+/**
+ * A page that represents the leaf node in the UI hierarchy.
+ *
+ * If includeJson is true, the parent WebUI (direct or indirect) creates handlers for both the
+ * HTML and the JSON content, rather than just the former.
+ */
+private[spark] abstract class UIPage(var prefix: String, val includeJson: Boolean = false) {
+  def render(request: HttpServletRequest): Seq[Node] = Seq[Node]()
+  def renderJson(request: HttpServletRequest): JValue = JNothing
+}
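As a quick illustration of the servlet paths attachPage generates (a standalone sketch, not part of the patch; PagePathDemo is a made-up name — compare ApplicationPage("app", includeJson = true) in MasterWebUI, which is served at /app and /app/json):

    object PagePathDemo {
      // Mirrors the path construction in WebUI.attachPage above.
      def pagePaths(prefix: String, includeJson: Boolean): Seq[String] = {
        val pagePath = "/" + prefix
        if (includeJson) Seq(pagePath, pagePath.stripSuffix("/") + "/json") else Seq(pagePath)
      }

      def main(args: Array[String]) {
        println(pagePaths("", includeJson = true))             // List(/, /json)
        println(pagePaths("app", includeJson = true))          // List(/app, /app/json)
        // A UIPage("pool") attached via UITab("stages") is first rewritten to "stages/pool":
        println(pagePaths("stages/pool", includeJson = false)) // List(/stages/pool)
      }
    }
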
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
new file mode 100644
index 0000000000000..dd4ea2a2332a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.env
+
+import org.apache.spark.scheduler._
+import org.apache.spark.ui._
+
+private[ui] class EnvironmentTab(parent: SparkUI) extends UITab("environment") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+
+  def start() {
+    listener = Some(new EnvironmentListener)
+    attachPage(new IndexPage(this))
+  }
+
+  def environmentListener = {
+    assert(listener.isDefined, "EnvironmentTab has not started yet!")
+    listener.get.asInstanceOf[EnvironmentListener]
+  }
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the EnvironmentTab
+ */
+private[ui] class EnvironmentListener extends SparkListener {
+  var jvmInformation = Seq[(String, String)]()
+  var sparkProperties = Seq[(String, String)]()
+  var systemProperties = Seq[(String, String)]()
+  var classpathEntries = Seq[(String, String)]()
+
+  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
+    synchronized {
+      val environmentDetails = environmentUpdate.environmentDetails
+      jvmInformation = environmentDetails("JVM Information")
+      sparkProperties = environmentDetails("Spark Properties")
+      systemProperties = environmentDetails("System Properties")
+      classpathEntries = environmentDetails("Classpath Entries")
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
similarity index 62%
rename from core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
rename to core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
index 23e90c34d5b33..bf1872f18d54e 100644
--- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala
@@ -21,30 +21,15 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.scheduler._
-import org.apache.spark.ui._
-import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.{UIUtils, UIPage}
 import org.apache.spark.ui.Page.Environment
 
-private[ui] class EnvironmentUI(parent: SparkUI) {
+private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") {
   private val appName = parent.appName
   private val basePath = parent.basePath
-  private var _listener: Option[EnvironmentListener] = None
-
-  lazy val listener = _listener.get
-
-  def start() {
-    _listener = Some(new EnvironmentListener)
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/environment",
-      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
-  )
+  private val listener = parent.environmentListener
 
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     val runtimeInformationTable = UIUtils.listingTable(
       propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
     val sparkPropertiesTable = UIUtils.listingTable(
@@ -70,23 +55,3 @@ private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") {
   private def propertyRow(kv: (String, String)) = <tr><td>{kv._1}</td><td>{kv._2}</td></tr>
   private def classPathRow(data: (String, String)) = <tr><td>{data._1}</td><td>{data._2}</td></tr>
 }
-
-/**
- * A SparkListener that prepares information to be displayed on the EnvironmentUI
- */
-private[ui] class EnvironmentListener extends SparkListener {
-  var jvmInformation = Seq[(String, String)]()
-  var sparkProperties = Seq[(String, String)]()
-  var systemProperties = Seq[(String, String)]()
-  var classpathEntries = Seq[(String, String)]()
-
-  override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
-    synchronized {
-      val environmentDetails = environmentUpdate.environmentDetails
-      jvmInformation = environmentDetails("JVM Information")
-      sparkProperties = environmentDetails("Spark Properties")
-      systemProperties = environmentDetails("System Properties")
-      classpathEntries = environmentDetails("Classpath Entries")
-    }
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
new file mode 100644
index 0000000000000..2b833c58c8e44
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui.exec
+
+import scala.collection.mutable.HashMap
+
+import org.apache.spark.ExceptionFailure
+import org.apache.spark.scheduler._
+import org.apache.spark.storage.StorageStatusListener
+import org.apache.spark.ui.{SparkUI, UITab}
+
+private[ui] class ExecutorsTab(parent: SparkUI) extends UITab("executors") {
+  val appName = parent.appName
+  val basePath = parent.basePath
+
+  def start() {
+    listener = Some(new ExecutorsListener(parent.storageStatusListener))
+    attachPage(new IndexPage(this))
+  }
+
+  def executorsListener = {
+    assert(listener.isDefined, "ExecutorsTab has not started yet!")
+    listener.get.asInstanceOf[ExecutorsListener]
+  }
+}
+
+/**
+ * A SparkListener that prepares information to be displayed on the ExecutorsTab
+ */
+private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
+  extends SparkListener {
+
+  val executorToTasksActive = HashMap[String, Int]()
+  val executorToTasksComplete = HashMap[String, Int]()
+  val executorToTasksFailed = HashMap[String, Int]()
+  val executorToDuration = HashMap[String, Long]()
+  val executorToShuffleRead = HashMap[String, Long]()
+  val executorToShuffleWrite = HashMap[String, Long]()
+
+  def storageStatusList = storageStatusListener.storageStatusList
+
+  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
+    val eid = formatExecutorId(taskStart.taskInfo.executorId)
+    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
+  }
+
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
+    val info = taskEnd.taskInfo
+    if (info != null) {
+      val eid = formatExecutorId(info.executorId)
+      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
+      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+        case _ =>
+          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+      }
+
+      // Update shuffle read/write
+      val metrics = taskEnd.taskMetrics
+      if (metrics != null) {
+        metrics.shuffleReadMetrics.foreach { shuffleRead =>
+          executorToShuffleRead(eid) =
+            executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
+        }
+        metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
+          executorToShuffleWrite(eid) =
+            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
+        }
+      }
+    }
+  }
+
+  // This addresses executor ID inconsistencies in the local mode
+  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
+}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
similarity index 60%
rename from core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
rename to core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
index 031ed88a493a8..fbbba2f63878f 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala
@@ -19,36 +19,18 @@ package org.apache.spark.ui.exec
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.collection.mutable.HashMap
 import scala.xml.Node
 
-import org.eclipse.jetty.servlet.ServletContextHandler
-
-import org.apache.spark.ExceptionFailure
-import org.apache.spark.scheduler._
-import org.apache.spark.storage.StorageStatusListener
-import org.apache.spark.ui.JettyUtils._
+import org.apache.spark.ui.{UIPage, UIUtils}
 import org.apache.spark.ui.Page.Executors
-import org.apache.spark.ui.{SparkUI, UIUtils}
 import org.apache.spark.util.Utils
 
-private[ui] class ExecutorsUI(parent: SparkUI) {
+private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") {
   private val appName = parent.appName
   private val basePath = parent.basePath
-  private var _listener: Option[ExecutorsListener] = None
-
-  lazy val listener = _listener.get
-
-  def start() {
-    _listener = Some(new ExecutorsListener(parent.storageStatusListener))
-  }
-
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/executors",
-      (request: HttpServletRequest) => render(request), parent.securityManager, basePath)
-  )
+  private val listener = parent.executorsListener
 
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = listener.storageStatusList
     val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
     val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
@@ -68,11 +68,11 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
-      <div class="row-fluid">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>;
+      <div class="row-fluid">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>;
 
     UIUtils.headerSparkPage(
       content, basePath, appName, "Executors (" + execInfo.size + ")", Executors)
@@ -158,55 +140,3 @@ private[ui] class ExecutorsUI(parent: SparkUI) {
     execFields.zip(execValues).toMap
   }
 }
-
-/**
- * A SparkListener that prepares information to be displayed on the ExecutorsUI
- */
-private[ui] class ExecutorsListener(storageStatusListener: StorageStatusListener)
-  extends SparkListener {
-
-  val executorToTasksActive = HashMap[String, Int]()
-  val executorToTasksComplete = HashMap[String, Int]()
-  val executorToTasksFailed = HashMap[String, Int]()
-  val executorToDuration = HashMap[String, Long]()
-  val executorToShuffleRead = HashMap[String, Long]()
-  val executorToShuffleWrite = HashMap[String, Long]()
-
-  def storageStatusList = storageStatusListener.storageStatusList
-
-  override def onTaskStart(taskStart: SparkListenerTaskStart) = synchronized {
-    val eid = formatExecutorId(taskStart.taskInfo.executorId)
-    executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 0) + 1
-  }
-
-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) = synchronized {
-    val info = taskEnd.taskInfo
-    if (info != null) {
-      val eid = formatExecutorId(info.executorId)
-      executorToTasksActive(eid) = executorToTasksActive.getOrElse(eid, 1) - 1
-      executorToDuration(eid) = executorToDuration.getOrElse(eid, 0L) + info.duration
-      taskEnd.reason match {
-        case e: ExceptionFailure =>
-          executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
-        case _ =>
-          executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
-      }
-
-      // Update shuffle read/write
-      val metrics = taskEnd.taskMetrics
-      if (metrics != null) {
-        metrics.shuffleReadMetrics.foreach { shuffleRead =>
-          executorToShuffleRead(eid) =
-            executorToShuffleRead.getOrElse(eid, 0L) + shuffleRead.remoteBytesRead
-        }
-        metrics.shuffleWriteMetrics.foreach { shuffleWrite =>
-          executorToShuffleWrite(eid) =
-            executorToShuffleWrite.getOrElse(eid, 0L) + shuffleWrite.shuffleBytesWritten
-        }
-      }
-    }
-  }
-
-  // This addresses executor ID inconsistencies in the local mode
-  private def formatExecutorId(execId: String) = storageStatusListener.formatExecutorId(execId)
-}
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 73861ae6746da..31173e48d7a1e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,11 +20,12 @@ package org.apache.spark.ui.jobs
 import scala.collection.mutable
 import scala.xml.Node
 
+import org.apache.spark.ui.UIUtils
 import org.apache.spark.util.Utils
 
 /** Page showing executor summary */
-private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
-  private lazy val listener = parent.listener
+private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
+  private lazy val listener = parent.jobProgressListener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {
@@ -69,7 +70,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressUI) {
       <td>{k}</td>
       <td>{executorIdToAddress.getOrElse(k, "CANNOT FIND ADDRESS")}</td>
-      <td>{parent.formatDuration(v.taskTime)}</td>
+      <td>{UIUtils.formatDuration(v.taskTime)}</td>
       <td>{v.failedTasks + v.succeededTasks}</td>
      <td>{v.failedTasks}</td>
      <td>{v.succeededTasks}</td>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
index 70d62b66a4829..c600e58af004d 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala
@@ -23,23 +23,23 @@ import scala.xml.{Node, NodeSeq}
 
 import org.apache.spark.scheduler.Schedulable
 import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
 
 /** Page showing list of all ongoing and recently finished stages and pools */
-private[ui] class IndexPage(parent: JobProgressUI) {
+private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") {
   private val appName = parent.appName
   private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
-  private lazy val listener = parent.listener
+  private lazy val listener = parent.jobProgressListener
   private lazy val isFairScheduler = parent.isFairScheduler
 
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val activeStages = listener.activeStages.values.toSeq
       val completedStages = listener.completedStages.reverse.toSeq
       val failedStages = listener.failedStages.reverse.toSeq
-      val now = System.currentTimeMillis()
+      val now = System.currentTimeMillis
 
       val activeStagesTable =
         new StageTable(activeStages.sortBy(_.submissionTime).reverse, parent)
       val completedStagesTable =
@@ -57,7 +57,7 @@
             // Total duration is not meaningful unless the UI is live
             <li>
               <strong>Total Duration: </strong>
-              {parent.formatDuration(now - sc.startTime)}
+              {UIUtils.formatDuration(now - sc.startTime)}
             </li>
           }}
           <li>
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
similarity index 50%
rename from core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
rename to core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
index b2c67381cc3da..c40b75d684510 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressUI.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala
@@ -17,44 +17,29 @@
 
 package org.apache.spark.ui.jobs
 
-import javax.servlet.http.HttpServletRequest
-
-import org.eclipse.jetty.servlet.ServletContextHandler
-
 import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.SchedulingMode
-import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.ui.{SparkUI, UITab}
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
-private[ui] class JobProgressUI(parent: SparkUI) {
+private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") {
   val appName = parent.appName
   val basePath = parent.basePath
   val live = parent.live
   val sc = parent.sc
 
-  lazy val listener = _listener.get
-  lazy val isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
-
-  private val indexPage = new IndexPage(this)
-  private val stagePage = new StagePage(this)
-  private val poolPage = new PoolPage(this)
-  private var _listener: Option[JobProgressListener] = None
-
   def start() {
     val conf = if (live) sc.conf else new SparkConf
-    _listener = Some(new JobProgressListener(conf))
+    listener = Some(new JobProgressListener(conf))
+    attachPage(new IndexPage(this))
+    attachPage(new StagePage(this))
+    attachPage(new PoolPage(this))
  }
 
-  def formatDuration(ms: Long) = Utils.msDurationToString(ms)
+  def jobProgressListener = {
+    assert(listener.isDefined, "JobProgressTab has not started yet!")
+    listener.get.asInstanceOf[JobProgressListener]
+  }
 
-  def getHandlers = Seq[ServletContextHandler](
-    createServletHandler("/stages/stage",
-      (request: HttpServletRequest) => stagePage.render(request), parent.securityManager, basePath),
-    createServletHandler("/stages/pool",
-      (request: HttpServletRequest) => poolPage.render(request), parent.securityManager, basePath),
-    createServletHandler("/stages",
-      (request: HttpServletRequest) => indexPage.render(request), parent.securityManager, basePath)
-  )
+  def isFairScheduler = jobProgressListener.schedulingMode.exists(_ == SchedulingMode.FAIR)
 }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
index bd33182b70059..53200ecdd4fee 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala
@@ -23,17 +23,17 @@ import scala.xml.Node
 
 import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.Page._
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{UIPage, UIUtils}
 
 /** Page showing specific pool details */
-private[ui] class PoolPage(parent: JobProgressUI) {
+private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") {
   private val appName = parent.appName
   private val basePath = parent.basePath
   private val live = parent.live
   private val sc = parent.sc
-  private lazy val listener = parent.listener
+  private lazy val listener = parent.jobProgressListener
 
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val poolName = request.getParameter("poolname")
       val poolToActiveStages = listener.poolToActiveStages
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
index c5c8d8668740b..bb7a9c14f7761 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolTable.scala
@@ -24,10 +24,9 @@ import org.apache.spark.scheduler.{Schedulable, StageInfo}
 import org.apache.spark.ui.UIUtils
 
 /** Table showing list of pools */
-private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
+private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressTab) {
   private val basePath = parent.basePath
-  private val poolToActiveStages = listener.poolToActiveStages
-  private lazy val listener = parent.listener
+  private lazy val listener = parent.jobProgressListener
 
   def toNodeSeq: Seq[Node] = {
     listener.synchronized {
@@ -48,7 +47,7 @@ private[ui] class PoolTable(pools: Seq[Schedulable], parent: JobProgressUI) {
         <th>SchedulingMode</th>
       </thead>
       <tbody>
-        {rows.map(r => makeRow(r, poolToActiveStages))}
+        {rows.map(r => makeRow(r, listener.poolToActiveStages))}
      </tbody>
    </table>
  }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 0c55f2ee7e944..bd3d878b1567f 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -23,16 +23,16 @@ import javax.servlet.http.HttpServletRequest
 
 import scala.xml.Node
 
 import org.apache.spark.ui.Page._
-import org.apache.spark.ui.{WebUI, UIUtils}
+import org.apache.spark.ui.{UIPage, UIUtils}
 import org.apache.spark.util.{Utils, Distribution}
 
 /** Page showing statistics and task list for a given stage */
-private[ui] class StagePage(parent: JobProgressUI) {
+private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") {
   private val appName = parent.appName
   private val basePath = parent.basePath
-  private lazy val listener = parent.listener
+  private lazy val listener = parent.jobProgressListener
 
-  def render(request: HttpServletRequest): Seq[Node] = {
+  override def render(request: HttpServletRequest): Seq[Node] = {
     listener.synchronized {
       val stageId = request.getParameter("id").toInt
 
@@ -58,7 +58,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") {
       val hasBytesSpilled = memoryBytesSpilled > 0 && diskBytesSpilled > 0
 
       var activeTime = 0L
-      val now = System.currentTimeMillis()
+      val now = System.currentTimeMillis
       val tasksActive = listener.stageIdToTasksActive(stageId).values
       tasksActive.foreach(activeTime += _.timeRunning(now))
      <div class = "row">
-        <div class="span12">
-          {execTable}
-        </div>
-      </div>;
+      <div class="row">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>
    ; UIUtils.headerSparkPage( content, basePath, appName, "Executors (" + execInfo.size + ")", Executors) diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index c600e58af004d..5c752793d2342 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -22,8 +22,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable -import org.apache.spark.ui.Page._ import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.Page.Stages /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index c40b75d684510..611febe10c1aa 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -36,7 +36,7 @@ private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") { attachPage(new PoolPage(this)) } - def jobProgressListener = { + def jobProgressListener: JobProgressListener = { assert(listener.isDefined, "JobProgressTab has not started yet!") listener.get.asInstanceOf[JobProgressListener] } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 53200ecdd4fee..9382251d6e612 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -22,8 +22,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.scheduler.{Schedulable, StageInfo} -import org.apache.spark.ui.Page._ import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.Page.Stages /** Page showing specific pool details */ private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") { diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index bd3d878b1567f..a4f6e5d69c515 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -22,8 +22,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node -import org.apache.spark.ui.Page._ import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.Page.Stages import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index e41e6c0ab7da6..d918feafd97d0 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -23,7 +23,7 @@ import scala.collection.mutable.HashMap import scala.xml.Node import org.apache.spark.scheduler.{StageInfo, TaskInfo} -import org.apache.spark.ui.{WebUI, UIUtils} +import org.apache.spark.ui.UIUtils import org.apache.spark.util.Utils /** Page showing list of all ongoing and recently finished stages */ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala 
b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala index d7fcd823a242f..2b9cf35fcde55 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala @@ -34,7 +34,7 @@ private[ui] class BlockManagerTab(parent: SparkUI) extends UITab("storage") { attachPage(new RddPage(this)) } - def blockManagerListener = { + def blockManagerListener: BlockManagerListener = { assert(listener.isDefined, "BlockManagerTab has not started yet!") listener.get.asInstanceOf[BlockManagerListener] } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index f06fdf7ce4d4c..7bfcf13c69c6b 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -22,8 +22,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.RDDInfo -import org.apache.spark.ui.Page._ import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.Page.Storage import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index cb21754550494..35ccfd505a3ef 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -22,8 +22,8 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} -import org.apache.spark.ui.Page._ import org.apache.spark.ui.{UIPage, UIUtils} +import org.apache.spark.ui.Page.Storage import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ From ed25dfc704af544994a4a3bfe98ecaef9fff5bdb Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 2 Apr 2014 21:13:45 -0700 Subject: [PATCH 3/6] Generalize SparkUI header to display tabs dynamically --- .../main/scala/org/apache/spark/ui/Page.scala | 22 ------------ .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../scala/org/apache/spark/ui/UIUtils.scala | 36 +++++-------------- .../scala/org/apache/spark/ui/WebUI.scala | 11 +++--- .../apache/spark/ui/env/EnvironmentTab.scala | 2 ++ .../org/apache/spark/ui/env/IndexPage.scala | 3 +- .../apache/spark/ui/exec/ExecutorsTab.scala | 2 ++ .../org/apache/spark/ui/exec/IndexPage.scala | 5 ++- .../org/apache/spark/ui/jobs/IndexPage.scala | 3 +- .../apache/spark/ui/jobs/JobProgressTab.scala | 2 ++ .../org/apache/spark/ui/jobs/PoolPage.scala | 5 ++- .../org/apache/spark/ui/jobs/StagePage.scala | 9 +++-- .../spark/ui/storage/BlockManagerTab.scala | 2 ++ .../apache/spark/ui/storage/IndexPage.scala | 3 +- .../org/apache/spark/ui/storage/RDDPage.scala | 8 ++--- 15 files changed, 37 insertions(+), 78 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/ui/Page.scala diff --git a/core/src/main/scala/org/apache/spark/ui/Page.scala b/core/src/main/scala/org/apache/spark/ui/Page.scala deleted file mode 100644 index b2a069a37552d..0000000000000 --- a/core/src/main/scala/org/apache/spark/ui/Page.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.ui - -private[spark] object Page extends Enumeration { - val Stages, Storage, Environment, Executors = Value -} diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index d8ea1b13362e3..c333dd3784bb7 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -55,8 +55,8 @@ private[spark] class SparkUI( /** Initialize all components of the server. Must be called before bind(). */ def start() { - attachTab(new BlockManagerTab(this)) attachTab(new JobProgressTab(this)) + attachTab(new BlockManagerTab(this)) attachTab(new EnvironmentTab(this)) attachTab(new ExecutorsTab(this)) attachHandler(createStaticHandler(SparkUI.STATIC_RESOURCE_DIR, "/static")) diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index de4216849dc7d..7cf16b5ed29b1 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -25,8 +25,6 @@ import scala.xml.Node /** Utility functions for generating XML pages with spark content. */ private[spark] object UIUtils { - import Page._ - // SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use. private val dateFormat = new ThreadLocal[SimpleDateFormat]() { override def initialValue(): SimpleDateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") @@ -62,26 +60,13 @@ private[spark] object UIUtils { basePath: String, appName: String, title: String, - page: Page.Value) : Seq[Node] = { - val jobs = page match { - case Stages => -
        <li class="active"><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/stages")}>Stages</a></li>
-    }
-    val storage = page match {
-      case Storage =>
-        <li class="active"><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/storage")}>Storage</a></li>
-    }
-    val environment = page match {
-      case Environment =>
-        <li class="active"><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/environment")}>Environment</a></li>
-    }
-    val executors = page match {
-      case Executors =>
-        <li class="active"><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
-      case _ => <li><a href={prependBaseUri(basePath, "/executors")}>Executors</a></li>
-    }
+      tabs: Seq[UITab],
+      activeTab: UITab) : Seq[Node] = {
+
+    val header = tabs.map { tab =>
+      <li class={if (tab == activeTab) "active" else ""}>
+        <a href={prependBaseUri(basePath, "/" + tab.prefix)}>{tab.name}</a>
+      </li>
  • } @@ -100,12 +85,7 @@ private[spark] object UIUtils { - + diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index f20aec893c787..0b847a9a471f0 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -42,6 +42,10 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: protected val handlers = ArrayBuffer[ServletContextHandler]() protected var serverInfo: Option[ServerInfo] = None + def getTabs: Seq[UITab] = tabs.toSeq + def getHandlers: Seq[ServletContextHandler] = handlers.toSeq + def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener) + /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */ def attachTab(tab: UITab) { tab.start() @@ -65,12 +69,6 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: handlers += handler } - /** Return a list of listeners attached to this UI. */ - def getListeners = tabs.flatMap(_.listener) - - /** Return a list of handlers attached to this UI. */ - def getHandlers = handlers.toSeq - /** Initialize all components of the server. Must be called before bind(). */ def start() @@ -98,6 +96,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: private[spark] abstract class UITab(val prefix: String) { val pages = ArrayBuffer[UIPage]() var listener: Option[SparkListener] = None + var name = prefix.capitalize /** Attach a page to this tab. This prepends the page's prefix with the tab's own prefix. */ def attachPage(page: UIPage) { diff --git a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala index ad355e59a37aa..6a2304f1ad42f 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/EnvironmentTab.scala @@ -33,6 +33,8 @@ private[ui] class EnvironmentTab(parent: SparkUI) extends UITab("environment") { assert(listener.isDefined, "EnvironmentTab has not started yet!") listener.get.asInstanceOf[EnvironmentListener] } + + def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala index bf1872f18d54e..bde672909bbcc 100644 --- a/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/env/IndexPage.scala @@ -22,7 +22,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.{UIUtils, UIPage} -import org.apache.spark.ui.Page.Environment private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") { private val appName = parent.appName @@ -46,7 +45,7 @@ private[ui] class IndexPage(parent: EnvironmentTab) extends UIPage("") {
          <h4>Classpath Entries</h4>
    {classpathEntriesTable} - UIUtils.headerSparkPage(content, basePath, appName, "Environment", Environment) + UIUtils.headerSparkPage(content, basePath, appName, "Environment", parent.headerTabs, parent) } private def propertyHeader = Seq("Name", "Value") diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 3941a1849b182..c1f5ca856ffe1 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -37,6 +37,8 @@ private[ui] class ExecutorsTab(parent: SparkUI) extends UITab("executors") { assert(listener.isDefined, "ExecutorsTab has not started yet!") listener.get.asInstanceOf[ExecutorsListener] } + + def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala index 9a711d773ae01..1956b6c63929e 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala @@ -22,7 +22,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.{UIPage, UIUtils} -import org.apache.spark.ui.Page.Executors import org.apache.spark.util.Utils private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") { @@ -56,8 +55,8 @@ private[ui] class IndexPage(parent: ExecutorsTab) extends UIPage("") { ; - UIUtils.headerSparkPage( - content, basePath, appName, "Executors (" + execInfo.size + ")", Executors) + UIUtils.headerSparkPage(content, basePath, appName, "Executors (" + execInfo.size + ")", + parent.headerTabs, parent) } /** Header fields for the executors table */ diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala index 5c752793d2342..2b54603af104e 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/IndexPage.scala @@ -23,7 +23,6 @@ import scala.xml.{Node, NodeSeq} import org.apache.spark.scheduler.Schedulable import org.apache.spark.ui.{UIPage, UIUtils} -import org.apache.spark.ui.Page.Stages /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") { @@ -92,7 +91,7 @@ private[ui] class IndexPage(parent: JobProgressTab) extends UIPage("") {
        <h4 id="failed">Failed Stages ({failedStages.size})</h4>
    ++ failedStagesTable.toNodeSeq - UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", Stages) + UIUtils.headerSparkPage(content, basePath, appName, "Spark Stages", parent.headerTabs, parent) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala index 611febe10c1aa..93d26f7dd3632 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobProgressTab.scala @@ -42,4 +42,6 @@ private[ui] class JobProgressTab(parent: SparkUI) extends UITab("stages") { } def isFairScheduler = jobProgressListener.schedulingMode.exists(_ == SchedulingMode.FAIR) + + def headerTabs: Seq[UITab] = parent.getTabs } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala index 9382251d6e612..7fffe2affb0f2 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/PoolPage.scala @@ -23,7 +23,6 @@ import scala.xml.Node import org.apache.spark.scheduler.{Schedulable, StageInfo} import org.apache.spark.ui.{UIPage, UIUtils} -import org.apache.spark.ui.Page.Stages /** Page showing specific pool details */ private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") { @@ -51,8 +50,8 @@ private[ui] class PoolPage(parent: JobProgressTab) extends UIPage("pool") {
        <h4>Summary</h4>
        ++ poolTable.toNodeSeq ++
        <h4>{activeStages.size} Active Stages</h4>
    ++ activeStagesTable.toNodeSeq - UIUtils.headerSparkPage( - content, basePath, appName, "Fair Scheduler Pool: " + poolName, Stages) + UIUtils.headerSparkPage(content, basePath, appName, "Fair Scheduler Pool: " + poolName, + parent.headerTabs, parent) } } } diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala index a4f6e5d69c515..372210919cd91 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala @@ -23,7 +23,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.{UIPage, UIUtils} -import org.apache.spark.ui.Page.Stages import org.apache.spark.util.{Utils, Distribution} /** Page showing statistics and task list for a given stage */ @@ -42,8 +41,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") {
          <h4>Summary Metrics</h4> No tasks have started yet
          <h4>Tasks</h4>
    No tasks have started yet - return UIUtils.headerSparkPage( - content, basePath, appName, "Details for Stage %s".format(stageId), Stages) + return UIUtils.headerSparkPage(content, basePath, appName, + "Details for Stage %s".format(stageId), parent.headerTabs, parent) } val tasks = listener.stageIdToTaskData(stageId).values.toSeq.sortBy(_.taskInfo.launchTime) @@ -204,8 +203,8 @@ private[ui] class StagePage(parent: JobProgressTab) extends UIPage("stage") {
        <h4>Aggregated Metrics by Executor</h4>
        ++ executorTable.toNodeSeq ++
        <h4>Tasks</h4>
    ++ taskTable - UIUtils.headerSparkPage( - content, basePath, appName, "Details for Stage %d".format(stageId), Stages) + UIUtils.headerSparkPage(content, basePath, appName, "Details for Stage %d".format(stageId), + parent.headerTabs, parent) } } diff --git a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala index 2b9cf35fcde55..ac83f71ed31de 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/BlockManagerTab.scala @@ -38,6 +38,8 @@ private[ui] class BlockManagerTab(parent: SparkUI) extends UITab("storage") { assert(listener.isDefined, "BlockManagerTab has not started yet!") listener.get.asInstanceOf[BlockManagerListener] } + + def headerTabs: Seq[UITab] = parent.getTabs } /** diff --git a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala index 7bfcf13c69c6b..96b08d07d48d2 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala @@ -23,7 +23,6 @@ import scala.xml.Node import org.apache.spark.storage.RDDInfo import org.apache.spark.ui.{UIPage, UIUtils} -import org.apache.spark.ui.Page.Storage import org.apache.spark.util.Utils /** Page showing list of RDD's currently stored in the cluster */ @@ -35,7 +34,7 @@ private[ui] class IndexPage(parent: BlockManagerTab) extends UIPage("") { override def render(request: HttpServletRequest): Seq[Node] = { val rdds = listener.rddInfoList val content = UIUtils.listingTable(rddHeader, rddRow, rdds) - UIUtils.headerSparkPage(content, basePath, appName, "Storage ", Storage) + UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent) } /** Header fields for the RDD table */ diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index 35ccfd505a3ef..a65ba0a020bcd 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -23,7 +23,6 @@ import scala.xml.Node import org.apache.spark.storage.{BlockId, BlockStatus, StorageStatus, StorageUtils} import org.apache.spark.ui.{UIPage, UIUtils} -import org.apache.spark.ui.Page.Storage import org.apache.spark.util.Utils /** Page showing storage details for a given RDD */ @@ -37,7 +36,8 @@ private[ui] class RddPage(parent: BlockManagerTab) extends UIPage("rdd") { val storageStatusList = listener.storageStatusList val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse { // Rather than crashing, render an "RDD Not Found" page - return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", Storage) + return UIUtils.headerSparkPage(Seq[Node](), basePath, appName, "RDD Not Found", + parent.headerTabs, parent) } // Worker table @@ -95,8 +95,8 @@ private[ui] class RddPage(parent: BlockManagerTab) extends UIPage("rdd") { ; - UIUtils.headerSparkPage( - content, basePath, appName, "RDD Storage Info for " + rddInfo.name, Storage) + UIUtils.headerSparkPage(content, basePath, appName, "RDD Storage Info for " + rddInfo.name, + parent.headerTabs, parent) } /** Header fields for the worker table */ From 9a48fa1de7b357f6ffdaad8e93af7b2b7e39bc06 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Apr 2014 15:53:48 -0700 Subject: [PATCH 4/6] Allow adding tabs to 
SparkUI dynamically + add example An example of how this is done is in org.apache.spark.ui.FooTab. Run it through bin/spark-class to see what it looks like (which should more or less match your expectations...). --- .../spark/deploy/master/ui/MasterWebUI.scala | 18 +-- .../spark/deploy/worker/ui/WorkerWebUI.scala | 2 +- .../scala/org/apache/spark/ui/FooTab.scala | 105 ++++++++++++++++++ .../scala/org/apache/spark/ui/SparkUI.scala | 13 ++- .../scala/org/apache/spark/ui/WebUI.scala | 26 ++++- 5 files changed, 139 insertions(+), 25 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/ui/FooTab.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala index c8d51e44a4dff..3a30919a70584 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterWebUI.scala @@ -35,7 +35,7 @@ class MasterWebUI(val master: Master, requestedPort: Int) val masterActorRef = master.self val timeout = AkkaUtils.askTimeout(master.conf) - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() { attachPage(new ApplicationPage(this)) attachPage(new IndexPage(this)) @@ -59,25 +59,13 @@ class MasterWebUI(val master: Master, requestedPort: Int) /** Attach a reconstructed UI to this Master UI. Only valid after bind(). */ def attachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.getHandlers) { - rootHandler.addHandler(handler) - if (!handler.isStarted) { - handler.start() - } - } + ui.getHandlers.foreach(attachHandler) } /** Detach a reconstructed UI from this Master UI. Only valid after bind(). */ def detachUI(ui: SparkUI) { assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs") - val rootHandler = serverInfo.get.rootHandler - for (handler <- ui.getHandlers) { - if (handler.isStarted) { - handler.stop() - } - rootHandler.removeHandler(handler) - } + ui.getHandlers.foreach(detachHandler) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index ae1b7ab014e6e..490a383be42e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -38,7 +38,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I worker.conf.get("worker.ui.port", WorkerWebUI.DEFAULT_PORT).toInt) val timeout = AkkaUtils.askTimeout(worker.conf) - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() { val logPage = new LogPage(this) attachPage(logPage) diff --git a/core/src/main/scala/org/apache/spark/ui/FooTab.scala b/core/src/main/scala/org/apache/spark/ui/FooTab.scala new file mode 100644 index 0000000000000..1e30fa75a263d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ui/FooTab.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ui + +import javax.servlet.http.HttpServletRequest + +import scala.collection.mutable +import scala.xml.Node + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.SparkContext._ +import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd} + +/* + * This is an example of how to extend the SparkUI by adding new tabs to it. It is intended + * only as a demonstration and should be removed before merging into master! + * + * bin/spark-class org.apache.spark.ui.FooTab + */ + +/** A tab that displays basic information about jobs seen so far. */ +private[spark] class FooTab(parent: SparkUI) extends UITab("foo") { + val appName = parent.appName + val basePath = parent.basePath + + def start() { + listener = Some(new FooListener) + attachPage(new IndexPage(this)) + } + + def fooListener: FooListener = { + assert(listener.isDefined, "FooTab has not started yet!") + listener.get.asInstanceOf[FooListener] + } + + def headerTabs: Seq[UITab] = parent.getTabs +} + +/** A foo page. Enough said. */ +private[spark] class IndexPage(parent: FooTab) extends UIPage("") { + private val appName = parent.appName + private val basePath = parent.basePath + private val listener = parent.fooListener + + override def render(request: HttpServletRequest): Seq[Node] = { + val results = listener.jobResultMap.toSeq.sortBy { case (k, _) => k } + val content =
+      <div class="row-fluid">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Foo Jobs:</strong></li>
+            {results.map { case (k, v) =>
+              <li>Job {k}: {v}</li>
+            }}
+          </ul>
+        </div>
+      </div>
    + UIUtils.headerSparkPage(content, basePath, appName, "Foo", parent.headerTabs, parent) + } +} + +/** A listener that maintains a mapping between job IDs and job results. */ +private[spark] class FooListener extends SparkListener { + val jobResultMap = mutable.Map[Int, String]() + + override def onJobEnd(end: SparkListenerJobEnd) { + jobResultMap(end.jobId) = end.jobResult.toString + } +} + + +/** + * Start a SparkContext and a SparkUI with a FooTab attached. + */ +private[spark] object FooTab { + def main(args: Array[String]) { + val sc = new SparkContext("local", "Foo Tab", new SparkConf) + val fooTab = new FooTab(sc.ui) + sc.ui.attachTab(fooTab) + + // Run a few jobs + sc.parallelize(1 to 1000).count() + sc.parallelize(1 to 2000).persist().count() + sc.parallelize(1 to 3000).map(i => (i/2, i)).groupByKey().count() + sc.parallelize(1 to 4000).map(i => (i/2, i)).groupByKey().persist().count() + sc.parallelize(1 to 5000).map(i => (i/2, i)).groupByKey().persist().count() + sc.parallelize(1 to 6000).map(i => (i/2, i)).groupByKey().persist().count() + sc.parallelize(1 to 7000).map(i => (i/2, i)).groupByKey().persist().count() + + readLine("\n> Started SparkUI with a Foo tab...") + } +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index c333dd3784bb7..ac22189f9f04f 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -52,8 +52,9 @@ private[spark] class SparkUI( // Maintain executor storage status through Spark events val storageStatusListener = new StorageStatusListener + listenerBus.addListener(storageStatusListener) - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() { attachTab(new JobProgressTab(this)) attachTab(new BlockManagerTab(this)) @@ -64,14 +65,10 @@ private[spark] class SparkUI( if (live) { sc.env.metricsSystem.getServletHandlers.foreach(attachHandler) } - // Storage status listener must receive events first, as other listeners depend on its state - listenerBus.addListener(storageStatusListener) - getListeners.foreach(listenerBus.addListener) } /** Bind to the HTTP server behind this web interface. */ def bind() { - assert(!handlers.isEmpty, "SparkUI has not started yet!") try { serverInfo = Some(startJettyServer(bindHost, port, handlers, sc.conf)) logInfo("Started Spark web UI at http://%s:%d".format(publicHost, boundPort)) @@ -82,6 +79,12 @@ private[spark] class SparkUI( } } + /** Attach a tab to this UI, along with its corresponding listener if it exists. */ + override def attachTab(tab: UITab) { + super.attachTab(tab) + tab.listener.foreach(listenerBus.addListener) + } + /** Stop the server behind this web interface. Only valid after bind(). */ override def stop() { super.stop() diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 0b847a9a471f0..4392814fd1b39 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -35,7 +35,6 @@ import org.apache.spark.util.Utils * * Each WebUI represents a collection of tabs, each of which in turn represents a collection of * pages. The use of tabs is optional, however; a WebUI may choose to include pages directly. - * All tabs and pages must be attached before bind()'ing the server. 
*/ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: String = "") { protected val tabs = ArrayBuffer[UITab]() @@ -46,14 +45,14 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: def getHandlers: Seq[ServletContextHandler] = handlers.toSeq def getListeners: Seq[SparkListener] = tabs.flatMap(_.listener) - /** Attach a tab to this UI, along with all of its attached pages. Only valid before bind(). */ + /** Attach a tab to this UI, along with all of its attached pages. */ def attachTab(tab: UITab) { tab.start() tab.pages.foreach(attachPage) tabs += tab } - /** Attach a page to this UI. Only valid before bind(). */ + /** Attach a page to this UI. */ def attachPage(page: UIPage) { val pagePath = "/" + page.prefix attachHandler(createServletHandler(pagePath, @@ -64,9 +63,26 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } } - /** Attach a handler to this UI. Only valid before bind(). */ + /** Attach a handler to this UI. */ def attachHandler(handler: ServletContextHandler) { handlers += handler + serverInfo.foreach { info => + info.rootHandler.addHandler(handler) + if (!handler.isStarted) { + handler.start() + } + } + } + + /** Detach a handler from this UI. */ + def detachHandler(handler: ServletContextHandler) { + handlers -= handler + serverInfo.foreach { info => + info.rootHandler.removeHandler(handler) + if (handler.isStarted) { + handler.stop() + } + } } /** Initialize all components of the server. Must be called before bind(). */ @@ -89,6 +105,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } } + /** * A tab that represents a collection of pages and a unit of listening for Spark events. * Associating each tab with a listener is arbitrary and need not be the case. @@ -108,6 +125,7 @@ private[spark] abstract class UITab(val prefix: String) { def start() } + /** * A page that represents the leaf node in the UI hierarchy. * From 8f7323b76ffb64a30c6efc5b471f9898f94c0979 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Thu, 3 Apr 2014 16:33:23 -0700 Subject: [PATCH 5/6] End of file new lines, indentation, and imports (minor) --- core/src/main/scala/org/apache/spark/ui/FooTab.scala | 2 +- core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala | 2 +- .../src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/FooTab.scala b/core/src/main/scala/org/apache/spark/ui/FooTab.scala index 1e30fa75a263d..620fe8001a85c 100644 --- a/core/src/main/scala/org/apache/spark/ui/FooTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/FooTab.scala @@ -102,4 +102,4 @@ private[spark] object FooTab { readLine("\n> Started SparkUI with a Foo tab...") } -} \ No newline at end of file +} diff --git a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala index 1956b6c63929e..bc6a822b080c3 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/IndexPage.scala @@ -11,7 +11,7 @@ * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-* See the License for the specific language governing permissions and + * See the License for the specific language governing permissions and * limitations under the License. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala index 51b11a29cbd98..4063ce3d7ca44 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/UIUtils.scala @@ -18,7 +18,6 @@ package org.apache.spark.streaming.ui import scala.xml.Node -import org.apache.spark.ui.Page private[spark] object UIUtils { From c78c92d2240d563219c9d7036d480619dc789d76 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Mon, 7 Apr 2014 16:26:00 -0700 Subject: [PATCH 6/6] Remove outdated comment --- core/src/main/scala/org/apache/spark/ui/WebUI.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/WebUI.scala b/core/src/main/scala/org/apache/spark/ui/WebUI.scala index 4392814fd1b39..126a7ff2f6080 100644 --- a/core/src/main/scala/org/apache/spark/ui/WebUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/WebUI.scala @@ -85,7 +85,7 @@ private[spark] abstract class WebUI(securityManager: SecurityManager, basePath: } } - /** Initialize all components of the server. Must be called before bind(). */ + /** Initialize all components of the server. */ def start() /**
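For a sense of how downstream code would use the finished API, here is a second minimal sketch built against the UITab/UIPage signatures introduced in this series. TaskCountTab, TaskCountPage, and TaskCountListener are hypothetical names invented for illustration and are not part of these patches; the sketch simply mirrors the FooTab example above. Note that it works even after bind(), since attachHandler now registers new handlers with the live server.

package org.apache.spark.ui

import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

/** Hypothetical tab that counts completed tasks (illustration only). */
private[spark] class TaskCountTab(parent: SparkUI) extends UITab("taskcount") {
  val appName = parent.appName
  val basePath = parent.basePath

  def start() {
    listener = Some(new TaskCountListener)
    attachPage(new TaskCountPage(this))
  }

  def taskCountListener: TaskCountListener = {
    assert(listener.isDefined, "TaskCountTab has not started yet!")
    listener.get.asInstanceOf[TaskCountListener]
  }

  def headerTabs: Seq[UITab] = parent.getTabs
}

/** The single page served at /taskcount. */
private[spark] class TaskCountPage(parent: TaskCountTab) extends UIPage("") {
  override def render(request: HttpServletRequest): Seq[Node] = {
    val count = parent.taskCountListener.count
    val content = <p>Tasks completed so far: {count}</p>
    UIUtils.headerSparkPage(
      content, parent.basePath, parent.appName, "Task Count", parent.headerTabs, parent)
  }
}

/** Listener that tallies task completions posted on the listener bus. */
private[spark] class TaskCountListener extends SparkListener {
  @volatile var count = 0
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { count += 1 }
}

/** Start a SparkContext, attach the tab, and run a job so the counter moves. */
private[spark] object TaskCountTab {
  def main(args: Array[String]) {
    val sc = new SparkContext("local", "Task Count Tab", new SparkConf)
    // SparkUI.attachTab also registers the tab's listener on the listener bus
    sc.ui.attachTab(new TaskCountTab(sc.ui))
    sc.parallelize(1 to 1000).count()
    readLine("\n> Started SparkUI with a Task Count tab...")
  }
}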