From 7e289fa1297aa21f6c8764e4a237eb3a674675e0 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Thu, 30 Apr 2015 15:16:26 -0700 Subject: [PATCH] Review feedback. --- .../spark/deploy/history/FsHistoryProvider.scala | 3 +-- .../apache/spark/deploy/history/HistoryPage.scala | 3 --- .../apache/spark/deploy/history/HistoryServer.scala | 2 ++ .../spark/scheduler/EventLoggingListener.scala | 1 - .../apache/spark/scheduler/SchedulerBackend.scala | 5 +++-- .../deploy/history/FsHistoryProviderSuite.scala | 10 ++++++++-- .../apache/spark/deploy/yarn/ApplicationMaster.scala | 2 +- .../cluster/YarnClusterSchedulerBackend.scala | 12 +++++------- 8 files changed, 20 insertions(+), 18 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 2096fc70ae235..993763f3aa092 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -145,8 +145,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = { try { applications.get(appId).flatMap { appInfo => - val attempts = appInfo.attempts.filter(_.attemptId == attemptId) - attempts.headOption.map { attempt => + appInfo.attempts.find(_.attemptId == attemptId).map { attempt => val replayBus = new ReplayListenerBus() val ui = { val conf = this.conf.clone() diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index 1af233f09f385..0830cc1ba1245 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -22,9 +22,6 @@ import javax.servlet.http.HttpServletRequest import scala.xml.Node import org.apache.spark.ui.{WebUIPage, UIUtils} -import scala.collection.immutable.ListMap -import scala.collection.mutable.HashMap -import scala.collection.mutable.ArrayBuffer private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index f2883d4e2671b..754c8e9b6668b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -73,6 +73,8 @@ class HistoryServer( private val loaderServlet = new HttpServlet { protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = { + // Parse the URI created by getAttemptURI(). It contains an app ID and an optional + // attempt ID (separated by a slash). val parts = Option(req.getPathInfo()).getOrElse("").split("/") if (parts.length < 2) { res.sendError(HttpServletResponse.SC_BAD_REQUEST, diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index eba50b54fc7d8..529a5b2bf1a0d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -269,7 +269,6 @@ private[spark] object EventLoggingListener extends Logging { appId: String, appAttemptId: Option[String], compressionCodecName: Option[String] = None): String = { - val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId) val codec = compressionCodecName.map("." + _).getOrElse("") if (appAttemptId.isDefined) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 70500ccdb6212..646820520ea1b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -42,9 +42,10 @@ private[spark] trait SchedulerBackend { def applicationId(): String = appId /** - * Get an application ID associated with the job. + * Get the attempt ID for this run, if the cluster manager supports multiple + * attempts. Applications run in client mode will not have attempt IDs. * - * @return An application attempt id + * @return The application attempt id, if available. */ def applicationAttemptId(): Option[String] = None diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 469ed19f8a252..a0a0afa48833e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -108,8 +108,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers list.size should be (5) list.count(_.attempts.head.completed) should be (3) - def makeAppInfo(id: String, name: String, start: Long, end: Long, lastMod: Long, - user: String, completed: Boolean): ApplicationHistoryInfo = { + def makeAppInfo( + id: String, + name: String, + start: Long, + end: Long, + lastMod: Long, + user: String, + completed: Boolean): ApplicationHistoryInfo = { ApplicationHistoryInfo(id, name, List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed))) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index e92e048712acf..27f804782f355 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -92,7 +92,7 @@ private[spark] class ApplicationMaster( // Propagate the attempt if, so that in case of event logging, // different attempt's logs gets created in different directory - System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString()) + System.setProperty("spark.yarn.app.attemptId", appAttemptId.getAttemptId().toString()) } logInfo("ApplicationAttemptId: " + appAttemptId) diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index 28b212146553f..aeb218a575455 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -39,19 +39,17 @@ private[spark] class YarnClusterSchedulerBackend( } override def applicationId(): String = - // In YARN Cluster mode, spark.yarn.app.id is expect to be set - // before user application is launched. - // So, if spark.yarn.app.id is not set, it is something wrong. + // In YARN Cluster mode, the application ID is expected to be set, so log an error if it's + // not found. sc.getConf.getOption("spark.yarn.app.id").getOrElse { logError("Application ID is not set.") super.applicationId } override def applicationAttemptId(): Option[String] = - // In YARN Cluster mode, spark.yarn.app.attemptid is expect to be set - // before user application is launched. - // So, if spark.yarn.app.id is not set, it is something wrong. - sc.getConf.getOption("spark.yarn.app.attemptid").orElse { + // In YARN Cluster mode, the attempt ID is expected to be set, so log an error if it's + // not found. + sc.getConf.getOption("spark.yarn.app.attemptId").orElse { logError("Application attempt ID is not set.") super.applicationAttemptId }