Skip to content

Commit

Permalink
remove delay shedule for pendingTasksWithNoPrefs
Browse files Browse the repository at this point in the history
  • Loading branch information
Rui Li committed Jun 7, 2014
1 parent 7b0177a commit 685ed3d
Showing 1 changed file with 1 addition and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,8 @@ private[spark] class TaskSetManager(
clock: Clock = SystemClock)
extends Schedulable with Logging
{
// Remember when this TaskSetManager is created
val creationTime = clock.getTime()
val conf = sched.sc.conf

// The period we wait for new executors to come up
// After this period, tasks in pendingTasksWithNoPrefs will be considered as PROCESS_LOCAL
private val WAIT_NEW_EXEC_TIMEOUT = conf.getLong("spark.scheduler.waitNewExecutorTime", 3000L)
private var waitingNewExec = true

/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
Expand Down Expand Up @@ -366,8 +359,7 @@ private[spark] class TaskSetManager(
}

// Look for no-pref tasks after rack-local tasks since they can run anywhere.
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)
if (!waitingNewExec || tasks(index).preferredLocations.isEmpty)) {
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}

Expand Down Expand Up @@ -397,9 +389,6 @@ private[spark] class TaskSetManager(
if (allowedLocality > maxLocality) {
allowedLocality = maxLocality // We're not allowed to search for farther-away tasks
}
if (waitingNewExec && curTime - creationTime > WAIT_NEW_EXEC_TIMEOUT) {
waitingNewExec = false
}

findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
Expand Down

0 comments on commit 685ed3d

Please sign in to comment.