From ce5ee5de4206d74491ecd13411ca2947a81ab4e1 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 8 Apr 2015 16:57:21 -0700
Subject: [PATCH] Misc UI, test, style fixes.

---
 .../spark/deploy/history/HistoryPage.scala    | 24 +++++++++++++++---------
 .../spark/deploy/history/HistoryServer.scala  | 11 ++++++++---
 .../scheduler/EventLoggingListener.scala      | 22 +++++++++++++++-------
 .../spark/scheduler/TaskScheduler.scala       |  4 ++--
 .../org/apache/spark/util/JsonProtocol.scala  |  2 +-
 .../scheduler/EventLoggingListenerSuite.scala |  4 ++--
 .../spark/deploy/yarn/ApplicationMaster.scala |  7 +++----
 .../cluster/YarnClusterSchedulerBackend.scala |  2 +-
 8 files changed, 47 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
index e51256c40122b..b6da1e09444fc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -160,6 +160,7 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   }
 
   private def attemptRow(
+      renderAttemptIdColumn: Boolean,
       info: ApplicationHistoryInfo,
       attempt: ApplicationAttemptInfo,
       isFirst: Boolean): Seq[Node] = {
@@ -178,22 +179,27 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
       {
         if (isFirst) {
           if (info.attempts.size > 1) {
-            <td rowspan={info.attempts.size.toString}>{info.id}</td>
+            <td rowspan={info.attempts.size.toString}>{info.id}</td> ++
+              <td rowspan={info.attempts.size.toString}>{attempt.name}</td>
           } else {
-            <td>{info.id}</td>
+            <td>{info.id}</td> ++
+              <td>{attempt.name}</td>
           }
         } else {
-          new xml.Comment("")
+          Nil
         }
       }
       {
-        if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
-          <td>{attempt.attemptId}</td>
+        if (renderAttemptIdColumn) {
+          if (info.attempts.size > 1 && !attempt.attemptId.isEmpty) {
+            <td>{attempt.attemptId}</td>
+          } else {
+            <td>&nbsp;</td>
+          }
         } else {
           Nil
         }
       }
-      <td>{attempt.name}</td>
       <td>{startTime}</td>
       <td>{endTime}</td>
       <td>{duration}</td>
@@ -204,12 +210,12 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
   }
 
   private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    attemptRow(info, info.attempts.head, true)
+    attemptRow(false, info, info.attempts.head, true)
   }
 
   private def appWithAttemptRow(info: ApplicationHistoryInfo): Seq[Node] = {
-    attemptRow(info, info.attempts.head, true) ++
-      info.attempts.drop(1).flatMap(attemptRow(info, _, false))
+    attemptRow(true, info, info.attempts.head, true) ++
+      info.attempts.drop(1).flatMap(attemptRow(true, info, _, false))
   }
 
   private def makePageLink(linkPage: Int, showIncomplete: Boolean): String = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index b7b4355351619..0761d2846b854 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -80,18 +80,23 @@ class HistoryServer(
       return
     }
 
-    val appId = parts(1)
+    val appKey =
+      if (parts.length == 3) {
+        s"${parts(1)}/${parts(2)}"
+      } else {
+        parts(1)
+      }
 
     // Note we don't use the UI retrieved from the cache; the cache loader above will register
    // the app's UI, and all we need to do is redirect the user to the same URI that was
    // requested, and the proper data should be served at that point.
     try {
-      appCache.get(appId)
+      appCache.get(appKey)
       res.sendRedirect(res.encodeRedirectURL(req.getRequestURI()))
     } catch {
       case e: Exception => e.getCause() match {
         case nsee: NoSuchElementException =>
-          val msg = <div class="row-fluid">Application {appId} not found.</div>
+          val msg = <div class="row-fluid">Application {appKey} not found.</div>
           res.setStatus(HttpServletResponse.SC_NOT_FOUND)
           UIUtils.basicSparkPage(msg, "Not Found").foreach(
             n => res.getWriter().write(n.toString))
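As an aside on the HistoryServer hunk above: the new cache key folds the attempt ID into the app ID, so each attempt of an application gets its own cache entry and its own UI. A small standalone sketch of that key scheme (the object and method names below are illustrative, not part of the patch):

    // Illustrative sketch only: mirrors the appKey logic in the hunk above.
    object AppKeyExample {
      def appKey(pathInfo: String): String = {
        // "/app-123/1".split("/") yields Array("", "app-123", "1")
        val parts = pathInfo.split("/")
        if (parts.length == 3) s"${parts(1)}/${parts(2)}" else parts(1)
      }

      def main(args: Array[String]): Unit = {
        println(appKey("/app-123/1")) // app-123/1 (attempt-qualified key)
        println(appKey("/app-123"))   // app-123   (single-attempt app)
      }
    }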
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index c75c7a53afefa..238694324c19f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -55,8 +55,8 @@ private[spark] class EventLoggingListener(
 
   import EventLoggingListener._
 
-  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
-    this(appId, "", logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
+  def this(appId: String, appAttemptId: String, logBaseDir: URI, sparkConf: SparkConf) =
+    this(appId, appAttemptId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
@@ -253,9 +253,12 @@ private[spark] object EventLoggingListener extends Logging {
    * we won't know which codec to use to decompress the metadata needed to open the file in
    * the first place.
    *
+   * The log file name will identify the compression codec used for the contents, if any.
+   * For example, app_123 for an uncompressed log, app_123.lzf for an LZF-compressed log.
+   *
    * @param logBaseDir Directory where the log file will be written.
    * @param appId A unique app ID.
-   * @param appAttemptId A unique attempt id of appId.
+   * @param appAttemptId A unique attempt ID of appId. May be the empty string.
    * @param compressionCodecName Name to identify the codec used to compress the contents
    *                             of the log, or None if compression is not enabled.
    * @return A path which consists of file-system-safe characters.
@@ -265,14 +268,19 @@ private[spark] object EventLoggingListener extends Logging {
       appId: String,
       appAttemptId: String,
       compressionCodecName: Option[String] = None): String = {
-    val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
-    if (appAttemptId.equals("")) {
-      logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/")
+    val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
+    val codec = compressionCodecName.map("." + _).getOrElse("")
+    if (appAttemptId.isEmpty) {
+      base + codec
     } else {
-      logBaseDir.toString.stripSuffix("/") + "/" + name.stripSuffix("/") + "_" + appAttemptId
+      base + "_" + sanitize(appAttemptId) + codec
     }
   }
 
+  private def sanitize(str: String): String = {
+    str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
+  }
+
   /**
    * Opens an event log file and returns an input stream that contains the event data.
   *
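The naming scheme documented in the new scaladoc above can be exercised on its own. A runnable sketch, assuming a plain String base directory instead of the java.net.URI the real method takes (names are illustrative):

    // Illustrative sketch only: reproduces the sanitize/getLogPath behavior above.
    object LogPathExample {
      private def sanitize(str: String): String =
        str.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase

      def logPath(base: String, appId: String, attemptId: String, codec: Option[String]): String = {
        val prefix = base.stripSuffix("/") + "/" + sanitize(appId)
        val suffix = codec.map("." + _).getOrElse("")
        if (attemptId.isEmpty) prefix + suffix else prefix + "_" + sanitize(attemptId) + suffix
      }

      def main(args: Array[String]): Unit = {
        println(logPath("/logs", "app_123", "", None))         // /logs/app_123
        println(logPath("/logs", "app_123", "", Some("lzf")))  // /logs/app_123.lzf
        println(logPath("/logs", "app_123", "1", Some("lzf"))) // /logs/app_123_1.lzf
      }
    }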
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
index d612409c81b6f..9a64629eeb885 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,14 +73,14 @@ private[spark] trait TaskScheduler {
    * @return An application ID
    */
   def applicationId(): String = appId
- 
+
   /**
    * Process a lost executor
    */
   def executorLost(executorId: String, reason: ExecutorLossReason): Unit
 
   /**
-   * Get an application's attempt Id associated with the job.
+   * Get an application's attempt ID associated with the job.
    *
    * @return An application's Attempt ID
    */
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 23c18edf70b25..0a877a78714b2 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -195,7 +195,7 @@ private[spark] object JsonProtocol {
     ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
     ("Timestamp" -> applicationStart.time) ~
     ("User" -> applicationStart.sparkUser) ~
-    ("appAttemptId" -> applicationStart.appAttemptId)
+    ("appAttemptId" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing))
   }
 
   def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
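The JsonProtocol change above follows the json4s convention Spark already uses for optional fields: a None value renders as JNothing, which drops the key from the output entirely, while Some(v) renders as a JSON string. A self-contained sketch (the object and wrapper method are illustrative):

    // Illustrative sketch only: shows how the Option[String] field serializes.
    import org.json4s.{JNothing, JString}
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods.{compact, render}

    object AppAttemptJsonExample {
      def toJson(appAttemptId: Option[String]) =
        ("Event" -> "SparkListenerApplicationStart") ~
          ("appAttemptId" -> appAttemptId.map(JString(_)).getOrElse(JNothing))

      def main(args: Array[String]): Unit = {
        println(compact(render(toJson(Some("1"))))) // appAttemptId key present
        println(compact(render(toJson(None))))      // appAttemptId key omitted
      }
    }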
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString()) - // Propagate the attempt if, so that in case of event logging, - // different attempt's logs gets created in different directory - System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString()) - + // Propagate the attempt if, so that in case of event logging, + // different attempt's logs gets created in different directory + System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString()) } logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index f20f4dcb00d64..f022e8296fa66 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -46,7 +46,7 @@ private[spark] class YarnClusterSchedulerBackend( logError("Application ID is not set.") super.applicationId } - + override def applicationAttemptId(): String = // In YARN Cluster mode, spark.yarn.app.attemptid is expect to be set // before user application is launched.