Skip to content

Commit

Permalink
revise patch
Browse files Browse the repository at this point in the history
  • Loading branch information
Rui Li committed Jun 5, 2014
1 parent 3d7da02 commit c7b93b5
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -209,16 +209,15 @@ private[spark] class TaskSchedulerImpl(
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)

val sortedTaskSets = rootPool.getSortedTaskSetQueue
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
for (taskSet <- sortedTaskSets) {
taskSet.executorAdded(o.executorId, o.host)
}
newExecAvail = true
}
}

Expand All @@ -227,9 +226,13 @@ private[spark] class TaskSchedulerImpl(
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}

// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -752,15 +752,12 @@ private[spark] class TaskSetManager(
}

// Re-compute pendingTasksWithNoPrefs since new preferred locations may become available
def executorAdded(execId: String, host: String) {
def executorAdded() {
def newLocAvail(index: Int): Boolean = {
for (loc <- tasks(index).preferredLocations) {
if (execId.equals(loc.executorId.getOrElse(null)) || host.equals(loc.host)) {
return true
}
val availRack = sched.getRackForHost(host)
val prefRack = sched.getRackForHost(loc.host)
if (prefRack.isDefined && prefRack.get.equals(availRack.getOrElse(null))) {
if (sched.hasExecutorsAliveOnHost(loc.host) ||
(loc.executorId.isDefined && sched.isExecutorAlive(loc.executorId.get)) ||
sched.getRackForHost(loc.host).isDefined) {
return true
}
}
Expand Down

0 comments on commit c7b93b5

Please sign in to comment.