Commit cb245da

finally found the issue ... clean up debug stuff
squito committed Jun 11, 2015
1 parent 8c29707 commit cb245da
Showing 2 changed files with 7 additions and 35 deletions.

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -851,7 +851,6 @@ class DAGScheduler(
     stage.pendingTasks.clear()

     // First figure out the indexes of partition ids to compute.
-    println(s"finding partitions to compute for $stage")
     val partitionsToCompute: Seq[Int] = {
       stage match {
         case stage: ShuffleMapStage =>
@@ -945,7 +944,6 @@ class DAGScheduler(
           s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
       }
       logDebug(debugString)
-      println(debugString)
     }
   }

@@ -1064,7 +1062,6 @@ class DAGScheduler(

             if (runningStages.contains(shuffleStage) && shuffleStage.pendingTasks.isEmpty) {
               markStageAsFinished(shuffleStage)
-              println(s"marking $shuffleStage as finished")
               logInfo("looking for newly runnable stages")
               logInfo("running: " + runningStages)
               logInfo("waiting: " + waitingStages)
@@ -1091,7 +1088,6 @@ class DAGScheduler(
                     .map(_._2).mkString(", "))
                 submitStage(shuffleStage)
               } else {
-                println(s"looking for newly runnable stage")
                 val newlyRunnable = new ArrayBuffer[Stage]
                 for (shuffleStage <- waitingStages) {
                   logInfo("Missing parents for " + shuffleStage + ": " +
@@ -1102,7 +1098,6 @@ class DAGScheduler(
                   newlyRunnable += shuffleStage
                 }
                 val newlyRunnableWithJob = newlyRunnable.map{x => x -> activeJobForStage(x)}
-                println(s"newly runnable stages = $newlyRunnableWithJob")
                 waitingStages --= newlyRunnable
                 runningStages ++= newlyRunnable
                 for {
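
Aside: every call this patch keeps routes through Spark's logging rather than
stdout. A minimal sketch of that idiom, assuming the public Spark 1.x
org.apache.spark.Logging trait (MyComponent and expensiveSummary are made-up
names for illustration, not code from this commit):

    import org.apache.spark.Logging

    // Made-up class for illustration; not part of this patch.
    class MyComponent extends Logging {
      def step(): Unit = {
        // logInfo/logDebug take by-name (=> String) arguments, so a costly
        // message is only built when that log level is enabled; println, by
        // contrast, always evaluates its argument and always writes to stdout.
        logInfo("looking for newly runnable stages")
        logDebug(s"detailed state dump: ${expensiveSummary()}")
      }

      private def expensiveSummary(): String = "..." // placeholder detail
    }

Log output can also be filtered per level and per logger through log4j
configuration, which stray println output cannot.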

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -88,7 +88,6 @@ class DAGSchedulerSuite
       // normally done by TaskSetManager
       taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
       taskSets += taskSet
-      println(s"submitting taskSet $taskSet. taskSets = $taskSets")
     }
     override def cancelTasks(stageId: Int, interruptThread: Boolean) {
       cancelledStages += stageId
@@ -558,18 +557,11 @@ class DAGSchedulerSuite
   /** This tests the case where another FetchFailed comes in while the map stage is getting
    * re-run. */
   test("late fetch failures don't cause multiple concurrent attempts for the same map stage") {
-    println("begin late fetch failure")
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
-    val jobId = submit(reduceRdd, Array(0, 1))
-    println(s"late fetch failure: jobId = $jobId")
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
+    submit(reduceRdd, Array(0, 1))

     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
@@ -579,26 +571,14 @@ class DAGSchedulerSuite
     // The map stage should have been submitted.
     assert(countSubmittedMapStageAttempts() === 1)

-    println("late fetch failure: taskSets = " + taskSets)
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))
     // The MapOutputTracker should know about both map output locations.
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) ===
       Array("hostA", "hostB"))

-    println("late fetch failure: taskSets = " + taskSets)
-    println("late fetch failure: submittedStages = " + sparkListener.submittedStageInfos)
-    println(s"late fetch failure: jobToStages = ${scheduler.jobIdToStageIds}")
-    println(s"late fetch failure: jobToActiveJob = ${scheduler.jobIdToActiveJob}")
-    println(s"late fetch failure: waitingStages = ${scheduler.waitingStages}")
-    println(s"late fetch failure: runningStages = ${scheduler.runningStages}")
-    println(s"late fetch failure: failedStages = ${scheduler.failedStages}")
     assert(mapOutputTracker.getServerStatuses(shuffleId, 1).map(_._1.host) ===
       Array("hostA", "hostB"))

     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
@@ -643,7 +623,6 @@ class DAGSchedulerSuite
    */
   test("extremely late fetch failures don't cause multiple concurrent attempts for " +
     "the same stage") {
-    println("begin extremely late fetch failure")
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
     val shuffleId = shuffleDep.shuffleId
@@ -661,17 +640,15 @@ class DAGSchedulerSuite
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

-    println("extremely late fetch failure: taskSets = " + taskSets)
     // Complete the map stage.
     complete(taskSets(0), Seq(
-      (Success, makeMapStatus("hostA", 1)),
-      (Success, makeMapStatus("hostB", 1))))
+      (Success, makeMapStatus("hostA", 2)),
+      (Success, makeMapStatus("hostB", 2))))

     // The reduce stage should have been submitted.
     sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)

-    println("extremely late fetch failure: taskSets = " + taskSets)
     // The first result task fails, with a fetch failure for the output from the first mapper.
     runEvent(CompletionEvent(
       taskSets(1).tasks(0),
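
Apart from deleting debug printlns, the one substantive change in this commit
is the second argument to the suite's makeMapStatus helper, raised from 1 to 2,
presumably so each simulated map output advertises a size for both partitions
of reduceRdd: the test asserts on getServerStatuses(shuffleId, 1), i.e. reduce
partition 1, which only exists if two reducer entries were registered. A
self-contained sketch of that idea (FakeMapStatus and this makeMapStatus are
illustrative stand-ins, not Spark's actual MapStatus API):

    // Illustrative stand-in: one recorded output size per reduce partition.
    // Spark's real MapStatus also carries the block manager address and more.
    case class FakeMapStatus(host: String, sizesByReducer: Array[Long])

    // Stand-in for the suite's makeMapStatus(host, reduces) helper.
    def makeMapStatus(host: String, reduces: Int): FakeMapStatus =
      FakeMapStatus(host, Array.fill(reduces)(2L))

    // reduceRdd has 2 partitions and the test reads statuses for reduce
    // partition 1: with reduces = 1 that entry would not exist; with
    // reduces = 2 both reduce partitions see outputs on hostA and hostB.
    val status = makeMapStatus("hostA", 2)
    assert(status.sizesByReducer.length == 2)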
