Skip to content

Commit

Permalink
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
Browse files Browse the repository at this point in the history
…ng-web-ui

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
	core/src/main/scala/org/apache/spark/ui/SparkUI.scala
	core/src/main/scala/org/apache/spark/ui/storage/IndexPage.scala
  • Loading branch information
tdas committed Apr 10, 2014
2 parents 3e986f8 + 168fe86 commit 827e81a
Show file tree
Hide file tree
Showing 29 changed files with 736 additions and 576 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,8 @@ class SparkContext(config: SparkConf) extends Logging {

// Initialize the Spark UI, registering all associated listeners
private[spark] val ui = new SparkUI(this)
ui.bind()
ui.start()
ui.bind()

// Optionally log Spark events
private[spark] val eventLogger: Option[EventLoggingListener] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ private[spark] class Master(
logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi.start()
webUi.bind()
masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,17 @@ import org.json4s.JValue
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class ApplicationPage(parent: MasterWebUI)
extends UIPage("app", includeJson = true) {

private val master = parent.masterActorRef
private val timeout = parent.timeout

/** Executor details for a particular application */
def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand All @@ -47,7 +49,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
}

/** Executor details for a particular application */
def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand Down Expand Up @@ -96,7 +98,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Application: " + app.desc.name)
}

def executorRow(executor: ExecutorInfo): Seq[Node] = {
private def executorRow(executor: ExecutorInfo): Seq[Node] = {
<tr>
<td>{executor.id}</td>
<td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,24 @@ import scala.xml.Node
import akka.pattern.ask
import org.json4s.JValue

import org.apache.spark.deploy.{JsonProtocol}
import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, RequestMasterState}
import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{WebUI, UIUtils}
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) {
val master = parent.masterActorRef
val timeout = parent.timeout
private[spark] class IndexPage(parent: MasterWebUI) extends UIPage("", includeJson = true) {
private val master = parent.masterActorRef
private val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
JsonProtocol.writeMasterState(state)
}

/** Index view listing applications and executors */
def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)

Expand Down Expand Up @@ -139,7 +139,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}

def workerRow(worker: WorkerInfo): Seq[Node] = {
private def workerRow(worker: WorkerInfo): Seq[Node] = {
<tr>
<td>
<a href={worker.webUiAddress}>{worker.id}</a>
Expand All @@ -154,8 +154,7 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</tr>
}


def appRow(app: ApplicationInfo): Seq[Node] = {
private def appRow(app: ApplicationInfo): Seq[Node] = {
<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
Expand All @@ -169,14 +168,14 @@ private[spark] class IndexPage(parent: MasterWebUI) {
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
</td>
<td>{WebUI.formatDate(app.submitDate)}</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
<td>{app.state.toString}</td>
<td>{WebUI.formatDuration(app.duration)}</td>
<td>{UIUtils.formatDuration(app.duration)}</td>
</tr>
}

def driverRow(driver: DriverInfo): Seq[Node] = {
private def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
<td>{driver.submitDate}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,86 +17,55 @@

package org.apache.spark.deploy.master.ui

import javax.servlet.http.HttpServletRequest

import org.eclipse.jetty.servlet.ServletContextHandler

import org.apache.spark.Logging
import org.apache.spark.deploy.master.Master
import org.apache.spark.ui.{ServerInfo, SparkUI}
import org.apache.spark.ui.{SparkUI, WebUI}
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.util.{AkkaUtils, Utils}

/**
* Web UI server for the standalone master.
*/
private[spark]
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)
class MasterWebUI(val master: Master, requestedPort: Int)
extends WebUI(master.securityMgr) with Logging {

private val host = Utils.localHostName()
private val port = requestedPort
private val applicationPage = new ApplicationPage(this)
private val indexPage = new IndexPage(this)
private var serverInfo: Option[ServerInfo] = None
val masterActorRef = master.self
val timeout = AkkaUtils.askTimeout(master.conf)

private val handlers: Seq[ServletContextHandler] = {
master.masterMetricsSystem.getServletHandlers ++
master.applicationMetricsSystem.getServletHandlers ++
Seq[ServletContextHandler](
createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"),
createServletHandler("/app/json",
(request: HttpServletRequest) => applicationPage.renderJson(request), master.securityMgr),
createServletHandler("/app",
(request: HttpServletRequest) => applicationPage.render(request), master.securityMgr),
createServletHandler("/json",
(request: HttpServletRequest) => indexPage.renderJson(request), master.securityMgr),
createServletHandler("/",
(request: HttpServletRequest) => indexPage.render(request), master.securityMgr)
)
/** Initialize all components of the server. */
def start() {
attachPage(new ApplicationPage(this))
attachPage(new IndexPage(this))
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
}

/** Bind to the HTTP server behind this web interface. */
def bind() {
try {
serverInfo = Some(startJettyServer("0.0.0.0", port, handlers, master.conf))
logInfo("Started Master web UI at http://%s:%d".format(host, boundPort))
} catch {
case e: Exception =>
logError("Failed to create Master JettyUtils", e)
logError("Failed to create Master web UI", e)
System.exit(1)
}
}

def boundPort: Int = serverInfo.map(_.boundPort).getOrElse(-1)

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
rootHandler.addHandler(handler)
if (!handler.isStarted) {
handler.start()
}
}
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
val rootHandler = serverInfo.get.rootHandler
for (handler <- ui.handlers) {
if (handler.isStarted) {
handler.stop()
}
rootHandler.removeHandler(handler)
}
}

def stop() {
assert(serverInfo.isDefined, "Attempted to stop a Master UI that was not bound to a server!")
serverInfo.get.server.stop()
ui.getHandlers.foreach(detachHandler)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,9 @@ private[spark] class Worker(
host, port, cores, Utils.megabytesToString(memory)))
logInfo("Spark home: " + sparkHome)
createWorkDir()
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
webUi.bind()
registerWithMaster()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,21 +28,21 @@ import org.apache.spark.deploy.JsonProtocol
import org.apache.spark.deploy.DeployMessages.{RequestWorkerState, WorkerStateResponse}
import org.apache.spark.deploy.master.DriverState
import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.UIUtils
import org.apache.spark.ui.{UIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: WorkerWebUI) {
private[spark] class IndexPage(parent: WorkerWebUI) extends UIPage("", includeJson = true) {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout

def renderJson(request: HttpServletRequest): JValue = {
override def renderJson(request: HttpServletRequest): JValue = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)
JsonProtocol.writeWorkerState(workerState)
}

def render(request: HttpServletRequest): Seq[Node] = {
override def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)

Expand Down
Loading

0 comments on commit 827e81a

Please sign in to comment.