Skip to content

Commit

Permalink
[SPARK-8372] History server shows incorrect information for applicati…
Browse files Browse the repository at this point in the history
…on not started

The history server may show an incorrect App ID for an incomplete application like <App ID>.inprogress. This app info will never disappear even after the app is completed.
![incorrectappinfo](https://cloud.githubusercontent.com/assets/9278199/8156147/2a10fdbe-137d-11e5-9620-c5b61d93e3c1.png)

The cause of the issue is that a log path name is used as the app id when app id cannot be got during replay.

Author: Carson Wang <carson.wang@intel.com>

Closes apache#6827 from carsonwang/SPARK-8372 and squashes the following commits:

cdbb089 [Carson Wang] Fix code style
3e46b35 [Carson Wang] Update code style
90f5dde [Carson Wang] Add a unit test
d8c9cd0 [Carson Wang] Replaying events only return information when app is started
  • Loading branch information
carsonwang authored and nemccarthy committed Jun 19, 2015
1 parent 47025a0 commit 4069763
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

ui.setAppName(s"${appInfo.name} ($appId)")
appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
Expand Down Expand Up @@ -282,8 +282,12 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
logInfo(s"Application log ${res.logPath} loaded successfully.")
Some(res)
res match {
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
res
} catch {
case e: Exception =>
logError(
Expand Down Expand Up @@ -429,9 +433,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Replays the events in the specified log file and returns information about the associated
* application.
* application. Return `None` if the application ID cannot be located.
*/
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
Expand All @@ -445,16 +451,18 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
appListener.appId.map { appId =>
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appId,
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
}
} finally {
logInput.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,29 +67,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationStart(
"new-app-complete", Some("new-app-complete"), 1L, "test", None),
SparkListenerApplicationEnd(5L)
)

// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationStart(
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
SparkListenerApplicationEnd(4L))

// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
SparkListenerApplicationStart(
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
)

// Write an old-style application log.
val oldAppComplete = new File(testDir, "old1")
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationStart(
"old-app-complete", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
Expand All @@ -103,7 +107,8 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
SparkListenerApplicationStart(
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
)

// Force a reload of data from the log directory, and check that both logs are loaded.
Expand All @@ -124,16 +129,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}

list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
list(1) should be (makeAppInfo("new-app-compressed-complete",
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
true))
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
newAppIncomplete.lastModified(), "test", false))

// Make sure the UI can be rendered.
Expand All @@ -157,7 +162,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
Expand All @@ -180,12 +185,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
Expand Down Expand Up @@ -218,6 +223,18 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}

test("Parse logs that application is not started") {
val provider = new FsHistoryProvider((createTestConf()))

val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerLogStart("1.4")
)
updateAndCheck(provider) { list =>
list.size should be (0)
}
}

test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())

Expand Down

0 comments on commit 4069763

Please sign in to comment.