
Misc UI, test, style fixes.
Marcelo Vanzin committed Apr 9, 2015
1 parent: cbe8bba; commit: ce5ee5d
Showing 8 changed files with 48 additions and 29 deletions.

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -160,6 +160,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   }
 
   private def attemptRow(
+      renderAttemptIdColumn: Boolean,
       info: ApplicationHistoryInfo,
       attempt: ApplicationAttemptInfo,
       isFirst: Boolean): Seq[Node] = {
@@ -178,22 +179,27 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
     {
       if (isFirst) {
         if (info.attempts.size > 1) {
-          <td rowspan={info.attempts.size.toString}><a href={uiAddress}>{info.id}</a></td>
+          <td rowspan={info.attempts.size.toString}><a href={uiAddress}>{info.id}</a></td> ++
+            <td rowspan={info.attempts.size.toString}>{attempt.name}</td>
         } else {
-          <td><a href={uiAddress}>{info.id}</a></td>
+          <td><a href={uiAddress}>{info.id}</a></td> ++
+            <td>{attempt.name}</td>
         }
       } else {
-        new xml.Comment("")
+        Nil
       }
     }
     {
-      if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
-        <td><a href={getAttemptURI(info.id, attempt)}>{attempt.attemptId}</a></td>
+      if (renderAttemptIdColumn) {
+        if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
+          <td><a href={getAttemptURI(info.id, attempt)}>{attempt.attemptId}</a></td>
+        } else {
+          <td>&nbsp;</td>
+        }
       } else {
-        <td>&nbsp;</td>
+        Nil
       }
     }
-    <td>{attempt.name}</td>
     <td sorttable_customkey={attempt.startTime.toString}>{startTime}</td>
     <td sorttable_customkey={attempt.endTime.toString}>{endTime}</td>
     <td sorttable_customkey={(attempt.endTime - attempt.startTime).toString}>
@@ -204,12 +210,12 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   }
 
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    attemptRow(info, info.attempts.head, true)
+    attemptRow(false, info, info.attempts.head, true)
   }
 
   private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    attemptRow(info, info.attempts.head, true) ++
-      info.attempts.drop(1).flatMap(attemptRow(info, _, false))
+    attemptRow(true, info, info.attempts.head, true) ++
+      info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
   }
 
   private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
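
A side note on the ++ operators introduced above: Scala XML literals are NodeSeq values, and two adjacent literals are separate expressions, so ++ is needed to join them into a single Seq[Node] that renders as consecutive table cells (returning Nil in the non-first branch likewise yields an empty Seq[Node], unlike the old new xml.Comment("") which injected a stray comment node). A minimal sketch, not code from the commit, with illustrative values:

    import scala.xml.Node

    // Two adjacent <td> literals joined into one node sequence, as in attemptRow.
    val cells: Seq[Node] =
      <td>app-20150409</td> ++
      <td>My App</td>

    println(cells.mkString)  // <td>app-20150409</td><td>My App</td>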

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -80,18 +80,23 @@ class HistoryServer(
       return
     }
 
-    val appId = parts(1)
+    val appKey =
+      if (parts.length == 3) {
+        s"${parts(1)}/${parts(2)}"
+      } else {
+        parts(1)
+      }
 
     // Note we don't use the UI retrieved from the cache; the cache loader above will register
     // the app's UI, and all we need to do is redirect the user to the same URI that was
     // requested, and the proper data should be served at that point.
     try {
-      appCache.get(appId)
+      appCache.get(appKey)
       res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
     } catch {
       case e: Exception => e.getCause() match {
         case nsee: NoSuchElementException =>
-          val msg = <div class="row-fluid">Application {appId} not found.</div>
+          val msg = <div class="row-fluid">Application {appKey} not found.</div>
           res.setStatus(HttpServletResponse.SC_NOT_FOUND)
           UIUtils.basicSparkPage(msg, "Not Found").foreach(
             n => res.getWriter().write(n.toString))
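
For context, parts is derived from the servlet request path, so the new appKey simply folds an optional attempt ID into the cache key. A minimal sketch of the derivation, assuming a path layout like /<appId>/<attemptId> (illustrative values, not code from the commit):

    // Assumed shape of the servlet path info for an app with attempt "1".
    val pathInfo = "/app-20150409/1"
    val parts = pathInfo.split("/")  // Array("", "app-20150409", "1")

    val appKey =
      if (parts.length == 3) s"${parts(1)}/${parts(2)}"  // "app-20150409/1"
      else parts(1)                                      // no attempt ID in the path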

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -55,8 +55,8 @@ private[spark] class EventLoggingListener(
 
   import EventLoggingListener._
 
-  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
-    this(appId, "", logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, appAttemptId : String, logBaseDir: URI, sparkConf: SparkConf) =
+    this(appId, appAttemptId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -253,9 +253,12 @@ private[spark] object EventLoggingListener extends Logging {
    * we won't know which codec to use to decompress the metadata needed to open the file in
    * the first place.
    *
+   * The log file name will identify the compression codec used for the contents, if any.
+   * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+   *
    * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
-   * @param appAttemptId A unique attempt id of appId.
+   * @param appAttemptId A unique attempt id of appId. May be the empty string.
    * @param compressionCodecName Name to identify the codec used to compress the contents
    *        of the log, or None if compression is not enabled.
    * @return A path which consists of file-system-safe characters.
@@ -265,14 +268,20 @@
       appId: String,
       appAttemptId: String,
       compressionCodecName: Option[String] = None): String = {
-    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
-    if (appAttemptId.equals("")) {
-      logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/")
+    val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val codec = compressionCodecName.map("." + _).getOrElse("")
+    if (appAttemptId.isEmpty) {
+      base + codec
     } else {
-      logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/") + "_" + appAttemptId
+      base + "_" + sanitize(appAttemptId) + codec
     }
   }
 
+  private def sanitize(str: String): String = {
+    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+  }
+
   /**
    * Opens an event log file and returns an input stream that contains the event data.
    *
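
Putting the naming rules above together, the resulting paths can be checked by hand; these values follow directly from the getLogPath body in this hunk (illustrative base directory and IDs):

    import java.net.URI

    EventLoggingListener.getLogPath(new URI("/logs"), "app_123", "")
    // => "/logs/app_123"
    EventLoggingListener.getLogPath(new URI("/logs"), "app_123", "", Some("lzf"))
    // => "/logs/app_123.lzf"
    EventLoggingListener.getLogPath(new URI("/logs"), "app_123", "1", Some("lzf"))
    // => "/logs/app_123_1.lzf"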

core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,14 +73,14 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
 
   /**
    * Process a lost executor
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 
   /**
-   * Get an application's attempt Id associated with the job.
+   * Get an application's attempt ID associated with the job.
    *
    * @return An application's Attempt ID
    */

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -195,7 +195,7 @@ private[spark] object JsonProtocol {
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
     ("User" -> applicationStart.sparkUser) ~
-    ("appAttemptId" -> applicationStart.appAttemptId)
+    ("appAttemptId" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
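
For context, json4s drops JNothing values when rendering, so a None attempt ID omits the key entirely; the rewritten line makes that explicit. A small sketch of the behavior (illustrative values, same DSL as JsonProtocol):

    import org.json4s.{JNothing, JString}
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods.{compact, render}

    // Mirrors the pattern in applicationStartToJson above.
    def toJson(attempt: Option[String]) =
      ("App ID" -> "app_123") ~
      ("appAttemptId" -> attempt.map(JString(_)).getOrElse(JNothing))

    println(compact(render(toJson(Some("1")))))  // {"App ID":"app_123","appAttemptId":"1"}
    println(compact(render(toJson(None))))       // {"App ID":"app_123"}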

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   test("Verify log file exist") {
     // Verify logging directory exists
     val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener("test", "", testDirPath.toUri(), conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -140,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
+    val eventLogger = new EventLoggingListener(logName, "", testDirPath.toUri(), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
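
The only change in both tests is the new second argument: an explicit, empty appAttemptId. Per the auxiliary-constructor change earlier in this commit, the call sites relate as sketched here (names from the diff, shown for orientation only):

    // Before: the 3-arg auxiliary constructor filled in "" implicitly.
    //   new EventLoggingListener("test", testDirPath.toUri(), conf)
    // After: the attempt ID is explicit; tests pass "" for "no attempt".
    //   new EventLoggingListener("test", "", testDirPath.toUri(), conf)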

yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -91,10 +91,9 @@ private[spark] class ApplicationMaster(
       // Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
       System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
 
-      // Propagate the attempt if, so that in case of event logging,
-      // different attempt's logs gets created in different directory
-      System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
-
+      // Propagate the attempt if, so that in case of event logging,
+      // different attempt's logs gets created in different directory
+      System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
     }
 
     logInfo("ApplicationAttemptId: " + appAttemptId)

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnClusterSchedulerBackend.scala
@@ -46,7 +46,7 @@ private[spark] class YarnClusterSchedulerBackend(
       logError("Application ID is not set.")
       super.applicationId
     }
 
   override def applicationAttemptId(): String =
     // In YARN Cluster mode, spark.yarn.app.attemptid is expect to be set
     // before user application is launched.
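
Taken together, the two YARN-side changes form a small handshake: the ApplicationMaster publishes the attempt ID as a system property, and the cluster-mode backend reads it back when asked for applicationAttemptId(). A minimal sketch of that flow (property name from the diff; the lookup shape is assumed, not the backend's actual code):

    // ApplicationMaster side (from the diff): publish the YARN attempt ID.
    System.setProperty("spark.yarn.app.attemptid", "1")

    // Scheduler backend side (assumed shape): fail fast if it was never set.
    val attemptId: String = sys.props.getOrElse("spark.yarn.app.attemptid",
      throw new IllegalStateException("spark.yarn.app.attemptid is not set"))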
