re-compute pending tasks when new host is added
Rui Li committed May 28, 2014
1 parent a225ac2 commit 3dfae86
Showing 2 changed files with 22 additions and 8 deletions.
TaskSchedulerImpl.scala

@@ -111,6 +111,8 @@ private[spark] class TaskSchedulerImpl(
   // This is a var so that we can reset it for testing purposes.
   private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
 
+  private val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
+
   override def setDAGScheduler(dagScheduler: DAGScheduler) {
     this.dagScheduler = dagScheduler
   }
@@ -210,11 +212,14 @@ private[spark] class TaskSchedulerImpl(
     SparkEnv.set(sc.env)
 
     // Mark each slave as alive and remember its hostname
+    // Also track if a new executor is added
+    var newExecAvail = false
     for (o <- offers) {
       executorIdToHost(o.executorId) = o.host
       if (!executorsByHost.contains(o.host)) {
         executorsByHost(o.host) = new HashSet[String]()
         executorAdded(o.executorId, o.host)
+        newExecAvail = true
       }
     }
 
@@ -233,6 +238,9 @@ private[spark] class TaskSchedulerImpl(
     // of locality levels so that it gets a chance to launch local tasks on all of them.
     var launchedTask = false
     for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
+      if (delaySchedule && newExecAvail) {
+        taskSet.reAddPendingTasks()
+      }
       do {
         launchedTask = false
         for (i <- 0 until shuffledOffers.size) {
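The scheduler-side changes above are gated by the spark.schedule.delaySchedule flag, which this patch reads with a default of true. A minimal sketch of how a job could opt out, assuming a standard SparkConf setup; the app name is a placeholder:

import org.apache.spark.SparkConf

// Sketch: turn off the re-computation path added in this commit.
// "spark.schedule.delaySchedule" is the key this patch reads;
// "locality-example" is a hypothetical app name.
val conf = new SparkConf()
  .setAppName("locality-example")
  .set("spark.schedule.delaySchedule", "false")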
TaskSetManager.scala

@@ -150,8 +150,7 @@ private[spark] class TaskSetManager(
   // of task index so that tasks with low indices get launched first.
   val delaySchedule = conf.getBoolean("spark.schedule.delaySchedule", true)
   for (i <- (0 until numTasks).reverse) {
-    // If delay scheduling is set, we shouldn't enforce the check, since executors may not have registered yet
-    addPendingTask(i, enforceCheck = !delaySchedule)
+    addPendingTask(i)
   }
 
   // Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
@@ -171,10 +170,8 @@ private[spark] class TaskSetManager(
   /**
    * Add a task to all the pending-task lists that it should be on. If readding is set, we are
    * re-adding the task so only include it in each list if it's not already there.
-   * If enforceCheck is set, we'll check the availability of executors/hosts before adding a task
-   * to the pending list; otherwise, we simply add the task according to its preference.
    */
-  private def addPendingTask(index: Int, readding: Boolean = false, enforceCheck: Boolean = true) {
+  private def addPendingTask(index: Int, readding: Boolean = false) {
     // Utility method that adds `index` to a list only if readding=false or it's not already there
     def addTo(list: ArrayBuffer[Int]) {
       if (!readding || !list.contains(index)) {
@@ -185,12 +182,12 @@ private[spark] class TaskSetManager(
     var hadAliveLocations = false
     for (loc <- tasks(index).preferredLocations) {
       for (execId <- loc.executorId) {
-        if (!enforceCheck || sched.isExecutorAlive(execId)) {
+        if (sched.isExecutorAlive(execId)) {
           addTo(pendingTasksForExecutor.getOrElseUpdate(execId, new ArrayBuffer))
           hadAliveLocations = true
         }
       }
-      if (!enforceCheck || sched.hasExecutorsAliveOnHost(loc.host)) {
+      if (sched.hasExecutorsAliveOnHost(loc.host)) {
         addTo(pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer))
         for (rack <- sched.getRackForHost(loc.host)) {
           addTo(pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer))
@@ -199,7 +196,8 @@ private[spark] class TaskSetManager(
       }
     }
 
-    if (!hadAliveLocations) {
+    if (tasks(index).preferredLocations.isEmpty ||
+      (!delaySchedule && !hadAliveLocations)) {
       // Even though the task might've had preferred locations, all of those hosts or executors
       // are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
       addTo(pendingTasksWithNoPrefs)
@@ -742,4 +740,12 @@ private[spark] class TaskSetManager(
     logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
     levels.toArray
   }
+
+  // Re-compute the pending lists. This should be called when a new executor is added.
+  def reAddPendingTasks() {
+    logInfo("Re-computing pending task lists.")
+    for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) {
+      addPendingTask(i, readding = true)
+    }
+  }
 }
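To make the intended behavior concrete, here is a small self-contained model of the bookkeeping above for the delaySchedule = true case. It is plain Scala, not Spark code, and every name in it is illustrative: a task whose preferred host has no registered executors stays off the no-prefs list, and reAddPendingTasks() moves it onto the host-specific list once such a host appears.

import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

object PendingTasksModel {
  val aliveHosts = new HashSet[String]()                       // hosts with live executors
  val pendingForHost = new HashMap[String, ArrayBuffer[Int]]() // host -> pending task indices
  val pendingNoPrefs = new ArrayBuffer[Int]()                  // tasks with no locality preference
  val prefs: Map[Int, Seq[String]] = Map(0 -> Seq("host1"), 1 -> Seq.empty)

  def addPendingTask(index: Int, readding: Boolean = false): Unit = {
    // Add `index` to a list only if readding=false or it's not already there
    def addTo(list: ArrayBuffer[Int]): Unit =
      if (!readding || !list.contains(index)) list += index
    for (host <- prefs(index) if aliveHosts.contains(host))
      addTo(pendingForHost.getOrElseUpdate(host, new ArrayBuffer))
    // Only genuinely preference-free tasks go to the no-prefs list; a task
    // whose preferred host simply hasn't registered yet keeps waiting.
    if (prefs(index).isEmpty) addTo(pendingNoPrefs)
  }

  def reAddPendingTasks(): Unit =
    prefs.keys.foreach(addPendingTask(_, readding = true))

  def main(args: Array[String]): Unit = {
    prefs.keys.foreach(addPendingTask(_))
    println(pendingForHost)  // Map() -- host1 has no executors yet
    aliveHosts += "host1"    // an executor on host1 registers
    reAddPendingTasks()
    println(pendingForHost)  // Map(host1 -> ArrayBuffer(0))
  }
}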
