Skip to content

Commit

Permalink
style
Browse files Browse the repository at this point in the history
  • Loading branch information
squito committed Jun 10, 2015
1 parent 55f4a94 commit b6bc248
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1108,8 +1108,8 @@ class DAGScheduler(
} else {

// It is likely that we receive multiple FetchFailed for a single stage (because we have
// multiple tasks running concurrently on different executors). In that case, it is possible
// the fetch failure has already been handled by the scheduler.
// multiple tasks running concurrently on different executors). In that case, it is
// possible the fetch failure has already been handled by the scheduler.
if (runningStages.contains(failedStage)) {
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
s"due to a fetch failure from $mapStage (${mapStage.name})")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -611,15 +611,16 @@ class DAGSchedulerSuite
sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
assert(countSubmittedMapStageAttempts() === 2)

// NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't effect anything --
// our calling it just makes *SURE* it gets called between the desired event and our check.

// NOTE: the actual ResubmitFailedStages may get called at any time during this, shouldn't
// effect anything -- our calling it just makes *SURE* it gets called between the desired event
// and our check.
}

/** This tests the case where a late FetchFailed comes in after the map stage has finished getting
* retried and a new reduce stage starts running.
*/
test("extremely late fetch failures don't cause multiple concurrent attempts for the same stage") {
test("extremely late fetch failures don't cause multiple concurrent attempts for " +
"the same stage") {
val shuffleMapRdd = new MyRDD(sc, 2, Nil)
val shuffleDep = new ShuffleDependency(shuffleMapRdd, null)
val shuffleId = shuffleDep.shuffleId
Expand Down

0 comments on commit b6bc248

Please sign in to comment.