Skip to content

Commit

Permalink
Replaying events only return information when app is started
Browse files Browse the repository at this point in the history
  • Loading branch information
carsonwang committed Jun 15, 2015
1 parent 4c5889e commit d8c9cd0
Showing 1 changed file with 24 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

ui.setAppName(s"${appInfo.name} ($appId)")
appInfo.map { app => ui.setAppName(s"${app.name} ($appId)") }

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
Expand Down Expand Up @@ -282,8 +282,14 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
Some(res)
res.map { r =>
logInfo(s"Application log ${r.logPath} loaded successfully.")
}.getOrElse {
logInfo(
s"Failed to load application log ${fileStatus.getPath}." +
"The application may have not started.")
}
res
} catch {
case e: Exception =>
logError(
Expand Down Expand Up @@ -431,7 +437,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
* Replays the events in the specified log file and returns information about the associated
* application.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
Expand All @@ -445,16 +453,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
appListener.appId.map { appId =>
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appId,
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
}
} finally {
logInput.close()
}
Expand Down

0 comments on commit d8c9cd0

Please sign in to comment.