diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index eae315ea5c1f6..309db8263075f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -212,7 +212,7 @@ private[spark] class TaskSchedulerImpl(
     SparkEnv.set(sc.env)
 
     // Mark each slave as alive and remember its hostname
-    //also track if new executor is added
+    // Also track if new executor is added
     var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
@@ -232,15 +232,15 @@ private[spark] class TaskSchedulerImpl(
     for (taskSet <- sortedTaskSets) {
       logDebug("parentName: %s, name: %s, runningTasks: %s".format(
         taskSet.parent.name, taskSet.name, taskSet.runningTasks))
+      if (delaySchedule && newExecAvail) {
+        taskSet.reAddPendingTasks()
+      }
     }
 
     // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
-      if (delaySchedule && newExecAvail) {
-        taskSet.reAddPendingTasks()
-      }
       do {
         launchedTask = false
         for (i <- 0 until shuffledOffers.size) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a16b1a44fd61d..8b9f46261bf54 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -196,8 +196,7 @@ private[spark] class TaskSetManager(
       }
     }
 
-    if (tasks(index).preferredLocations.isEmpty ||
-        (!delaySchedule && !hadAliveLocations)) {
+    if (tasks(index).preferredLocations.isEmpty || (!delaySchedule && !hadAliveLocations)) {
       // Even though the task might've had preferred locations, all of those hosts or executors
       // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
       addTo(pendingTasksWithNoPrefs)
@@ -744,7 +743,8 @@ private[spark] class TaskSetManager(
   //Re-compute the pending lists. This should be called when new executor is added
   def reAddPendingTasks() {
     logInfo("Re-computing pending task lists.")
-    for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) {
+    for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0
+      && !successful(index))) {
       addPendingTask(i, readding = true)
     }
   }