
Commit

Handle logs generated by 1.0 and 1.1.
These logs don't have app IDs, so they should not be filtered.
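For orientation, the rule the patch adds when replaying a log can be summarized in a minimal, self-contained Scala sketch. The EventLog case class and shouldList helper below are hypothetical stand-ins for FsHistoryProvider internals, not Spark's real API:

object ListingRuleSketch {
  // appId: the ID reported by the replayed ApplicationStart event, if any.
  // legacyVersion: the SPARK_VERSION_* marker of an old-style log directory;
  // None models a new-style single-file log.
  final case class EventLog(appId: Option[String], legacyVersion: Option[String])

  // Mirrors sparkVersionHasAppId in the patch: only legacy 1.0/1.1 logs lack app IDs.
  def sparkVersionHasAppId(log: EventLog): Boolean =
    log.legacyVersion.forall(v => v != "1.0" && v != "1.1")

  // Mirrors the new guard around creating an FsApplicationAttemptInfo.
  def shouldList(log: EventLog): Boolean =
    log.appId.isDefined || !sparkVersionHasAppId(log)

  def main(args: Array[String]): Unit = {
    assert(shouldList(EventLog(Some("app-1"), None)))  // new log with an app ID: listed
    assert(!shouldList(EventLog(None, None)))          // new log without an app ID: ignored
    assert(shouldList(EventLog(None, Some("1.0"))))    // 1.0 log: listed despite missing ID
    assert(shouldList(EventLog(None, Some("1.1"))))    // 1.1 log: listed despite missing ID
    assert(!shouldList(EventLog(None, Some("1.2"))))   // later legacy log without an ID: ignored
    println("listing rule sketch OK")
  }
}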
Marcelo Vanzin committed Jun 29, 2015
1 parent 1eca3fe commit 7b91b74
Showing 2 changed files with 115 additions and 53 deletions.
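For background: old-style (Spark 1.0/1.1) event logs are directories of marker files rather than single log files, and the version check added below reads the version marker's file name. Under the constants this commit moves into the FsHistoryProvider companion object, a completed legacy log directory would look roughly like this (names illustrative, assuming the usual SPARK_VERSION/COMPRESSION_CODEC key values from EventLoggingListener):

old1/                      # one directory per application
  EVENT_LOG_1              # LOG_PREFIX + "1": the serialized listener events
  SPARK_VERSION_1.0        # SPARK_VERSION_PREFIX + version; read by sparkVersionHasAppId
  COMPRESSION_CODEC_lzf    # optional marker naming the codec used for EVENT_LOG_1
  APPLICATION_COMPLETE     # empty marker file, present only for finished applications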
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -83,12 +83,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   // List of application logs to be deleted by event log cleaner.
   private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
 
-  // Constants used to parse Spark 1.0.0 log directories.
-  private[history] val LOG_PREFIX = "EVENT_LOG_"
-  private[history] val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
-  private[history] val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
-  private[history] val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
-
   /**
    * Return a runnable that performs the given operation on the event logs.
    * This operation is expected to be executed periodically.
@@ -155,20 +149,20 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
               HistoryServer.getAttemptURI(appId, attempt.attemptId), attempt.startTime)
             // Do not call ui.bind() to avoid creating a new server for each application
           }
 
           val appListener = new ApplicationEventListener()
           replayBus.addListener(appListener)
           val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)
-
-          appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
-
-          val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
-          ui.getSecurityManager.setAcls(uiAclsEnabled)
-          // make sure to set admin acls before view acls so they are properly picked up
-          ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
-          ui.getSecurityManager.setViewAcls(attempt.sparkUser,
-            appListener.viewAcls.getOrElse(""))
-          ui
+          appInfo.map { info =>
+            ui.setAppName(s"${info.name} ($appId)")
+
+            val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
+            ui.getSecurityManager.setAcls(uiAclsEnabled)
+            // make sure to set admin acls before view acls so they are properly picked up
+            ui.getSecurityManager.setAdminAcls(appListener.adminAcls.getOrElse(""))
+            ui.getSecurityManager.setViewAcls(attempt.sparkUser,
+              appListener.viewAcls.getOrElse(""))
+            ui
+          }.orNull
         }
       }
     } catch {
@@ -451,17 +445,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       val appCompleted = isApplicationCompleted(eventLog)
       bus.addListener(appListener)
       bus.replay(logInput, logPath.toString, !appCompleted)
-      appListener.appId.map { appId =>
-        new FsApplicationAttemptInfo(
+
+      // Without an app ID, new logs will render incorrectly in the listing page, so do not list or
+      // try to show their UI. Some old versions of Spark generate logs without an app ID, so let
+      // logs generated by those versions go through.
+      if (appListener.appId.isDefined || !sparkVersionHasAppId(eventLog)) {
+        Some(new FsApplicationAttemptInfo(
           logPath.getName(),
           appListener.appName.getOrElse(NOT_STARTED),
-          appId,
+          appListener.appId.getOrElse(logPath.getName()),
           appListener.appAttemptId,
           appListener.startTime.getOrElse(-1L),
           appListener.endTime.getOrElse(-1L),
           getModificationTime(eventLog).get,
           appListener.sparkUser.getOrElse(NOT_STARTED),
-          appCompleted)
+          appCompleted))
+      } else {
+        None
+      }
     } finally {
       logInput.close()
@@ -537,10 +537,34 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
     }
   }
 
+  /**
+   * Returns whether the version of Spark that generated logs records app IDs. App IDs were added
+   * in Spark 1.1.
+   */
+  private def sparkVersionHasAppId(entry: FileStatus): Boolean = {
+    if (isLegacyLogDirectory(entry)) {
+      fs.listStatus(entry.getPath())
+        .find { status => status.getPath().getName().startsWith(SPARK_VERSION_PREFIX) }
+        .map { status =>
+          val version = status.getPath().getName().substring(SPARK_VERSION_PREFIX.length())
+          version != "1.0" && version != "1.1"
+        }
+        .getOrElse(true)
+    } else {
+      true
+    }
+  }
+
 }
 
-private object FsHistoryProvider {
+private[history] object FsHistoryProvider {
   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"
+
+  // Constants used to parse Spark 1.0.0 log directories.
+  val LOG_PREFIX = "EVENT_LOG_"
+  val SPARK_VERSION_PREFIX = EventLoggingListener.SPARK_VERSION_KEY + "_"
+  val COMPRESSION_CODEC_PREFIX = EventLoggingListener.COMPRESSION_CODEC_KEY + "_"
+  val APPLICATION_COMPLETE = "APPLICATION_COMPLETE"
+}
 
 private class FsApplicationAttemptInfo(
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -39,6 +39,8 @@ import org.apache.spark.util.{JsonProtocol, ManualClock, Utils}
 
 class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
 
+  import FsHistoryProvider._
+
   private var testDir: File = null
 
   before {
@@ -67,48 +69,39 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
     // Write a new-style application log.
     val newAppComplete = newLogFile("new1", None, inProgress = false)
     writeFile(newAppComplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-complete", Some("new-app-complete"), 1L, "test", None),
+      SparkListenerApplicationStart(newAppComplete.getName(), Some("new-app-complete"), 1L, "test",
+        None),
       SparkListenerApplicationEnd(5L)
       )
 
     // Write a new-style application log.
     val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
       Some("lzf"))
     writeFile(newAppCompressedComplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
+      SparkListenerApplicationStart(newAppCompressedComplete.getName(), Some("new-complete-lzf"),
+        1L, "test", None),
       SparkListenerApplicationEnd(4L))
 
     // Write an unfinished app, new-style.
     val newAppIncomplete = newLogFile("new2", None, inProgress = true)
    writeFile(newAppIncomplete, true, None,
-      SparkListenerApplicationStart(
-        "new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
+      SparkListenerApplicationStart(newAppIncomplete.getName(), Some("new-incomplete"), 1L, "test",
+        None)
       )
 
     // Write an old-style application log.
-    val oldAppComplete = new File(testDir, "old1")
-    oldAppComplete.mkdir()
-    createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart(
-        "old-app-complete", Some("old-app-complete"), 2L, "test", None),
+    val oldAppComplete = writeOldLog("old1", "1.0", None, true,
+      SparkListenerApplicationStart("old1", Some("old-app-complete"), 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
-    createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
 
     // Check for logs so that we force the older unfinished app to be loaded, to make
     // sure unfinished apps are also sorted correctly.
     provider.checkForLogs()
 
     // Write an unfinished app, old-style.
-    val oldAppIncomplete = new File(testDir, "old2")
-    oldAppIncomplete.mkdir()
-    createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
-      SparkListenerApplicationStart(
-        "old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
+    val oldAppIncomplete = writeOldLog("old2", "1.0", None, false,
+      SparkListenerApplicationStart("old2", None, 2L, "test", None)
      )
 
     // Force a reload of data from the log directory, and check that both logs are loaded.
@@ -129,16 +122,15 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
         List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
     }
 
-    list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
+    list(0) should be (makeAppInfo("new-app-complete", newAppComplete.getName(), 1L, 5L,
       newAppComplete.lastModified(), "test", true))
-    list(1) should be (makeAppInfo("new-app-compressed-complete",
-      "new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
-      true))
-    list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
+    list(1) should be (makeAppInfo("new-complete-lzf", newAppCompressedComplete.getName(),
+      1L, 4L, newAppCompressedComplete.lastModified(), "test", true))
+    list(2) should be (makeAppInfo("old-app-complete", oldAppComplete.getName(), 2L, 3L,
       oldAppComplete.lastModified(), "test", true))
-    list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
-      oldAppIncomplete.lastModified(), "test", false))
-    list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
+    list(3) should be (makeAppInfo(oldAppIncomplete.getName(), oldAppIncomplete.getName(), 2L,
+      -1L, oldAppIncomplete.lastModified(), "test", false))
+    list(4) should be (makeAppInfo("new-incomplete", newAppIncomplete.getName(), 1L, -1L,
       newAppIncomplete.lastModified(), "test", false))
 
     // Make sure the UI can be rendered.
@@ -160,12 +152,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
     val codec = if (valid) CompressionCodec.createCodec(new SparkConf(), codecName) else null
     val logDir = new File(testDir, codecName)
     logDir.mkdir()
-    createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
-    writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
-      SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
+    createEmptyFile(new File(logDir, SPARK_VERSION_PREFIX + "1.0"))
+    writeFile(new File(logDir, LOG_PREFIX + "1"), false, Option(codec),
+      SparkListenerApplicationStart("app2", None, 2L, "test", None),
       SparkListenerApplicationEnd(3L)
       )
-    createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
+    createEmptyFile(new File(logDir, COMPRESSION_CODEC_PREFIX + codecName))
 
     val logPath = new Path(logDir.getAbsolutePath())
     try {
@@ -390,6 +382,33 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
     }
   }
 
+  test("SPARK-8372: new logs with no app ID are ignored") {
+    val provider = new FsHistoryProvider(createTestConf())
+
+    // Write a new log file without an app id, to make sure it's ignored.
+    val logFile1 = newLogFile("app1", None, inProgress = true)
+    writeFile(logFile1, true, None,
+      SparkListenerLogStart("1.4")
+      )
+
+    // Write a 1.2 log file with no start event (= no app id), it should be ignored.
+    writeOldLog("v12Log", "1.2", None, false)
+
+    // Write 1.0 and 1.1 logs, which don't have app ids.
+    writeOldLog("v11Log", "1.1", None, true,
+      SparkListenerApplicationStart("v11Log", None, 2L, "test", None),
+      SparkListenerApplicationEnd(3L))
+    writeOldLog("v10Log", "1.0", None, true,
+      SparkListenerApplicationStart("v10Log", None, 2L, "test", None),
+      SparkListenerApplicationEnd(4L))
+
+    updateAndCheck(provider) { list =>
+      list.size should be (2)
+      list(0).id should be ("v10Log")
+      list(1).id should be ("v11Log")
+    }
+  }
+
   /**
    * Asks the provider to check for logs and calls a function to perform checks on the updated
    * app list. Example:
@@ -429,4 +448,23 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers with Logging {
     new SparkConf().set("spark.history.fs.logDirectory", testDir.getAbsolutePath())
   }
 
+  private def writeOldLog(
+      fname: String,
+      sparkVersion: String,
+      codec: Option[CompressionCodec],
+      completed: Boolean,
+      events: SparkListenerEvent*): File = {
+    val log = new File(testDir, fname)
+    log.mkdir()
+
+    val oldEventLog = new File(log, LOG_PREFIX + "1")
+    createEmptyFile(new File(log, SPARK_VERSION_PREFIX + sparkVersion))
+    writeFile(new File(log, LOG_PREFIX + "1"), false, codec, events: _*)
+    if (completed) {
+      createEmptyFile(new File(log, APPLICATION_COMPLETE))
+    }
+
+    log
+  }
+
 }
