Skip to content

Commit

Permalink
Add a test for apps with multiple attempts.
Browse files Browse the repository at this point in the history
  • Loading branch information
Marcelo Vanzin committed Apr 8, 2015
1 parent 3245aa2 commit 88b1de8
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
// map. If an attempt has been updated, it replaces the old attempt in the list.
val newAppMap = new mutable.HashMap[String, FsApplicationHistoryInfo]()
newAttempts.foreach { attempt =>
val appInfo = applications.get(attempt.appId)
.orElse(newAppMap.get(attempt.appId))
val appInfo = newAppMap.get(attempt.appId)
.orElse(applications.get(attempt.appId))
.map { app =>
val attempts =
app.attempts.filter(_.attemptId != attempt.attemptId).toList ++ List(attempt)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
/**
 * Create a fake log file using the new log format used in Spark 1.3+.
 *
 * @param appId application id embedded in the log path
 * @param appAttemptId attempt id embedded in the log path ("" for apps without attempts)
 * @param inProgress if true, the file name carries the IN_PROGRESS suffix
 * @param codec optional compression codec short name (e.g. "lzf")
 */
private def newLogFile(
    appId: String,
    appAttemptId: String,
    inProgress: Boolean,
    codec: Option[String] = None): File = {
  val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
  // The scraped diff contained both the old (empty attempt id) and new assignment of
  // logUri; only the new one, which threads appAttemptId through, is kept.
  val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId, appAttemptId)
  val logPath = new URI(logUri).getPath + ip
  new File(logPath)
}
Expand All @@ -59,20 +60,20 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
val provider = new FsHistoryProvider(createTestConf())

// Write a new-style application log.
val newAppComplete = newLogFile("new1", inProgress = false)
val newAppComplete = newLogFile("new1", "", inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart("new-app-complete", None, 1L, "test"),
SparkListenerApplicationEnd(5L)
)

// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", inProgress = false, Some("lzf"))
val newAppCompressedComplete = newLogFile("new1compressed", "", inProgress = false, Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test"),
SparkListenerApplicationEnd(4L))

// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", inProgress = true)
val newAppIncomplete = newLogFile("new2", "", inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test")
)
Expand Down Expand Up @@ -101,34 +102,33 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers

// Force a reload of data from the log directory, and check that both logs are loaded.
// Take the opportunity to check that the offset checks work as expected.
provider.checkForLogs()

val list = provider.getListing().toSeq
list should not be (null)
list.size should be (5)
list.count(_.attempts.head.completed) should be (3)

def makeAppInfo(id: String, name: String, start: Long, end: Long, lastMod: Long,
user: String, completed: Boolean): ApplicationHistoryInfo = {
ApplicationHistoryInfo(id,
List(ApplicationAttemptInfo("", name, start, end, lastMod, user, completed)))
}
updateAndCheck(provider) { list =>
list.size should be (5)
list.count(_.attempts.head.completed) should be (3)

def makeAppInfo(id: String, name: String, start: Long, end: Long, lastMod: Long,
user: String, completed: Boolean): ApplicationHistoryInfo = {
ApplicationHistoryInfo(id,
List(ApplicationAttemptInfo("", name, start, end, lastMod, user, completed)))
}

list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
newAppIncomplete.lastModified(), "test", false))

// Make sure the UI can be rendered.
list.foreach { case info =>
val appUi = provider.getAppUI(info.id, "")
appUi should not be null
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
true))
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
newAppIncomplete.lastModified(), "test", false))

// Make sure the UI can be rendered.
list.foreach { case info =>
val appUi = provider.getAppUI(info.id, "")
appUi should not be null
}
}
}

Expand Down Expand Up @@ -165,51 +165,50 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
}

test("SPARK-3697: ignore directories that cannot be read.") {
  // One readable completed log...
  val logFile1 = newLogFile("new1", "", inProgress = false)
  writeFile(logFile1, true, None,
    SparkListenerApplicationStart("app1-1", None, 1L, "test"),
    SparkListenerApplicationEnd(2L)
  )
  // ...and one whose permissions deny reading; it must be skipped, not fail the scan.
  val logFile2 = newLogFile("new2", "", inProgress = false)
  writeFile(logFile2, true, None,
    SparkListenerApplicationStart("app1-2", None, 1L, "test"),
    SparkListenerApplicationEnd(2L)
  )
  logFile2.setReadable(false, false)

  val provider = new FsHistoryProvider(createTestConf())
  // Only the readable log should show up in the listing.
  updateAndCheck(provider) { list =>
    list.size should be (1)
  }
}

test("history file is renamed from inprogress to completed") {
  val provider = new FsHistoryProvider(createTestConf())

  val logFile1 = newLogFile("app1", "", inProgress = true)
  writeFile(logFile1, true, None,
    SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
    SparkListenerApplicationEnd(2L)
  )
  // While the file still has the .inprogress suffix, the attempt's log path reflects it.
  updateAndCheck(provider) { list =>
    list.size should be (1)
    list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should
      endWith(EventLoggingListener.IN_PROGRESS)
  }

  // Simulate the app finishing: the listener renames the log to drop the suffix.
  logFile1.renameTo(newLogFile("app1", "", inProgress = false))
  updateAndCheck(provider) { list =>
    list.size should be (1)
    // NOTE: the original wrote `should not` with `endWith(...)` on the next line, which
    // ScalaTest parses as two separate statements, silently skipping the negated check.
    // Keeping the matcher on one logical line makes the assertion actually run.
    list.head.attempts.head.asInstanceOf[FsApplicationAttemptInfo].logPath should not
      endWith (EventLoggingListener.IN_PROGRESS)
  }
}

test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())

val logFile1 = newLogFile("app1", inProgress = true)
val logFile1 = newLogFile("app1", "", inProgress = true)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test"),
SparkListenerApplicationEnd(2L))
Expand All @@ -222,6 +221,81 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
appListAfterRename.size should be (1)
}

test("apps with multiple attempts") {
  val provider = new FsHistoryProvider(createTestConf())

  // First attempt of app1, already completed.
  val attempt1 = newLogFile("app1", "attempt1", inProgress = false)
  writeFile(attempt1, true, None,
    SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", "attempt1"),
    SparkListenerApplicationEnd(2L)
  )

  updateAndCheck(provider) { list =>
    list.size should be (1)
    list.head.attempts.size should be (1)
  }

  // Second attempt of app1, still in progress: the app entry should grow to two attempts.
  val attempt2 = newLogFile("app1", "attempt2", inProgress = true)
  writeFile(attempt2, true, None,
    SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", "attempt2")
  )

  updateAndCheck(provider) { list =>
    list.size should be (1)
    list.head.attempts.size should be (2)
    list.head.attempts.head.attemptId should be ("attempt1")
  }

  // The second attempt finishes: its log loses the in-progress suffix and gains an end event.
  val completedAttempt2 = newLogFile("app1", "attempt2", inProgress = false)
  attempt2.delete()
  // BUG FIX: the original wrote these events to `attempt2`, i.e. the just-deleted
  // in-progress path, leaving `completedAttempt2` unused and no completed log on disk.
  writeFile(completedAttempt2, true, None,
    SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", "attempt2"),
    SparkListenerApplicationEnd(4L)
  )

  updateAndCheck(provider) { list =>
    list should not be (null)
    list.size should be (1)
    list.head.attempts.size should be (2)
    list.head.attempts.head.attemptId should be ("attempt2")
  }

  // A second application with a single completed attempt.
  val app2Attempt1 = newLogFile("app2", "attempt1", inProgress = false)
  // BUG FIX: the original wrote app2's events to `attempt2` (app1's log file),
  // leaving `app2Attempt1` unused; they belong in app2's own log.
  writeFile(app2Attempt1, true, None,
    SparkListenerApplicationStart("app2", Some("app2"), 5L, "test", "attempt1"),
    SparkListenerApplicationEnd(6L)
  )

  updateAndCheck(provider) { list =>
    list.size should be (2)
    list.head.attempts.size should be (1)
    list.last.attempts.size should be (2)
    list.head.attempts.head.attemptId should be ("attempt1")

    // Every attempt of every app should yield a renderable UI.
    list.foreach { case app =>
      app.attempts.foreach { attempt =>
        val appUi = provider.getAppUI(app.id, attempt.attemptId)
        appUi should not be null
      }
    }
  }
}

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example:
*
* updateAndCheck(provider) { list =>
* // asserts
* }
*/
private def updateAndCheck(provider: FsHistoryProvider)
    (checkFn: Seq[ApplicationHistoryInfo] => Unit): Unit = {
  // Trigger a scan of the log directory, then hand the refreshed listing to the caller.
  provider.checkForLogs()
  val refreshedListing = provider.getListing().toSeq
  checkFn(refreshedListing)
}

private def writeFile(file: File, isNewFormat: Boolean, codec: Option[CompressionCodec],
events: SparkListenerEvent*) = {
val fstream = new FileOutputStream(file)
Expand Down

0 comments on commit 88b1de8

Please sign in to comment.