Commit

Merge remote-tracking branch 'origin/master' into parallelize-python-tests
JoshRosen committed Jun 29, 2015
2 parents 110cd9d + 881662e commit 1bacf1b
Showing 133 changed files with 2,728 additions and 1,847 deletions.
4 changes: 4 additions & 0 deletions .rat-excludes
@@ -86,4 +86,8 @@ local-1430917381535_2
DESCRIPTION
NAMESPACE
test_support/*
.*Rd
help/*
html/*
INDEX
.lintr
2 changes: 1 addition & 1 deletion LICENSE
@@ -948,6 +948,6 @@ The following components are provided under the MIT License. See project link fo
(MIT License) SLF4J LOG4J-12 Binding (org.slf4j:slf4j-log4j12:1.7.5 - http://www.slf4j.org)
(MIT License) pyrolite (org.spark-project:pyrolite:2.0.1 - http://pythonhosted.org/Pyro4/)
(MIT License) scopt (com.github.scopt:scopt_2.10:3.2.0 - https://github.com/scopt/scopt)
(The MIT License) Mockito (org.mockito:mockito-all:1.8.5 - http://www.mockito.org)
(The MIT License) Mockito (org.mockito:mockito-core:1.9.5 - http://www.mockito.org)
(MIT License) jquery (https://jquery.org/license/)
(MIT License) AnchorJS (https://github.com/bryanbraun/anchorjs)
12 changes: 1 addition & 11 deletions core/pom.xml
@@ -69,16 +69,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-mapper-asl</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -354,7 +344,7 @@
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
8 changes: 8 additions & 0 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -315,6 +315,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
_dagScheduler = ds
}

/**
 * A unique identifier for the Spark application.
 * Its format depends on the scheduler implementation:
 * e.g. 'local-1433865536131' for a local Spark application,
 * or 'application_1433865536131_34483' when running on YARN.
 */
def applicationId: String = _applicationId
def applicationAttemptId: Option[String] = _applicationAttemptId

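The new accessor is easy to exercise from driver code. A minimal sketch, assuming a plain local-mode setup (the app name and master are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("app-id-demo").setMaster("local[2]")
val sc = new SparkContext(conf)
// Prints e.g. "local-1433865536131" in local mode,
// or "application_1433865536131_34483" on YARN.
println(sc.applicationId)
sc.stop()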
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -160,7 +160,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
replayBus.addListener(appListener)
val appInfo = replay(fs.getFileStatus(new Path(logDir, attempt.logPath)), replayBus)

appInfo.foreach { app => ui.setAppName(s"${app.name} ($appId)") }
ui.setAppName(s"${appInfo.name} ($appId)")

val uiAclsEnabled = conf.getBoolean("spark.history.ui.acls.enable", false)
ui.getSecurityManager.setAcls(uiAclsEnabled)
@@ -282,12 +282,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val newAttempts = logs.flatMap { fileStatus =>
try {
val res = replay(fileStatus, bus)
res match {
case Some(r) => logDebug(s"Application log ${r.logPath} loaded successfully.")
case None => logWarning(s"Failed to load application log ${fileStatus.getPath}. " +
"The application may have not started.")
}
res
logInfo(s"Application log ${res.logPath} loaded successfully.")
Some(res)
} catch {
case e: Exception =>
logError(
@@ -433,11 +429,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Replays the events in the specified log file and returns information about the associated
* application. Return `None` if the application ID cannot be located.
* application.
*/
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus): Option[FsApplicationAttemptInfo] = {
private def replay(eventLog: FileStatus, bus: ReplayListenerBus): FsApplicationAttemptInfo = {
val logPath = eventLog.getPath()
logInfo(s"Replaying log path: $logPath")
val logInput =
@@ -451,18 +445,16 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val appCompleted = isApplicationCompleted(eventLog)
bus.addListener(appListener)
bus.replay(logInput, logPath.toString, !appCompleted)
appListener.appId.map { appId =>
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appId,
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
}
new FsApplicationAttemptInfo(
logPath.getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.appId.getOrElse(logPath.getName()),
appListener.appAttemptId,
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(eventLog).get,
appListener.sparkUser.getOrElse(NOT_STARTED),
appCompleted)
} finally {
logInput.close()
}
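The net effect of this hunk: a replayed log whose listener never saw an application-start event is no longer dropped with a warning; the log file name now stands in for the missing application ID. The fallback in miniature (values are illustrative):

val appId: Option[String] = None          // no application-start event was replayed
val logFileName = "local-1430917381535"   // hypothetical event log file name
assert(appId.getOrElse(logFileName) == "local-1430917381535")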
88 changes: 44 additions & 44 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -572,55 +572,55 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
val attempt = taskInfo.attempt
val timelineObject =
s"""
{
'className': 'task task-assignment-timeline-object',
'group': '$executorId',
'content': '<div class="task-assignment-timeline-content"' +
'data-toggle="tooltip" data-placement="top"' +
'data-html="true" data-container="body"' +
'data-title="${s"Task " + index + " (attempt " + attempt + ")"}<br>' +
'Status: ${taskInfo.status}<br>' +
'Launch Time: ${UIUtils.formatDate(new Date(launchTime))}' +
'${
|{
|'className': 'task task-assignment-timeline-object',
|'group': '$executorId',
|'content': '<div class="task-assignment-timeline-content"
|data-toggle="tooltip" data-placement="top"
|data-html="true" data-container="body"
|data-title="${s"Task " + index + " (attempt " + attempt + ")"}<br>
|Status: ${taskInfo.status}<br>
|Launch Time: ${UIUtils.formatDate(new Date(launchTime))}
|${
if (!taskInfo.running) {
s"""<br>Finish Time: ${UIUtils.formatDate(new Date(finishTime))}"""
} else {
""
}
}' +
'<br>Scheduler Delay: $schedulerDelay ms' +
'<br>Task Deserialization Time: ${UIUtils.formatDuration(deserializationTime)}' +
'<br>Shuffle Read Time: ${UIUtils.formatDuration(shuffleReadTime)}' +
'<br>Executor Computing Time: ${UIUtils.formatDuration(executorComputingTime)}' +
'<br>Shuffle Write Time: ${UIUtils.formatDuration(shuffleWriteTime)}' +
'<br>Result Serialization Time: ${UIUtils.formatDuration(serializationTime)}' +
'<br>Getting Result Time: ${UIUtils.formatDuration(gettingResultTime)}">' +
'<svg class="task-assignment-timeline-duration-bar">' +
'<rect class="scheduler-delay-proportion" ' +
'x="$schedulerDelayProportionPos%" y="0px" height="26px"' +
'width="$schedulerDelayProportion%""></rect>' +
'<rect class="deserialization-time-proportion" '+
'x="$deserializationTimeProportionPos%" y="0px" height="26px"' +
'width="$deserializationTimeProportion%"></rect>' +
'<rect class="shuffle-read-time-proportion" ' +
'x="$shuffleReadTimeProportionPos%" y="0px" height="26px"' +
'width="$shuffleReadTimeProportion%"></rect>' +
'<rect class="executor-runtime-proportion" ' +
'x="$executorRuntimeProportionPos%" y="0px" height="26px"' +
'width="$executorComputingTimeProportion%"></rect>' +
'<rect class="shuffle-write-time-proportion" ' +
'x="$shuffleWriteTimeProportionPos%" y="0px" height="26px"' +
'width="$shuffleWriteTimeProportion%"></rect>' +
'<rect class="serialization-time-proportion" ' +
'x="$serializationTimeProportionPos%" y="0px" height="26px"' +
'width="$serializationTimeProportion%"></rect>' +
'<rect class="getting-result-time-proportion" ' +
'x="$gettingResultTimeProportionPos%" y="0px" height="26px"' +
'width="$gettingResultTimeProportion%"></rect></svg>',
'start': new Date($launchTime),
'end': new Date($finishTime)
}
"""
}
|<br>Scheduler Delay: $schedulerDelay ms
|<br>Task Deserialization Time: ${UIUtils.formatDuration(deserializationTime)}
|<br>Shuffle Read Time: ${UIUtils.formatDuration(shuffleReadTime)}
|<br>Executor Computing Time: ${UIUtils.formatDuration(executorComputingTime)}
|<br>Shuffle Write Time: ${UIUtils.formatDuration(shuffleWriteTime)}
|<br>Result Serialization Time: ${UIUtils.formatDuration(serializationTime)}
|<br>Getting Result Time: ${UIUtils.formatDuration(gettingResultTime)}">
|<svg class="task-assignment-timeline-duration-bar">
|<rect class="scheduler-delay-proportion"
|x="$schedulerDelayProportionPos%" y="0px" height="26px"
|width="$schedulerDelayProportion%""></rect>
|<rect class="deserialization-time-proportion"
|x="$deserializationTimeProportionPos%" y="0px" height="26px"
|width="$deserializationTimeProportion%"></rect>
|<rect class="shuffle-read-time-proportion"
|x="$shuffleReadTimeProportionPos%" y="0px" height="26px"
|width="$shuffleReadTimeProportion%"></rect>
|<rect class="executor-runtime-proportion"
|x="$executorRuntimeProportionPos%" y="0px" height="26px"
|width="$executorComputingTimeProportion%"></rect>
|<rect class="shuffle-write-time-proportion"
|x="$shuffleWriteTimeProportionPos%" y="0px" height="26px"
|width="$shuffleWriteTimeProportion%"></rect>
|<rect class="serialization-time-proportion"
|x="$serializationTimeProportionPos%" y="0px" height="26px"
|width="$serializationTimeProportion%"></rect>
|<rect class="getting-result-time-proportion"
|x="$gettingResultTimeProportionPos%" y="0px" height="26px"
|width="$gettingResultTimeProportion%"></rect></svg>',
|'start': new Date($launchTime),
|'end': new Date($finishTime)
|}
|""".stripMargin.replaceAll("\n", " ")
timelineObject
}.mkString("[", ",", "]")

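The rewrite above trades per-line string concatenation for a single interpolated block: stripMargin strips everything up to each leading '|', and replaceAll("\n", " ") then flattens the result so the emitted JavaScript object sits on one line. The pattern in isolation, with illustrative names:

val id = 42
val label = "fetch"
val js =
  s"""
     |{
     |  'id': $id,
     |  'label': '$label'
     |}
   """.stripMargin.replaceAll("\n", " ")
// js now holds the whole object literal on a single line, e.g.
// " { 'id': 42, 'label': 'fetch' } " (up to spacing).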
33 changes: 33 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
}
}
}

/**
* An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
* in a circular buffer. The current contents of the buffer can be accessed using
* the toString method.
*/
private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
var pos: Int = 0
var buffer = new Array[Int](sizeInBytes)

def write(i: Int): Unit = {
buffer(pos) = i
pos = (pos + 1) % buffer.length
}

override def toString: String = {
val (end, start) = buffer.splitAt(pos)
val input = new java.io.InputStream {
val iterator = (start ++ end).iterator

def read(): Int = if (iterator.hasNext) iterator.next() else -1
}
val reader = new BufferedReader(new InputStreamReader(input))
val stringBuilder = new StringBuilder
var line = reader.readLine()
while (line != null) {
stringBuilder.append(line)
stringBuilder.append("\n")
line = reader.readLine()
}
stringBuilder.toString()
}
}
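A quick sketch of the wrap-around behaviour (buffer size and input are illustrative, and private[spark] visibility is ignored here): six bytes written into a four-byte buffer leave only the last four, and toString stitches them back into write order:

val buf = new CircularBuffer(4)
"abcdef".getBytes("UTF-8").foreach(b => buf.write(b))
// pos is now 6 % 4 = 2 and the backing array holds ['e','f','c','d'];
// toString reads from pos onward, then wraps around: "cdef", plus the
// "\n" that the line-by-line reader re-appends.
assert(buf.toString == "cdef\n")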
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -67,33 +67,29 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
// Write a new-style application log.
val newAppComplete = newLogFile("new1", None, inProgress = false)
writeFile(newAppComplete, true, None,
SparkListenerApplicationStart(
"new-app-complete", Some("new-app-complete"), 1L, "test", None),
SparkListenerApplicationStart("new-app-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(5L)
)

// Write a new-style application log.
val newAppCompressedComplete = newLogFile("new1compressed", None, inProgress = false,
Some("lzf"))
writeFile(newAppCompressedComplete, true, None,
SparkListenerApplicationStart(
"new-app-compressed-complete", Some("new-app-compressed-complete"), 1L, "test", None),
SparkListenerApplicationStart("new-app-compressed-complete", None, 1L, "test", None),
SparkListenerApplicationEnd(4L))

// Write an unfinished app, new-style.
val newAppIncomplete = newLogFile("new2", None, inProgress = true)
writeFile(newAppIncomplete, true, None,
SparkListenerApplicationStart(
"new-app-incomplete", Some("new-app-incomplete"), 1L, "test", None)
SparkListenerApplicationStart("new-app-incomplete", None, 1L, "test", None)
)

// Write an old-style application log.
val oldAppComplete = new File(testDir, "old1")
oldAppComplete.mkdir()
createEmptyFile(new File(oldAppComplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppComplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart(
"old-app-complete", Some("old-app-complete"), 2L, "test", None),
SparkListenerApplicationStart("old-app-complete", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(oldAppComplete, provider.APPLICATION_COMPLETE))
@@ -107,8 +103,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
oldAppIncomplete.mkdir()
createEmptyFile(new File(oldAppIncomplete, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(oldAppIncomplete, provider.LOG_PREFIX + "1"), false, None,
SparkListenerApplicationStart(
"old-app-incomplete", Some("old-app-incomplete"), 2L, "test", None)
SparkListenerApplicationStart("old-app-incomplete", None, 2L, "test", None)
)

// Force a reload of data from the log directory, and check that both logs are loaded.
@@ -129,16 +124,16 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
List(ApplicationAttemptInfo(None, start, end, lastMod, user, completed)))
}

list(0) should be (makeAppInfo("new-app-complete", "new-app-complete", 1L, 5L,
list(0) should be (makeAppInfo(newAppComplete.getName(), "new-app-complete", 1L, 5L,
newAppComplete.lastModified(), "test", true))
list(1) should be (makeAppInfo("new-app-compressed-complete",
list(1) should be (makeAppInfo(newAppCompressedComplete.getName(),
"new-app-compressed-complete", 1L, 4L, newAppCompressedComplete.lastModified(), "test",
true))
list(2) should be (makeAppInfo("old-app-complete", "old-app-complete", 2L, 3L,
list(2) should be (makeAppInfo(oldAppComplete.getName(), "old-app-complete", 2L, 3L,
oldAppComplete.lastModified(), "test", true))
list(3) should be (makeAppInfo("old-app-incomplete", "old-app-incomplete", 2L, -1L,
list(3) should be (makeAppInfo(oldAppIncomplete.getName(), "old-app-incomplete", 2L, -1L,
oldAppIncomplete.lastModified(), "test", false))
list(4) should be (makeAppInfo("new-app-incomplete", "new-app-incomplete", 1L, -1L,
list(4) should be (makeAppInfo(newAppIncomplete.getName(), "new-app-incomplete", 1L, -1L,
newAppIncomplete.lastModified(), "test", false))

// Make sure the UI can be rendered.
@@ -162,7 +157,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
logDir.mkdir()
createEmptyFile(new File(logDir, provider.SPARK_VERSION_PREFIX + "1.0"))
writeFile(new File(logDir, provider.LOG_PREFIX + "1"), false, Option(codec),
SparkListenerApplicationStart("app2", Some("app2"), 2L, "test", None),
SparkListenerApplicationStart("app2", None, 2L, "test", None),
SparkListenerApplicationEnd(3L)
)
createEmptyFile(new File(logDir, provider.COMPRESSION_CODEC_PREFIX + codecName))
@@ -185,12 +180,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
test("SPARK-3697: ignore directories that cannot be read.") {
val logFile1 = newLogFile("new1", None, inProgress = false)
writeFile(logFile1, true, None,
SparkListenerApplicationStart("app1-1", Some("app1-1"), 1L, "test", None),
SparkListenerApplicationStart("app1-1", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
val logFile2 = newLogFile("new2", None, inProgress = false)
writeFile(logFile2, true, None,
SparkListenerApplicationStart("app1-2", Some("app1-2"), 1L, "test", None),
SparkListenerApplicationStart("app1-2", None, 1L, "test", None),
SparkListenerApplicationEnd(2L)
)
logFile2.setReadable(false, false)
@@ -223,18 +218,6 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
}
}

test("Parse logs that application is not started") {
val provider = new FsHistoryProvider((createTestConf()))

val logFile1 = newLogFile("app1", None, inProgress = true)
writeFile(logFile1, true, None,
SparkListenerLogStart("1.4")
)
updateAndCheck(provider) { list =>
list.size should be (0)
}
}

test("SPARK-5582: empty log directory") {
val provider = new FsHistoryProvider(createTestConf())

8 changes: 8 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
assert(!Utils.isInDirectory(nullFile, parentDir))
assert(!Utils.isInDirectory(nullFile, childFile3))
}

test("circular buffer") {
val buffer = new CircularBuffer(25)
val stream = new java.io.PrintStream(buffer, true, "UTF-8")

stream.println("test circular test circular test circular test circular test circular")
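// Five repetitions of "test circular" plus separators is 69 characters;
// with the trailing "\n" that println appends that is 70 bytes, so only
// the last 25 survive in the 25-byte buffer: "t circular test circular\n".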
assert(buffer.toString === "t circular test circular\n")
}
}