[SPARK-13054] Always post TaskEnd event for tasks
I am using dynamic container allocation and speculation and am seeing issues with the active task accounting. The Executor UI still shows active tasks on an executor even though the job/stage has completed. I think this also prevents dynamic allocation from releasing containers, because it thinks tasks are still running.
There are multiple issues with this:
- If the task end event (in this case probably for a speculative task) comes in after the stage is finished, DAGScheduler.handleTaskCompletion skips posting the task completion event.
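
To illustrate why a dropped TaskEnd matters, here is a minimal, hypothetical listener (not Spark's actual ExecutorAllocationManager or UI code; the class and method names are made up) that keeps its own per-executor count of running tasks, the way the Executor UI and dynamic allocation do. If SparkListenerTaskEnd is never posted for a late or speculative completion, the count never drops back to zero and the executor looks permanently busy:

    import scala.collection.mutable

    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

    // Hypothetical accounting listener, for illustration only.
    class ActiveTaskTracker extends SparkListener {
      private val activeTasks = mutable.Map.empty[String, Int].withDefaultValue(0)

      override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
        activeTasks(taskStart.taskInfo.executorId) += 1
      }

      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
        activeTasks(taskEnd.taskInfo.executorId) -= 1
      }

      // Under dynamic allocation, an executor only becomes a release candidate once its
      // count reaches zero, so a missing TaskEnd event keeps the container alive indefinitely.
      def hasActiveTasks(executorId: String): Boolean = synchronized {
        activeTasks(executorId) > 0
      }
    }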

Author: Thomas Graves <tgraves@prevailsail.corp.gq1.yahoo.com>
Author: Thomas Graves <tgraves@staydecay.corp.gq1.yahoo.com>
Author: Tom Graves <tgraves@yahoo-inc.com>

Closes apache#10951 from tgravescs/SPARK-11701.
Thomas Graves authored and roygao94 committed Mar 22, 2016
1 parent 28dbd73 commit b214f39
Showing 2 changed files with 65 additions and 9 deletions.
@@ -1148,13 +1148,13 @@ class DAGScheduler(
null
}

-    // The success case is dealt with separately below.
-    // TODO: Why post it only for failed tasks in cancelled stages? Clarify semantics here.
-    if (event.reason != Success) {
-      val attemptId = task.stageAttemptId
-      listenerBus.post(SparkListenerTaskEnd(
-        stageId, attemptId, taskType, event.reason, event.taskInfo, taskMetrics))
-    }
+    // The stage may have already finished when we get this event -- eg. maybe it was a
+    // speculative task. It is important that we send the TaskEnd event in any case, so listeners
+    // are properly notified and can chose to handle it. For instance, some listeners are
+    // doing their own accounting and if they don't get the task end event they think
+    // tasks are still running when they really aren't.
+    listenerBus.post(SparkListenerTaskEnd(
+      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

if (!stageIdToStage.contains(task.stageId)) {
// Skip all the actions if the stage has been cancelled.
@@ -1164,8 +1164,6 @@ class DAGScheduler(
val stage = stageIdToStage(task.stageId)
event.reason match {
case Success =>
-        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
-          event.reason, event.taskInfo, taskMetrics))
stage.pendingPartitions -= task.partitionId
task match {
case rt: ResultTask[_, _] =>
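For orientation, this is roughly how the affected part of DAGScheduler.handleTaskCompletion reads after the change, condensed from the hunks above (surrounding logic elided; the comments here are paraphrased, not the committed ones):

    // Post the TaskEnd event first and unconditionally, so listeners hear about every
    // completion -- including a speculative task that finishes after its stage is done.
    listenerBus.post(SparkListenerTaskEnd(
      stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

    if (!stageIdToStage.contains(task.stageId)) {
      // Stage was cancelled or already cleaned up; the event has still been delivered.
      return
    }

    val stage = stageIdToStage(task.stageId)
    event.reason match {
      case Success =>
        // The separate post that used to live here is gone; only per-task bookkeeping remains.
        stage.pendingPartitions -= task.partitionId
        // ... ResultTask / ShuffleMapTask handling ...
      case _ =>
        // ... failure handling, which likewise no longer needs its own post ...
    }
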
@@ -134,6 +134,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
val successfulStages = new HashSet[Int]
val failedStages = new ArrayBuffer[Int]
val stageByOrderOfExecution = new ArrayBuffer[Int]
val endedTasks = new HashSet[Long]

override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
submittedStageInfos += stageSubmitted.stageInfo
@@ -148,6 +149,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
failedStages += stageInfo.stageId
}
}

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
endedTasks += taskEnd.taskInfo.taskId
}
}

var mapOutputTracker: MapOutputTrackerMaster = null
@@ -195,6 +200,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
sparkListener.submittedStageInfos.clear()
sparkListener.successfulStages.clear()
sparkListener.failedStages.clear()
sparkListener.endedTasks.clear()
failure = null
sc.addSparkListener(sparkListener)
taskSets.clear()
@@ -982,6 +988,52 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
assert(countSubmittedMapStageAttempts() === 2)
}

test("task events always posted in speculation / when stage is killed") {
val baseRdd = new MyRDD(sc, 4, Nil)
val finalRdd = new MyRDD(sc, 4, List(new OneToOneDependency(baseRdd)))
submit(finalRdd, Array(0, 1, 2, 3))

// complete two tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(0), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(0)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(1), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(1)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
// verify stage exists
assert(scheduler.stageIdToStage.contains(0))
assert(sparkListener.endedTasks.size == 2)

// finish other 2 tasks
runEvent(makeCompletionEvent(
taskSets(0).tasks(2), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(2)))
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(3)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 4)

// verify the stage is done
assert(!scheduler.stageIdToStage.contains(0))

// Stage should be complete. Finish one other Successful task to simulate what can happen
// with a speculative task and make sure the event is sent out
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), Success, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(5)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 5)

// make sure non successful tasks also send out event
runEvent(makeCompletionEvent(
taskSets(0).tasks(3), UnknownReason, 42,
Seq.empty[AccumulableInfo], createFakeTaskInfoWithId(6)))
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(sparkListener.endedTasks.size == 6)
}

test("ignore late map task completions") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
@@ -1944,6 +1996,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
info
}

private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
info.finishTime = 1 // to prevent spurious errors in JobProgressListener
info
}

private def makeCompletionEvent(
task: Task[_],
reason: TaskEndReason,
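As a usage sketch (not part of this commit), the fixed behavior can also be observed with an ordinary listener that counts task starts and ends; the object and counter names below are made up for illustration. With the patch, the two counts line up once a job's events have drained, even when a speculative or late completion arrives after its stage has finished. Note that spark.speculation only launches speculative attempts when running against a real cluster, so submit this via spark-submit rather than local mode to exercise that path:

    import java.util.concurrent.atomic.AtomicLong

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

    object TaskEndProbe {
      def main(args: Array[String]): Unit = {
        // Master is expected to be supplied by spark-submit; spark.speculation is a real setting.
        val conf = new SparkConf().setAppName("task-end-probe").set("spark.speculation", "true")
        val sc = new SparkContext(conf)

        val started = new AtomicLong(0)
        val ended = new AtomicLong(0)
        sc.addSparkListener(new SparkListener {
          override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = started.incrementAndGet()
          override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = ended.incrementAndGet()
        })

        sc.parallelize(1 to 1000000, 100).map(_ * 2).count()

        // The listener bus is asynchronous; crudely wait for it to drain before comparing.
        Thread.sleep(2000)
        println(s"tasks started=${started.get()} ended=${ended.get()}")
        sc.stop()
      }
    }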
