From c7b93b5dce7d92ddd10e4b4c92fc5b7683cc0131 Mon Sep 17 00:00:00 2001 From: Rui Li Date: Thu, 5 Jun 2014 22:25:12 +0800 Subject: [PATCH] revise patch --- .../apache/spark/scheduler/TaskSchedulerImpl.scala | 11 +++++++---- .../org/apache/spark/scheduler/TaskSetManager.scala | 11 ++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 2862542c38c4a..dc7a6aedd989b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -209,16 +209,15 @@ private[spark] class TaskSchedulerImpl( def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { SparkEnv.set(sc.env) - val sortedTaskSets = rootPool.getSortedTaskSetQueue // Mark each slave as alive and remember its hostname + // Also track if new executor is added + var newExecAvail = false for (o <- offers) { executorIdToHost(o.executorId) = o.host if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) - for (taskSet <- sortedTaskSets) { - taskSet.executorAdded(o.executorId, o.host) - } + newExecAvail = true } } @@ -227,9 +226,13 @@ private[spark] class TaskSchedulerImpl( // Build a list of tasks to assign to each worker. val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray + val sortedTaskSets = rootPool.getSortedTaskSetQueue for (taskSet <- sortedTaskSets) { logDebug("parentName: %s, name: %s, runningTasks: %s".format( taskSet.parent.name, taskSet.name, taskSet.runningTasks)) + if (newExecAvail) { + taskSet.executorAdded() + } } // Take each TaskSet in our scheduling order, and then offer it each node in increasing order diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index 4346a44e071fd..7ce5dcbf415e7 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -752,15 +752,12 @@ private[spark] class TaskSetManager( } // Re-compute pendingTasksWithNoPrefs since new preferred locations may become available - def executorAdded(execId: String, host: String) { + def executorAdded() { def newLocAvail(index: Int): Boolean = { for (loc <- tasks(index).preferredLocations) { - if (execId.equals(loc.executorId.getOrElse(null)) || host.equals(loc.host)) { - return true - } - val availRack = sched.getRackForHost(host) - val prefRack = sched.getRackForHost(loc.host) - if (prefRack.isDefined && prefRack.get.equals(availRack.getOrElse(null))) { + if (sched.hasExecutorsAliveOnHost(loc.host) || + (loc.executorId.isDefined && sched.isExecutorAlive(loc.executorId.get)) || + sched.getRackForHost(loc.host).isDefined) { return true } }