Skip to content

Commit

Permalink
Merge branch 'streaming-web-ui' of github.com:tdas/spark into streami…
Browse files Browse the repository at this point in the history
…ng-web-ui
  • Loading branch information
tdas committed Apr 11, 2014
2 parents e038b4b + 252c566 commit 34bb364
Show file tree
Hide file tree
Showing 21 changed files with 36 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}

private[spark] class IndexPage(parent: HistoryServer) extends WebUIPage("") {

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val appRows = parent.appIdToInfo.values.toSeq.sortBy { app => -app.lastUpdated }
val appTable = UIUtils.listingTable(appHeader, appRow, appRows)
val content =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ class HistoryServer(
// Remove any applications that should no longer be retained
appIdToInfo.foreach { case (appId, info) =>
if (!retainedAppIds.contains(appId)) {
detachUI(info.ui)
detachSparkUI(info.ui)
appIdToInfo.remove(appId)
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ class HistoryServer(
// Do not call ui.bind() to avoid creating a new server for each application
replayBus.replay()
if (appListener.applicationStarted) {
attachUI(ui)
attachSparkUI(ui)
val appName = appListener.appName
val sparkUser = appListener.sparkUser
val startTime = appListener.startTime
Expand All @@ -193,13 +193,13 @@ class HistoryServer(
}

/** Attach a reconstructed UI to this server. Only valid after bind(). */
private def attachUI(ui: SparkUI) {
private def attachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "HistoryServer must be bound before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this server. Only valid after bind(). */
private def detachUI(ui: SparkUI) {
private def detachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "HistoryServer must be bound before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ private[spark] class Master(
if (completedApps.size >= RETAINED_APPLICATIONS) {
val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
completedApps.take(toRemove).foreach( a => {
appIdToUI.remove(a.id).foreach { ui => webUi.detachUI(ui) }
appIdToUI.remove(a.id).foreach { ui => webUi.detachSparkUI(ui) }
applicationMetricsSystem.removeSource(a.appSource)
})
completedApps.trimStart(toRemove)
Expand Down Expand Up @@ -672,7 +672,7 @@ private[spark] class Master(
replayBus.replay()
app.desc.appUiUrl = ui.basePath
appIdToUI(app.id) = ui
webUi.attachUI(ui)
webUi.attachSparkUI(ui)
return true
} catch {
case t: Throwable =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ import org.apache.spark.deploy.master.ExecutorInfo
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class ApplicationPage(parent: MasterWebUI)
extends WebUIPage("app", includeJson = true) {
private[spark] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") {

private val master = parent.masterActorRef
private val timeout = parent.timeout
Expand All @@ -49,7 +48,7 @@ private[spark] class ApplicationPage(parent: MasterWebUI)
}

/** Executor details for a particular application */
override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val appId = request.getParameter("appId")
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.deploy.master.{ApplicationInfo, DriverInfo, WorkerInfo}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includeJson = true) {
private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("") {
private val master = parent.masterActorRef
private val timeout = parent.timeout

Expand All @@ -42,7 +42,7 @@ private[spark] class IndexPage(parent: MasterWebUI) extends WebUIPage("", includ
}

/** Index view listing applications and executors */
override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (master ? RequestMasterState)(timeout).mapTo[MasterStateResponse]
val state = Await.result(stateFuture, timeout)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ class MasterWebUI(val master: Master, requestedPort: Int)
}

/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
def attachUI(ui: SparkUI) {
def attachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before attaching SparkUIs")
ui.getHandlers.foreach(attachHandler)
}

/** Detach a reconstructed UI from this Master UI. Only valid after bind(). */
def detachUI(ui: SparkUI) {
def detachSparkUI(ui: SparkUI) {
assert(serverInfo.isDefined, "Master UI must be bound to a server before detaching SparkUIs")
ui.getHandlers.foreach(detachHandler)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
pre + Utils.offsetBytes(path, startByte, endByte)
}

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val defaultBytes = 100 * 1024
val appId = Option(request.getParameter("appId"))
val executorId = Option(request.getParameter("executorId"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.spark.deploy.worker.{DriverRunner, ExecutorRunner}
import org.apache.spark.ui.{WebUIPage, UIUtils}
import org.apache.spark.util.Utils

private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includeJson = true) {
private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("") {
val workerActor = parent.worker.self
val worker = parent.worker
val timeout = parent.timeout
Expand All @@ -42,7 +42,7 @@ private[spark] class IndexPage(parent: WorkerWebUI) extends WebUIPage("", includ
JsonProtocol.writeWorkerState(workerState)
}

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val stateFuture = (workerActor ? RequestWorkerState)(timeout).mapTo[WorkerStateResponse]
val workerState = Await.result(stateFuture, timeout)

Expand Down
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/ui/JettyUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ private[spark] object JettyUtils extends Logging {
/** Create a handler for serving files from a static directory */
def createStaticHandler(resourceBase: String, path: String): ServletContextHandler = {
val contextHandler = new ServletContextHandler
contextHandler.setInitParameter("org.eclipse.jetty.servlet.Default.gzip", "false")
val staticHandler = new DefaultServlet
val holder = new ServletHolder(staticHandler)
Option(getClass.getClassLoader.getResource(resourceBase)) match {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.ui.jobs.JobProgressTab
import org.apache.spark.ui.storage.StorageTab

/**
* Top level user interface for Spark.
* Top level user interface for a Spark application.
*/
private[spark] class SparkUI(
val sc: SparkContext,
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ private[spark] object UIUtils extends Logging {
tableClass += " table-fixed"
}
val colWidth = 100.toDouble / headers.size
val colWidthAttr =if (fixedWidth) colWidth + "%" else ""
val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
val headerRow: Seq[Node] = {
// if none of the headers have "\n" in them
if (headers.forall(!_.contains("\n"))) {
Expand Down
16 changes: 8 additions & 8 deletions core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,8 @@ private[spark] abstract class WebUI(
val pagePath = "/" + page.prefix
attachHandler(createServletHandler(pagePath,
(request: HttpServletRequest) => page.render(request), securityManager, basePath))
if (page.includeJson) {
attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
(request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
}
attachHandler(createServletHandler(pagePath.stripSuffix("/") + "/json",
(request: HttpServletRequest) => page.renderJson(request), securityManager, basePath))
}

/** Attach a handler to this UI. */
Expand Down Expand Up @@ -121,6 +119,7 @@ private[spark] abstract class WebUI(

/**
* A tab that represents a collection of pages.
* The prefix is appended to the parent address to form a full path, and must not contain slashes.
*/
private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
val pages = ArrayBuffer[WebUIPage]()
Expand All @@ -141,10 +140,11 @@ private[spark] abstract class WebUITab(parent: WebUI, val prefix: String) {
* A page that represents the leaf node in the UI hierarchy.
*
* The direct parent of a WebUIPage is not specified as it can be either a WebUI or a WebUITab.
* If includeJson is true, the parent WebUI (direct or indirect) creates handlers for both the
* HTML and the JSON content, rather than just the former.
* If the parent is a WebUI, the prefix is appended to the parent's address to form a full path.
* Else, if the parent is a WebUITab, the prefix is appended to the super prefix of the parent
* to form a relative path. The prefix must not contain slashes.
*/
private[spark] abstract class WebUIPage(var prefix: String, val includeJson: Boolean = false) {
def render(request: HttpServletRequest): Seq[Node] = Seq[Node]()
private[spark] abstract class WebUIPage(var prefix: String) {
def render(request: HttpServletRequest): Seq[Node]
def renderJson(request: HttpServletRequest): JValue = JNothing
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ private[ui] class IndexPage(parent: EnvironmentTab) extends WebUIPage("") {
private val basePath = parent.basePath
private val listener = parent.listener

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val runtimeInformationTable = UIUtils.listingTable(
propertyHeader, jvmRow, listener.jvmInformation, fixedWidth = true)
val sparkPropertiesTable = UIUtils.listingTable(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ private[ui] class IndexPage(parent: ExecutorsTab) extends WebUIPage("") {
private val basePath = parent.basePath
private val listener = parent.listener

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val maxMem = storageStatusList.map(_.maxMem).fold(0L)(_ + _)
val memUsed = storageStatusList.map(_.memUsed()).fold(0L)(_ + _)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ private[ui] class IndexPage(parent: JobProgressTab) extends WebUIPage("") {
private val listener = parent.listener
private lazy val isFairScheduler = parent.isFairScheduler

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val activeStages = listener.activeStages.values.toSeq
val completedStages = listener.completedStages.reverse.toSeq
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
private val sc = parent.sc
private val listener = parent.listener

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val poolName = request.getParameter("poolname")
val poolToActiveStages = listener.poolToActiveStages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
private val basePath = parent.basePath
private val listener = parent.listener

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
listener.synchronized {
val stageId = request.getParameter("id").toInt

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[ui] class RddPage(parent: StorageTab) extends WebUIPage("rdd") {
private val basePath = parent.basePath
private val listener = parent.listener

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val rddId = request.getParameter("id").toInt
val storageStatusList = listener.storageStatusList
val rddInfo = listener.rddInfoList.find(_.id == rddId).getOrElse {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ private[ui] class IndexPage(parent: StorageTab) extends WebUIPage("") {
private val basePath = parent.basePath
private val listener = parent.listener

override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val rdds = listener.rddInfoList
val content = UIUtils.listingTable(rddHeader, rddRow, rdds)
UIUtils.headerSparkPage(content, basePath, appName, "Storage ", parent.headerTabs, parent)
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/ui/UISuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class UISuite extends FunSuite {

val newTab = new WebUITab(sparkUI, "foo") {
attachPage(new WebUIPage("") {
override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
<b>"html magic"</b>
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
private val emptyCell = "-"

/** Render the page */
override def render(request: HttpServletRequest): Seq[Node] = {
def render(request: HttpServletRequest): Seq[Node] = {
val content =
generateBasicStats() ++
<br></br><h4>Statistics over last {listener.retainedCompletedBatches.size} processed batches</h4> ++
Expand Down

0 comments on commit 34bb364

Please sign in to comment.