Skip to content

Commit

Permalink
fix computing valid locality levels
Browse files Browse the repository at this point in the history
  • Loading branch information
Rui Li committed Jun 10, 2014
1 parent 685ed3d commit fff4123
Showing 1 changed file with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ private[spark] class TaskSetManager(
}

// Figure out which locality levels we have in our TaskSet, so we can do delay scheduling
val myLocalityLevels = computeValidLocalityLevels()
var myLocalityLevels = computeValidLocalityLevels()
val localityWaits = myLocalityLevels.map(getLocalityWait) // Time to wait at each level

// Delay scheduling variables: we keep track of our current locality level and the time we
Expand Down Expand Up @@ -386,7 +386,7 @@ private[spark] class TaskSetManager(
val curTime = clock.getTime()

var allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
if (allowedLocality > maxLocality && myLocalityLevels.contains(maxLocality)) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
}

Expand Down Expand Up @@ -723,10 +723,12 @@ private[spark] class TaskSetManager(
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY}
val levels = new ArrayBuffer[TaskLocality.TaskLocality]
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0) {
if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
levels += PROCESS_LOCAL
}
if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0) {
if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
levels += NODE_LOCAL
}
if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0) {
Expand All @@ -750,5 +752,6 @@ private[spark] class TaskSetManager(
}
logInfo("Re-computing pending task lists.")
pendingTasksWithNoPrefs = pendingTasksWithNoPrefs.filter(!newLocAvail(_))
myLocalityLevels = computeValidLocalityLevels()
}
}

0 comments on commit fff4123

Please sign in to comment.