[SPARK-4705] Handle multiple app attempts event logs, history server.
This change modifies the event logging listener to write the logs for different application
attempts to different files. The attempt ID is set by the scheduler backend, so as long
as the backend returns that ID to SparkContext, things should work. Currently, the
YARN backend does that.
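
Concretely, each attempt now gets its own event log file, keyed by the application ID
plus the attempt ID. A minimal sketch of the naming idea (simplified: the real
EventLoggingListener also sanitizes names and appends compression-codec and
in-progress suffixes):

    object AttemptLogNames {
      // Sketch: one event log file per (appId, attemptId) pair. When the
      // backend supplies no attempt ID (e.g. local mode), the old
      // single-file name is kept, so existing logs stay readable.
      def logFileName(appId: String, attemptId: Option[String]): String =
        attemptId.map(a => s"${appId}_$a").getOrElse(appId)

      def main(args: Array[String]): Unit = {
        println(logFileName("application_1430917381534_0001", Some("1")))
        // => application_1430917381534_0001_1
        println(logFileName("local-1430917381534", None))
        // => local-1430917381534
      }
    }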

The history server was also modified to model multiple attempts per application. Each
attempt has its own UI and a separate row in the listing table, so that users can look at
all the attempts separately. The UI "adapts" itself to avoid showing attempt-specific info
when all the applications being shown have a single attempt.
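
That adaptive behavior boils down to a per-render check along these lines (a sketch
with made-up type names, not the actual HistoryPage code):

    object ListingSketch {
      case class AttemptInfo(attemptId: Option[String], completed: Boolean)
      case class AppInfo(id: String, name: String, attempts: List[AttemptInfo])

      // Only render the attempt-specific columns when some application in the
      // listing actually has more than one attempt.
      def showAttemptColumns(apps: Seq[AppInfo]): Boolean =
        apps.exists(_.attempts.size > 1)

      def main(args: Array[String]): Unit = {
        val single = Seq(AppInfo("app-1", "PiJob", List(AttemptInfo(None, true))))
        val multi = Seq(AppInfo("app-2", "EtlJob",
          List(AttemptInfo(Some("1"), true), AttemptInfo(Some("2"), false))))
        println(showAttemptColumns(single)) // false
        println(showAttemptColumns(multi))  // true
      }
    }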

Author: Marcelo Vanzin <vanzin@cloudera.com>
Author: twinkle sachdeva <twinkle@kite.ggn.in.guavus.com>
Author: twinkle.sachdeva <twinkle.sachdeva@guavus.com>
Author: twinkle sachdeva <twinkle.sachdeva@guavus.com>

Closes apache#5432 from vanzin/SPARK-4705 and squashes the following commits:

7e289fa [Marcelo Vanzin] Review feedback.
f66dcc5 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
bc885b7 [Marcelo Vanzin] Review feedback.
76a3651 [Marcelo Vanzin] Fix log cleaner, add test.
7c381ec [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
1aa309d [Marcelo Vanzin] Improve sorting of app attempts.
2ad77e7 [Marcelo Vanzin] Missed a reference to the old property name.
9d59d92 [Marcelo Vanzin] Scalastyle...
d5a9c37 [Marcelo Vanzin] Update JsonProtocol test, make property name consistent.
ba34b69 [Marcelo Vanzin] Use Option[String] for attempt id.
f1cb9b3 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
c14ec19 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
9092d39 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
86de638 [Marcelo Vanzin] Merge branch 'master' into SPARK-4705
07446c6 [Marcelo Vanzin] Disable striping for app id / name when multiple attempts exist.
9092af5 [Marcelo Vanzin] Fix HistoryServer test.
3a14503 [Marcelo Vanzin] Argh scalastyle.
657ec18 [Marcelo Vanzin] Fix yarn history URL, app links.
c3e0a82 [Marcelo Vanzin] Move app name to app info, more UI fixes.
ce5ee5d [Marcelo Vanzin] Misc UI, test, style fixes.
cbe8bba [Marcelo Vanzin] Attempt ID in listener event should be an option.
88b1de8 [Marcelo Vanzin] Add a test for apps with multiple attempts.
3245aa2 [Marcelo Vanzin] Make app attempts part of the history server model.
5fd5c6f [Marcelo Vanzin] Fix my broken rebase.
318525a [twinkle.sachdeva] SPARK-4705: 1) moved from directory structure to single file, as per the master branch. 2) Added the attempt id inside the SparkListenerApplicationStart, to make the info available independent of directory structure. 3) Changes in History Server to render the UI as per snapshot II
6b2e521 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this
4c1fc26 [twinkle sachdeva] SPARK-4705 Incorporating the review comments regarding formatting, will do the rest of the changes after this
0eb7722 [twinkle sachdeva] SPARK-4705: Doing cherry-pick of fix into master
Marcelo Vanzin authored and jeanlyn committed May 28, 2015
1 parent: 6891c0e · commit: b5ea7d5
Showing 21 changed files with 546 additions and 201 deletions.
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -217,6 +217,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   private var _heartbeatReceiver: RpcEndpointRef = _
   @volatile private var _dagScheduler: DAGScheduler = _
   private var _applicationId: String = _
+  private var _applicationAttemptId: Option[String] = None
   private var _eventLogger: Option[EventLoggingListener] = None
   private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
   private var _cleaner: Option[ContextCleaner] = None
@@ -315,6 +316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
   }
 
   def applicationId: String = _applicationId
+  def applicationAttemptId: Option[String] = _applicationAttemptId
 
   def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
 
@@ -472,6 +474,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     _taskScheduler.start()
 
     _applicationId = _taskScheduler.applicationId()
+    _applicationAttemptId = taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
     _env.blockManager.initialize(_applicationId)
 
@@ -484,7 +487,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     _eventLogger =
       if (isEventLogEnabled) {
         val logger =
-          new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+          new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
+            _conf, _hadoopConfiguration)
         logger.start()
         listenerBus.addListener(logger)
         Some(logger)
@@ -1868,7 +1872,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
     // Note: this code assumes that the task scheduler has been initialized and has contacted
     // the cluster manager to get an application ID (in case the cluster manager provides one).
     listenerBus.post(SparkListenerApplicationStart(appName, Some(applicationId),
-      startTime, sparkUser))
+      startTime, sparkUser, applicationAttemptId))
   }
 
   /** Post the application end event */
core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -19,15 +19,19 @@ package org.apache.spark.deploy.history
 
 import org.apache.spark.ui.SparkUI
 
-private[history] case class ApplicationHistoryInfo(
-    id: String,
-    name: String,
+private[history] case class ApplicationAttemptInfo(
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
    sparkUser: String,
    completed: Boolean = false)
 
+private[history] case class ApplicationHistoryInfo(
+    id: String,
+    name: String,
+    attempts: List[ApplicationAttemptInfo])
+
 private[history] abstract class ApplicationHistoryProvider {
 
   /**
@@ -41,9 +45,10 @@ private[history] abstract class ApplicationHistoryProvider {
    * Returns the Spark UI for a specific application.
    *
    * @param appId The application ID.
+   * @param attemptId The application attempt ID (or None if there is no attempt ID).
    * @return The application's UI, or None if application is not found.
    */
-  def getAppUI(appId: String): Option[SparkUI]
+  def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI]
 
   /**
    * Called when the server is shutting down.
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,16 +32,20 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 /**
  * A class that provides application history from event logs stored in the file system.
  * This provider checks for new finished applications in the background periodically and
  * renders the history application UI by parsing the associated event logs.
  */
-private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
-  with Logging {
+private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
+  extends ApplicationHistoryProvider with Logging {
+
+  def this(conf: SparkConf) = {
+    this(conf, new SystemClock())
+  }
 
   import FsHistoryProvider._

@@ -75,8 +79,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   @volatile private var applications: mutable.LinkedHashMap[String, FsApplicationHistoryInfo]
     = new mutable.LinkedHashMap()
 
-  // List of applications to be deleted by event log cleaner.
-  private var appsToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
+  // List of application logs to be deleted by event log cleaner.
+  private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
   // Constants used to parse Spark 1.0.0 log directories.
   private[history] val LOG_PREFIX = "EVENT_LOG_"
@@ -138,31 +142,33 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
 
   override def getListing(): Iterable[FsApplicationHistoryInfo] = applications.values
 
-  override def getAppUI(appId: String): Option[SparkUI] = {
+  override def getAppUI(appId: String, attemptId: Option[String]): Option[SparkUI] = {
     try {
-      applications.get(appId).map { info =>
-        val replayBus = new ReplayListenerBus()
-        val ui = {
-          val conf = this.conf.clone()
-          val appSecManager = new SecurityManager(conf)
-          SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
-            s"${HistoryServer.UI_PATH_PREFIX}/$appId")
-          // Do not call ui.bind() to avoid creating a new server for each application
-        }
+      applications.get(appId).flatMap { appInfo =>
+        appInfo.attempts.find(_.attemptId == attemptId).map { attempt =>
+          val replayBus = new ReplayListenerBus()
+          val ui = {
+            val conf = this.conf.clone()
+            val appSecManager = new SecurityManager(conf)
+            SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
+              HistoryServer.getAttemptURI(appId, attempt.attemptId))
+            // Do not call ui.bind() to avoid creating a new server for each application
+          }
 
-        val appListener = new ApplicationEventListener()
-        replayBus.addListener(appListener)
-        val appInfo = replay(fs.getFileStatus(new Path(logDir, info.logPath)), replayBus)
+          val appListener = new ApplicationEventListener()
+          replayBus.addListener(appListener)
+          val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
 
-        ui.setAppName(s"${appInfo.name} ($appId)")
+          ui.setAppName(s"${appInfo.name} ($appId)")
 
-        val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-        ui.getSecurityManager.setAcls(uiAclsEnabled)
-        // make sure to set admin acls before view acls so they are properly picked up
-        ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-        ui.getSecurityManager.setViewAcls(appInfo.sparkUser,
-          appListener.viewAcls.getOrElse(""))
-        ui
+          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+          ui.getSecurityManager.setAcls(uiAclsEnabled)
+          // make sure to set admin acls before view acls so they are properly picked up
+          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+            appListener.viewAcls.getOrElse(""))
+          ui
+        }
       }
     } catch {
       case e: FileNotFoundException => None
@@ -220,7 +226,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
    */
   private def mergeApplicationListing(logs: Seq[FileStatus]): Unit = {
     val bus = new ReplayListenerBus()
-    val newApps = logs.flatMap { fileStatus =>
+    val newAttempts = logs.flatMap { fileStatus =>
       try {
         val res = replay(fileStatus, bus)
         logInfo(s"Application log ${res.logPath} loaded successfully.")
@@ -232,76 +238,104 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
             e)
           None
       }
-    }.toSeq.sortWith(compareAppInfo)
-
-    // When there are new logs, merge the new list with the existing one, maintaining
-    // the expected ordering (descending end time). Maintaining the order is important
-    // to avoid having to sort the list every time there is a request for the log list.
-    if (newApps.nonEmpty) {
-      val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
-      def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
-        if (!mergedApps.contains(info.id) ||
-            mergedApps(info.id).logPath.endsWith(EventLoggingListener.IN_PROGRESS) &&
-            !info.logPath.endsWith(EventLoggingListener.IN_PROGRESS)) {
-          mergedApps += (info.id -> info)
-        }
-      }
-
-      val newIterator = newApps.iterator.buffered
-      val oldIterator = applications.values.iterator.buffered
-      while (newIterator.hasNext && oldIterator.hasNext) {
-        if (compareAppInfo(newIterator.head, oldIterator.head)) {
-          addIfAbsent(newIterator.next())
-        } else {
-          addIfAbsent(oldIterator.next())
-        }
-      }
-      newIterator.foreach(addIfAbsent)
-      oldIterator.foreach(addIfAbsent)
-
-      applications = mergedApps
-    }
+    }
+
+    if (newAttempts.isEmpty) {
+      return
+    }
+
+    // Build a map containing all apps that contain new attempts. The app information in this map
+    // contains both the new app attempt, and those that were already loaded in the existing apps
+    // map. If an attempt has been updated, it replaces the old attempt in the list.
+    val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
+    newAttempts.foreach { attempt =>
+      val appInfo = newAppMap.get(attempt.appId)
+        .orElse(applications.get(attempt.appId))
+        .map { app =>
+          val attempts =
+            app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
+          new FsApplicationHistoryInfo(attempt.appId, attempt.name,
+            attempts.sortWith(compareAttemptInfo))
+        }
+        .getOrElse(new FsApplicationHistoryInfo(attempt.appId, attempt.name, List(attempt)))
+      newAppMap(attempt.appId) = appInfo
+    }
+
+    // Merge the new app list with the existing one, maintaining the expected ordering (descending
+    // end time). Maintaining the order is important to avoid having to sort the list every time
+    // there is a request for the log list.
+    val newApps = newAppMap.values.toSeq.sortWith(compareAppInfo)
+    val mergedApps = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
+    def addIfAbsent(info: FsApplicationHistoryInfo): Unit = {
+      if (!mergedApps.contains(info.id)) {
+        mergedApps += (info.id -> info)
+      }
+    }
+
+    val newIterator = newApps.iterator.buffered
+    val oldIterator = applications.values.iterator.buffered
+    while (newIterator.hasNext && oldIterator.hasNext) {
+      if (newAppMap.contains(oldIterator.head.id)) {
+        oldIterator.next()
+      } else if (compareAppInfo(newIterator.head, oldIterator.head)) {
+        addIfAbsent(newIterator.next())
+      } else {
+        addIfAbsent(oldIterator.next())
+      }
+    }
+    newIterator.foreach(addIfAbsent)
+    oldIterator.foreach(addIfAbsent)
+
+    applications = mergedApps
   }

   /**
    * Delete event logs from the log directory according to the clean policy defined by the user.
    */
-  private def cleanLogs(): Unit = {
+  private[history] def cleanLogs(): Unit = {
     try {
       val maxAge = conf.getTimeAsSeconds("spark.history.fs.cleaner.maxAge", "7d") * 1000
 
-      val now = System.currentTimeMillis()
+      val now = clock.getTimeMillis()
       val appsToRetain = new mutable.LinkedHashMap[String, FsApplicationHistoryInfo]()
 
+      def shouldClean(attempt: FsApplicationAttemptInfo): Boolean = {
+        now - attempt.lastUpdated > maxAge && attempt.completed
+      }
+
       // Scan all logs from the log directory.
       // Only completed applications older than the specified max age will be deleted.
-      applications.values.foreach { info =>
-        if (now - info.lastUpdated <= maxAge || !info.completed) {
-          appsToRetain += (info.id -> info)
-        } else {
-          appsToClean += info
+      applications.values.foreach { app =>
+        val (toClean, toRetain) = app.attempts.partition(shouldClean)
+        attemptsToClean ++= toClean
+
+        if (toClean.isEmpty) {
+          appsToRetain += (app.id -> app)
+        } else if (toRetain.nonEmpty) {
+          appsToRetain += (app.id ->
+            new FsApplicationHistoryInfo(app.id, app.name, toRetain.toList))
         }
       }
 
       applications = appsToRetain
 
-      val leftToClean = new mutable.ListBuffer[FsApplicationHistoryInfo]
-      appsToClean.foreach { info =>
+      val leftToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
+      attemptsToClean.foreach { attempt =>
         try {
-          val path = new Path(logDir, info.logPath)
+          val path = new Path(logDir, attempt.logPath)
           if (fs.exists(path)) {
             fs.delete(path, true)
           }
         } catch {
           case e: AccessControlException =>
-            logInfo(s"No permission to delete ${info.logPath}, ignoring.")
+            logInfo(s"No permission to delete ${attempt.logPath}, ignoring.")
           case t: IOException =>
-            logError(s"IOException in cleaning logs of ${info.logPath}", t)
-            leftToClean += info
+            logError(s"IOException in cleaning ${attempt.logPath}", t)
+            leftToClean += attempt
         }
       }
 
-      appsToClean = leftToClean
+      attemptsToClean = leftToClean
     } catch {
       case t: Exception => logError("Exception in cleaning logs", t)
     }
@@ -315,14 +349,36 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
   private def compareAppInfo(
       i1: FsApplicationHistoryInfo,
       i2: FsApplicationHistoryInfo): Boolean = {
-    if (i1.endTime != i2.endTime) i1.endTime >= i2.endTime else i1.startTime >= i2.startTime
+    val a1 = i1.attempts.head
+    val a2 = i2.attempts.head
+    if (a1.endTime != a2.endTime) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
   }
 
+  /**
+   * Comparison function that defines the sort order for application attempts within the same
+   * application. Order is: running attempts before complete attempts, running attempts sorted
+   * by start time, completed attempts sorted by end time.
+   *
+   * Normally applications should have a single running attempt; but failure to call sc.stop()
+   * may cause multiple running attempts to show up.
+   *
+   * @return Whether `a1` should precede `a2`.
+   */
+  private def compareAttemptInfo(
+      a1: FsApplicationAttemptInfo,
+      a2: FsApplicationAttemptInfo): Boolean = {
+    if (a1.completed == a2.completed) {
+      if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
+    } else {
+      !a1.completed
+    }
+  }
+
   /**
    * Replays the events in the specified log file and returns information about the associated
    * application.
    */
-  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationHistoryInfo = {
+  private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
     val logPath = eventLog.getPath()
     logInfo(s"Replaying log path: $logPath")
     val logInput =
@@ -336,10 +392,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
     val appCompleted = isApplicationCompleted(eventLog)
     bus.addListener(appListener)
     bus.replay(logInput, logPath.toString, !appCompleted)
-    new FsApplicationHistoryInfo(
+    new FsApplicationAttemptInfo(
       logPath.getName(),
-      appListener.appId.getOrElse(logPath.getName()),
       appListener.appName.getOrElse(NOT_STARTED),
+      appListener.appId.getOrElse(logPath.getName()),
+      appListener.appAttemptId,
       appListener.startTime.getOrElse(-1L),
       appListener.endTime.getOrElse(-1L),
       getModificationTime(eventLog).get,
@@ -425,13 +482,21 @@ private object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
 }
 
-private class FsApplicationHistoryInfo(
+private class FsApplicationAttemptInfo(
     val logPath: String,
-    id: String,
-    name: String,
+    val name: String,
+    val appId: String,
+    attemptId: Option[String],
     startTime: Long,
     endTime: Long,
     lastUpdated: Long,
     sparkUser: String,
     completed: Boolean = true)
-  extends ApplicationHistoryInfo(id, name, startTime, endTime, lastUpdated, sparkUser, completed)
+  extends ApplicationAttemptInfo(
+    attemptId, startTime, endTime, lastUpdated, sparkUser, completed)
+
+private class FsApplicationHistoryInfo(
+    id: String,
+    override val name: String,
+    override val attempts: List[FsApplicationAttemptInfo])
+  extends ApplicationHistoryInfo(id, name, attempts)
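
For reference, the attempt ordering documented in compareAttemptInfo above can be
exercised in isolation with a sketch like this (illustrative data and names, not part
of the patch):

    object AttemptOrdering {
      case class Attempt(id: String, startTime: Long, endTime: Long, completed: Boolean)

      // Same ordering rule as compareAttemptInfo: running attempts first
      // (newest start first), then completed attempts (newest end first).
      def precedes(a1: Attempt, a2: Attempt): Boolean =
        if (a1.completed == a2.completed) {
          if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
        } else {
          !a1.completed
        }

      def main(args: Array[String]): Unit = {
        val attempts = List(
          Attempt("1", startTime = 10, endTime = 50, completed = true),
          Attempt("2", startTime = 60, endTime = -1, completed = false),
          Attempt("3", startTime = 55, endTime = 90, completed = true))
        // Running attempt "2" sorts first, then "3" (later end time), then "1".
        println(attempts.sortWith(precedes).map(_.id)) // List(2, 3, 1)
      }
    }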