Skip to content

Commit

Permalink
Styling and comments
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewor14 committed Mar 22, 2014
1 parent 5dbfbb4 commit d5154da
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,25 +33,28 @@ import org.apache.spark.util.Utils
import org.apache.spark.scheduler.{ApplicationListener, ReplayListenerBus}

/**
* A web server that re-renders SparkUIs of finished applications.
* A web server that renders SparkUIs of finished applications.
*
* For the standalone mode, MasterWebUI already achieves this functionality. Thus, the
* main use case of the HistoryServer is in other deploy modes (e.g. Yarn or Mesos).
*
* The logging directory structure is as follows: Within the given base directory, each
* application's event logs are maintained in the application's own sub-directory.
* application's event logs are maintained in the application's own sub-directory. This
* is the same structure as maintained in the event log write code path in
* EventLoggingListener.
*
* @param baseLogDir The base directory in which event logs are found
* @param requestedPort The requested port to which this server is to be bound
*/
class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
class HistoryServer(val baseLogDir: String, requestedPort: Int)
extends SparkUIContainer("History Server") with Logging {

private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
private val host = Option(System.getenv("SPARK_PUBLIC_DNS")).getOrElse(Utils.localHostName())
private val port = requestedPort
private val indexPage = new IndexPage(this)
private val fileSystem = Utils.getHadoopFileSystem(new URI(baseLogDir))
private val conf = new SparkConf
private val securityManager = new SecurityManager(conf)
private val indexPage = new IndexPage(this)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheck = -1L
Expand Down Expand Up @@ -84,8 +87,8 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
* from the application's event logs, attaches this UI to itself, and stores metadata
* information for this application.
*
* If the logs for an existing finished application are no longer found, remove all
* associated information and detach the SparkUI.
* If the logs for an existing finished application are no longer found, the server
* removes all associated information and detaches the SparkUI.
*/
def checkForLogs() {
if (logCheckReady) {
Expand Down Expand Up @@ -137,7 +140,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
if (success) {
attachUI(ui)
val appName = if (appListener.applicationStarted) appListener.appName else appId
ui.setAppName("%s (history)".format(appName))
ui.setAppName("%s (finished)".format(appName))
val startTime = appListener.startTime
val endTime = appListener.endTime
val info = ApplicationHistoryInfo(appName, startTime, endTime, lastUpdated, logPath, ui)
Expand All @@ -155,7 +158,7 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
/** Return the HTTP address at which this server is reachable. */
def getAddress = s"http://$host:$boundPort"

/** Return when this directory is last modified. */
/** Return when this directory was last modified. */
private def getModificationTime(dir: FileStatus): Long = {
val logFiles = fileSystem.listStatus(dir.getPath)
if (logFiles != null) {
Expand All @@ -171,20 +174,30 @@ class HistoryServer(val baseLogDir: String, requestedPort: Int, conf: SparkConf)
}
}

/**
* The recommended way of starting and stopping a HistoryServer is through the scripts
 * start-history-server.sh and stop-history-server.sh. The path to a base log directory
 * must be specified, while the requested UI port is optional. For example:
*
 * ./sbin/start-history-server.sh /tmp/spark-events 18080
 * ./sbin/start-history-server.sh hdfs://1.2.3.4:9000/spark-events
*
* This launches the HistoryServer as a Spark daemon.
*/
object HistoryServer {
val STATIC_RESOURCE_DIR = SparkUI.STATIC_RESOURCE_DIR

// Minimum interval between each check for logs, which requires a disk access
val UPDATE_INTERVAL_SECONDS = 5

def main(argStrings: Array[String]) {
val conf = new SparkConf
val args = new HistoryServerArguments(argStrings, conf)
val server = new HistoryServer(args.logDir, args.port, conf)
val args = new HistoryServerArguments(argStrings)
val server = new HistoryServer(args.logDir, args.port)
server.bind()

// Wait until the end of the world... or until the HistoryServer process is manually stopped
while(true) { Thread.sleep(Int.MaxValue) }
server.stop()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,20 @@ package org.apache.spark.deploy.history

import java.net.URI

import org.apache.spark.SparkConf
import org.apache.spark.util.{Utils, IntParam}
import org.apache.hadoop.fs.Path

import org.apache.spark.util.{IntParam, Utils}

/**
 * Command-line parser for the HistoryServer.
*/
private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf) {
private[spark] class HistoryServerArguments(args: Array[String]) {
var port = 18080
var logDir = ""

parse(args.toList)

def parse(args: List[String]): Unit = {
private def parse(args: List[String]): Unit = {
args match {
case ("--port" | "-p") :: IntParam(value) :: tail =>
port = value
Expand All @@ -53,7 +53,7 @@ private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf
validateLogDir()
}

def validateLogDir() {
private def validateLogDir() {
if (logDir == "") {
System.err.println("Logging directory must be specified.")
printUsageAndExit(1)
Expand All @@ -66,10 +66,7 @@ private[spark] class HistoryServerArguments(args: Array[String], conf: SparkConf
}
}

/**
* Print usage and exit JVM with the given exit code.
*/
def printUsageAndExit(exitCode: Int) {
private def printUsageAndExit(exitCode: Int) {
System.err.println(
"Usage: HistoryServer [options]\n" +
"\n" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,13 @@

package org.apache.spark.deploy.history

import java.text.SimpleDateFormat
import java.util.Date
import javax.servlet.http.HttpServletRequest

import scala.xml.Node

import org.apache.spark.ui.{UIUtils, WebUI}

private[spark] class IndexPage(parent: HistoryServer) {
private val dateFmt = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss")

def render(request: HttpServletRequest): Seq[Node] = {
parent.checkForLogs()
Expand Down Expand Up @@ -59,12 +56,12 @@ private[spark] class IndexPage(parent: HistoryServer) {
private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val appName = if (info.started) info.name else parent.getAppId(info.logPath)
val uiAddress = parent.getAddress + info.ui.basePath
val startTime = if (info.started) dateFmt.format(new Date(info.startTime)) else "Not started"
val endTime = if (info.finished) dateFmt.format(new Date(info.endTime)) else "Not finished"
val startTime = if (info.started) WebUI.formatDate(info.startTime) else "Not started"
val endTime = if (info.finished) WebUI.formatDate(info.endTime) else "Not finished"
val difference = if (info.started && info.finished) info.endTime - info.startTime else -1L
val duration = if (difference > 0) WebUI.formatDuration(difference) else "---"
val logDirectory = parent.getAppId(info.logPath)
val lastUpdated = dateFmt.format(new Date(info.lastUpdated))
val lastUpdated = WebUI.formatDate(info.lastUpdated)
<tr>
<td><a href={uiAddress}>{appName}</a></td>
<td>{startTime}</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ private[spark] class ReplayListenerBus(logDir: String) extends SparkListenerBus

/** If a compression codec is specified, wrap the given stream in a compression stream. */
private def wrapForCompression(stream: InputStream): InputStream = {
compressionCodec.map { codec => codec.compressedInputStream(stream) }.getOrElse(stream)
compressionCodec.map(_.compressedInputStream(stream)).getOrElse(stream)
}

/** Return a list of paths representing files found in the given directory. */
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/WebUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,6 @@ private[spark] object WebUI {
return "%.0f min".format(minutes)
}
val hours = minutes / 60
return "%.1f h".format(hours)
"%.1f h".format(hours)
}
}

0 comments on commit d5154da

Please sign in to comment.