From 3dfae86eb492f8b39b34fabf9fb80e7382ba7486 Mon Sep 17 00:00:00 2001
From: Rui Li
Date: Wed, 28 May 2014 19:21:27 +0800
Subject: [PATCH] re-compute pending tasks when new host is added

---
 .../spark/scheduler/TaskSchedulerImpl.scala |  8 +++++++
 .../spark/scheduler/TaskSetManager.scala    | 22 ++++++++++++-------
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 17292b4c15b8b..eae315ea5c1f6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -111,6 +111,8 @@ private[spark] class TaskSchedulerImpl(
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
+  private val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
+
   override def setDAGScheduler(dagScheduler: DAGScheduler) {
     this.dagScheduler = dagScheduler
   }
@@ -210,11 +212,14 @@ private[spark] class TaskSchedulerImpl(
     SparkEnv.set(sc.env)
 
     // Mark each slave as alive and remember its hostname
+    // Also track whether a new executor is added
+    var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
+        newExecAvail = true
       }
     }
 
@@ -233,6 +238,9 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+      if (delaySchedule && newExecAvail) {
+        taskSet.reAddPendingTasks()
+      }
       do {
         launchedTask = false
         for (i <- 0 until shuffledOffers.size) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index ed38ba755cc91..a16b1a44fd61d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -150,8 +150,7 @@ private[spark] class TaskSetManager(
   // of task index so that tasks with low indices get launched first.
   val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
   for (i <- (0 until numTasks).reverse) {
-    //if delay schedule is set, we shouldn't enforce check since executors may haven't registered yet
-    addPendingTask(i, enforceCheck = !delaySchedule)
+    addPendingTask(i)
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
@@ -171,10 +170,8 @@ private[spark] class TaskSetManager(
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task so only include it in each list if it's not already there.
-   * If enforceCheck is set, we'll check the availability of executors/hosts before adding a task
-   * to the pending list, otherwise, we simply add the task according to its preference.
    */
-  private def addPendingTask(index: Int, readding: Boolean = false, enforceCheck: Boolean = true) {
+  private def addPendingTask(index: Int, readding: Boolean = false) {
     // Utility method that adds `index` to a list only if readding=false or it's not already there
     def addTo(list: ArrayBuffer[Int]) {
       if (!readding || !list.contains(index)) {
@@ -185,12 +182,12 @@ private[spark] class TaskSetManager(
     var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
-        if (!enforceCheck || sched.isExecutorAlive(execId)) {
+        if (sched.isExecutorAlive(execId)) {
           addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
           hadAliveLocations = true
         }
       }
-      if (!enforceCheck || sched.hasExecutorsAliveOnHost(loc.host)) {
+      if (sched.hasExecutorsAliveOnHost(loc.host)) {
         addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
         for (rack <- sched.getRackForHost(loc.host)) {
           addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
@@ -199,7 +196,8 @@ private[spark] class TaskSetManager(
       }
     }
 
-    if (!hadAliveLocations) {
+    if (tasks(index).preferredLocations.isEmpty ||
+      (!delaySchedule && !hadAliveLocations)) {
       // Even though the task might've had preferred locations, all of those hosts or executors
       // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
       addTo(pendingTasksWithNoPrefs)
@@ -742,4 +740,12 @@ private[spark] class TaskSetManager(
     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
+
+  // Re-compute the pending lists. This should be called when a new executor is added.
+  def reAddPendingTasks() {
+    logInfo("Re-computing pending task lists.")
+    for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) {
+      addPendingTask(i, readding = true)
+    }
+  }
 }
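
As a quick way to exercise the new code path, here is a minimal driver sketch (not part of the patch). Only the spark.schedule.delaySchedule flag comes from the changes above; the object name, master, and job are placeholders for illustration, and the flag already defaults to true.

    import org.apache.spark.{SparkConf, SparkContext}

    object DelayScheduleExample {
      def main(args: Array[String]): Unit = {
        // `spark.schedule.delaySchedule` is the flag added by this patch; it defaults to true.
        // The master, app name, and job below are made-up placeholders for illustration only.
        val conf = new SparkConf()
          .setMaster("local[2]")
          .setAppName("delay-schedule-example")
          .set("spark.schedule.delaySchedule", "true")
        val sc = new SparkContext(conf)

        // With the flag on, each TaskSetManager re-computes its pending-task lists
        // (reAddPendingTasks) when an executor on a previously unseen host registers,
        // so tasks can still be scheduled with locality on hosts that join late.
        sc.parallelize(1 to 1000).map(_ * 2).count()

        sc.stop()
      }
    }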