Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor UI interface to allow dynamically adding tabs #2

Merged
merged 8 commits into from
Apr 10, 2014
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ class SparkContext(

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()
ui.bind()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private[spark] class Master(
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class ApplicationPage(parent: MasterWebUI)
extends UIPage("app", includeJson = true) {

private val master = parent.masterActorRef
private val timeout = parent.timeout

/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand All @@ -47,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
}

/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand Down Expand Up @@ -96,7 +98,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

def executorRow(executor: ExecutorInfo): Seq[Node] = {
private def executorRow(executor: ExecutorInfo): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
private val master = parent.masterActorRef
private val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}

/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)

Expand Down Expand Up @@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}

def workerRow(worker: WorkerInfo): Seq[Node] = {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
Expand All @@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</tr>
}


def appRow(app: ApplicationInfo): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
Expand All @@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{WebUI.formatDate(app.submitDate)}</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
<td>{WebUI.formatDuration(app.duration)}</td>
<td>{UIUtils.formatDuration(app.duration)}</td>
</tr>
}

def driverRow(driver: DriverInfo): Seq[Node] = {
private def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,86 +17,55 @@

package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{ServerInfo, SparkUI}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr) with Logging {

private val host = Utils.localHostName()
private val port = requestedPort
private val applicationPage = new ApplicationPage(this)
private val indexPage = new IndexPage(this)
private var serverInfo: Option[ServerInfo] = None
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

private val handlers: Seq[ServletContextHandler] = {
master.masterMetricsSystem.getServletHandlers ++
master.applicationMetricsSystem.getServletHandlers ++
Seq[ServletContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/app/json",
(request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
createServletHandler("/app",
(request: HttpServletRequest) => applicationPage.render(request), master.securityMgr),
createServletHandler("/json",
(request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
)
/** Initialize all components of the server. */
def start() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer(host, port, handlers, master.conf))
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Master JettyUtils", e)
logError("Failed to create Master web UI", e)
System.exit(1)
}
}

def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
rootHandler.addHandler(handler)
if (!handler.isStarted) {
handler.start()
}
}
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
if (handler.isStarted) {
handler.stop()
}
rootHandler.removeHandler(handler)
}
}

def stop() {
assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
serverInfo.get.server.stop()
ui.getHandlers.foreach(detachHandler)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,9 @@ private[spark] class Worker(
host, port, cores, Utils.megabytesToString(memory)))
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
webUi.bind()
registerWithMaster()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: WorkerWebUI) {
private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}

def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)

Expand Down
Loading