Skip to content

Commit

Permalink
Review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Apr 30, 2015
1 parent f66dcc5 commit 7e289fa
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
try {
applications.get(appId).flatMap { appInfo =>
val attempts = appInfo.attempts.filter(_.attemptId == attemptId)
attempts.headOption.map { attempt =>
appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
val replayBus = new ReplayListenerBus()
val ui = {
val conf = this.conf.clone()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}
import scala.collection.immutable.ListMap
import scala.collection.mutable.HashMap
import scala.collection.mutable.ArrayBuffer

private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ class HistoryServer(

private val loaderServlet = new HttpServlet {
protected override def doGet(req: HttpServletRequest, res: HttpServletResponse): Unit = {
// Parse the URI created by getAttemptURI(). It contains an app ID and an optional
// attempt ID (separated by a slash).
val parts = Option(req.getPathInfo()).getOrElse("").split("/")
if (parts.length < 2) {
res.sendError(HttpServletResponse.SC_BAD_REQUEST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ private[spark] object EventLoggingListener extends Logging {
appId: String,
appAttemptId: Option[String],
compressionCodecName: Option[String] = None): String = {
val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
val base = logBaseDir.toString.stripSuffix("/") + "/" + sanitize(appId)
val codec = compressionCodecName.map("." + _).getOrElse("")
if (appAttemptId.isDefined) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,10 @@ private[spark] trait SchedulerBackend {
def applicationId(): String = appId

/**
 * Get the attempt ID for this run, if the cluster manager supports multiple
 * attempts. Applications run in client mode will not have attempt IDs.
 *
 * @return The application attempt id, if available.
 */
def applicationAttemptId(): Option[String] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,14 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
list.size should be (5)
list.count(_.attempts.head.completed) should be (3)

// Build an ApplicationHistoryInfo with a single (attempt-id-less) attempt,
// used to state the expected listing entries in the assertions above.
def makeAppInfo(
    id: String,
    name: String,
    start: Long,
    end: Long,
    lastMod: Long,
    user: String,
    completed: Boolean): ApplicationHistoryInfo = {
  ApplicationHistoryInfo(id, name,
    List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private[spark] class ApplicationMaster(

// Propagate the attempt ID, so that in case of event logging,
// different attempts' logs get created in different directories
System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())
System.setProperty("spark.yarn.app.attemptId", appAttemptId.getAttemptId().toString())
}

logInfo("ApplicationAttemptId: " + appAttemptId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,19 +39,17 @@ private[spark] class YarnClusterSchedulerBackend(
}

override def applicationId(): String =
  // In YARN Cluster mode, the application ID is expected to be set, so log an error if it's
  // not found.
  sc.getConf.getOption("spark.yarn.app.id").getOrElse {
    logError("Application ID is not set.")
    super.applicationId
  }

override def applicationAttemptId(): Option[String] =
  // In YARN Cluster mode, the attempt ID is expected to be set, so log an error if it's
  // not found. NOTE(review): the config key must match the camelCase
  // "spark.yarn.app.attemptId" set by the ApplicationMaster.
  sc.getConf.getOption("spark.yarn.app.attemptId").orElse {
    logError("Application attempt ID is not set.")
    super.applicationAttemptId
  }
Expand Down

0 comments on commit 7e289fa

Please sign in to comment.