
Add parent IDs to StageInfo
Andrew Or committed Apr 29, 2015
1 parent 6e2cfea commit 43de96e
Showing 7 changed files with 43 additions and 19 deletions.
core/src/main/scala/org/apache/spark/scheduler/StageInfo.scala
@@ -33,6 +33,7 @@ class StageInfo(
val name: String,
val numTasks: Int,
val rddInfos: Seq[RDDInfo],
+ val parentIds: Seq[Int],
val details: String) {
/** When this stage was submitted from the DAGScheduler to a TaskScheduler. */
var submissionTime: Option[Long] = None
@@ -66,6 +67,7 @@ private[spark] object StageInfo {
stage.name,
numTasks.getOrElse(stage.numTasks),
rddInfos,
+ stage.parents.map(_.id),
stage.details)
}
}
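What the new field enables, as a minimal sketch (not part of this diff; stage IDs in the comments are illustrative, since actual IDs depend on scheduling order): with parentIds carried on StageInfo, a SparkListener can recover the stage DAG from events alone.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("parent-ids-demo"))

// Print each submitted stage together with the parent stage IDs it now carries.
sc.addSparkListener(new SparkListener {
  override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
    val info = event.stageInfo
    println(s"stage ${info.stageId} (${info.name}) parents: ${info.parentIds}")
  }
})

// The shuffle boundary splits this job into two stages, so the result stage
// would typically report the shuffle map stage as its parent, e.g.
// "stage 1 (count ...) parents: List(0)".
sc.parallelize(1 to 100).map(x => (x % 10, x)).groupByKey().count()
sc.stop()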
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/ui/jobs/JobPage.scala
@@ -51,7 +51,7 @@ private[ui] class JobPage(parent: JobsTab) extends WebUIPage("job") {
// This could be empty if the JobProgressListener hasn't received information about the
// stage or if the stage information has been garbage collected
listener.stageIdToInfo.getOrElse(stageId,
new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, "Unknown"))
new StageInfo(stageId, 0, "Unknown", 0, Seq.empty, Seq.empty, "Unknown"))
}

val activeStages = mutable.Buffer[StageInfo]()
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -227,6 +227,7 @@ private[spark] object JsonProtocol {

def stageInfoToJson(stageInfo: StageInfo): JValue = {
val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
+ val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
@@ -235,6 +236,7 @@
("Stage Name" -> stageInfo.name) ~
("Number of Tasks" -> stageInfo.numTasks) ~
("RDD Info" -> rddInfo) ~
("Parent IDs" -> parentIds) ~
("Details" -> stageInfo.details) ~
("Submission Time" -> submissionTime) ~
("Completion Time" -> completionTime) ~
@@ -521,7 +523,7 @@ private[spark] object JsonProtocol {
// The "Stage Infos" field was added in Spark 1.2.0
val stageInfos = Utils.jsonOption(json \ "Stage Infos")
.map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
}
SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
}
@@ -601,6 +603,9 @@ private[spark] object JsonProtocol {
val stageName = (json \ "Stage Name").extract[String]
val numTasks = (json \ "Number of Tasks").extract[Int]
val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
+ val parentIds = Utils.jsonOption(json \ "Parent IDs")
+ .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+ .getOrElse(Seq.empty)
val details = (json \ "Details").extractOpt[String].getOrElse("")
val submissionTime = Utils.jsonOption(json \ "Submission Time").map(_.extract[Long])
val completionTime = Utils.jsonOption(json \ "Completion Time").map(_.extract[Long])
@@ -610,7 +615,8 @@
case None => Seq[AccumulableInfo]()
}

- val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos, details)
+ val stageInfo = new StageInfo(
+ stageId, attemptId, stageName, numTasks, rddInfos, parentIds, details)
stageInfo.submissionTime = submissionTime
stageInfo.completionTime = completionTime
stageInfo.failureReason = failureReason
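The Utils.jsonOption fallback above is what keeps pre-1.4 event logs readable. A minimal sketch of the round trip (not part of this diff; like JsonProtocolSuite, it assumes code living under org.apache.spark, since JsonProtocol is private[spark]):

import org.json4s._
import org.apache.spark.scheduler.StageInfo
import org.apache.spark.util.JsonProtocol

// Serialize a stage that has parents, then drop the new field to simulate
// an event log produced by an older Spark version.
val info = new StageInfo(2, 0, "map-stage", 4, Seq.empty, Seq(0, 1), "details")
val oldStyleJson = JsonProtocol.stageInfoToJson(info)
  .removeField { case (field, _) => field == "Parent IDs" }

// The old-style JSON still deserializes; parentIds falls back to Seq.empty.
val parsed = JsonProtocol.stageInfoFromJson(oldStyleJson)
assert(parsed.parentIds.isEmpty)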
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -701,7 +701,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
private val executorIdleTimeout = 3L

private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
new StageInfo(stageId, 0, "name", numTasks, Seq.empty, Seq.empty, "no details")
}

private def createTaskInfo(taskId: Int, taskIndex: Int, executorId: String): TaskInfo = {
core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala
@@ -34,12 +34,12 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
val jobCompletionTime = 1421191296660L

private def createStageStartEvent(stageId: Int) = {
- val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
SparkListenerStageSubmitted(stageInfo)
}

private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
- val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
if (failed) {
stageInfo.failureReason = Some("Failed!")
}
@@ -51,7 +51,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
stageIds: Seq[Int],
jobGroup: Option[String] = None): SparkListenerJobStart = {
val stageInfos = stageIds.map { stageId =>
- new StageInfo(stageId, 0, stageId.toString, 0, null, "")
+ new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
}
val properties: Option[Properties] = jobGroup.map { groupId =>
val props = new Properties()
core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -54,7 +54,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
assert(storageListener.rddInfoList.isEmpty)

// 2 RDDs are known, but none are cached
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), "details")
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0, rddInfo1), Seq.empty, "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 2)
assert(storageListener.rddInfoList.isEmpty)
@@ -64,15 +64,16 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val rddInfo3Cached = rddInfo3
rddInfo2Cached.numCachedPartitions = 1
rddInfo3Cached.numCachedPartitions = 1
val stageInfo1 = new StageInfo(1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), "details")
val stageInfo1 = new StageInfo(
1, 0, "0", 100, Seq(rddInfo2Cached, rddInfo3Cached), Seq.empty, "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo1))
assert(storageListener._rddInfoMap.size === 4)
assert(storageListener.rddInfoList.size === 2)

// Submitting RDDInfos with duplicate IDs does nothing
val rddInfo0Cached = new RDDInfo(0, "freedom", 100, StorageLevel.MEMORY_ONLY, "scoop", Seq(10))
rddInfo0Cached.numCachedPartitions = 1
val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), "details")
val stageInfo0Cached = new StageInfo(0, 0, "0", 100, Seq(rddInfo0), Seq.empty, "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0Cached))
assert(storageListener._rddInfoMap.size === 4)
assert(storageListener.rddInfoList.size === 2)
@@ -88,7 +89,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val rddInfo1Cached = rddInfo1
rddInfo0Cached.numCachedPartitions = 1
rddInfo1Cached.numCachedPartitions = 1
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), "details")
val stageInfo0 = new StageInfo(
0, 0, "0", 100, Seq(rddInfo0Cached, rddInfo1Cached), Seq.empty, "details")
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 2)
assert(storageListener.rddInfoList.size === 2)
@@ -108,7 +110,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
val stageInfo0 = new StageInfo(
0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), Seq.empty, "details")
bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
@@ -168,8 +170,8 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {

val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, "scoop", Seq(4))
val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, "scoop", Seq(4))
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), "details")
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), "details")
val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details")
val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details")
val taskMetrics0 = new TaskMetrics
val taskMetrics1 = new TaskMetrics
val block0 = (RDDBlockId(0, 1), BlockStatus(memOnly, 100L, 0L, 0L))
24 changes: 19 additions & 5 deletions core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -161,7 +161,7 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
}

test("StageInfo backward compatibility") {
test("StageInfo backward compatibility (details, accumulables)") {
val info = makeStageInfo(1, 2, 3, 4L, 5L)
val newJson = JsonProtocol.stageInfoToJson(info)

@@ -294,7 +294,7 @@ class JsonProtocolSuite extends FunSuite {
val stageIds = Seq[Int](1, 2, 3, 4)
val stageInfos = stageIds.map(x => makeStageInfo(x, x * 200, x * 300, x * 400, x * 500))
val dummyStageInfos =
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, "unknown"))
stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown"))
val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
val expectedJobStart =
@@ -320,8 +320,8 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
}

test("RDDInfo backward compatibility") {
// Prior to Spark 1.4.0, RDDInfo did not have a "Scope" and "Parent IDs" properties
test("RDDInfo backward compatibility (scope, parent IDs)") {
// Prior to Spark 1.4.0, RDDInfo did not have the "Scope" and "Parent IDs" properties
val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, "fable", Seq(1, 6, 8))
val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
.removeField({ _._1 == "Scope"})
@@ -330,6 +332,14 @@
assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
}

test("StageInfo backward compatibility (parent IDs)") {
// Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details")
val oldStageInfo = JsonProtocol.stageInfoToJson(stageInfo).removeField({ _._1 == "Parent IDs"})
val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details")
assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
}

/** -------------------------- *
| Helper test running methods |
* --------------------------- */
@@ -661,7 +669,7 @@

private def makeStageInfo(a: Int, b: Int, c: Int, d: Long, e: Long) = {
val rddInfos = (0 until a % 5).map { i => makeRddInfo(a + i, b + i, c + i, d + i, e + i) }
val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, "details")
val stageInfo = new StageInfo(a, 0, "greetings", b, rddInfos, Seq(100, 200, 300), "details")
val (acc1, acc2) = (makeAccumulableInfo(1), makeAccumulableInfo(2))
stageInfo.accumulables(acc1.id) = acc1
stageInfo.accumulables(acc2.id) = acc2
@@ -754,6 +762,7 @@
| "Stage Name": "greetings",
| "Number of Tasks": 200,
| "RDD Info": [],
| "ParentIDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -808,6 +817,7 @@
| "Disk Size": 501
| }
| ],
| "ParentIDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1193,6 +1203,7 @@
| "Disk Size": 500
| }
| ],
| "Parent IDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1252,6 +1263,7 @@
| "Disk Size": 1001
| }
| ],
| "ParentIDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1329,6 +1341,7 @@
| "Disk Size": 1502
| }
| ],
| "ParentIDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
@@ -1424,6 +1437,7 @@
| "Disk Size": 2003
| }
| ],
| "ParentIDs" : [100, 200, 300],
| "Details": "details",
| "Accumulables": [
| {
