Skip to content

Commit

Permalink
fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
Rui Li committed Jun 4, 2014
1 parent 3dfae86 commit cf0d6ac
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ private[spark] class TaskSchedulerImpl(
SparkEnv.set(sc.env)

// Mark each slave as alive and remember its hostname
//also track if new executor is added
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
Expand All @@ -232,15 +232,15 @@ private[spark] class TaskSchedulerImpl(
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (delaySchedule && newExecAvail) {
taskSet.reAddPendingTasks()
}
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
if (delaySchedule && newExecAvail) {
taskSet.reAddPendingTasks()
}
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,7 @@ private[spark] class TaskSetManager(
}
}

if (tasks(index).preferredLocations.isEmpty ||
(!delaySchedule && !hadAliveLocations)) {
if (tasks(index).preferredLocations.isEmpty || (!delaySchedule && !hadAliveLocations)) {
// Even though the task might've had preferred locations, all of those hosts or executors
// are dead; put it in the no-prefs list so we can schedule it elsewhere right away.
addTo(pendingTasksWithNoPrefs)
Expand Down Expand Up @@ -744,7 +743,8 @@ private[spark] class TaskSetManager(
//Re-compute the pending lists. This should be called when new executor is added
def reAddPendingTasks() {
logInfo("Re-computing pending task lists.")
for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0 && !successful(index))) {
for (i <- (0 until numTasks).reverse.filter(index => copiesRunning(index) == 0
&& !successful(index))) {
addPendingTask(i, readding = true)
}
}
Expand Down

0 comments on commit cf0d6ac

Please sign in to comment.