diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 468aa8268aa4d..3db70bb565f92 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -462,14 +462,14 @@ private[spark] class TaskSchedulerImpl(
   }
 
   override def executorLost(executorId: String, reason: ExecutorLossReason): Unit = {
-    var failedExecutor: Option[String] = None
+    var dagSchedulerShouldRemoveExecutor: Boolean = false
 
     synchronized {
       if (executorIdToRunningTaskIds.contains(executorId)) {
         val hostPort = executorIdToHost(executorId)
         logExecutorLoss(executorId, hostPort, reason)
         removeExecutor(executorId, reason)
-        failedExecutor = Some(executorId)
+        dagSchedulerShouldRemoveExecutor = true
       } else {
         executorIdToHost.get(executorId) match {
           case Some(hostPort) =>
@@ -489,14 +489,14 @@ private[spark] class TaskSchedulerImpl(
         }
       }
     }
-    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock
-    // When an executor is never used, we should ask dag scheduler to clean it up so that it can be
-    // removed from BlockManager
-    if (failedExecutor.isDefined) {
+    // Call dagScheduler.executorLost without holding the lock on this to prevent deadlock.
+    if (dagSchedulerShouldRemoveExecutor) {
       dagScheduler.executorLost(executorId)
       backend.reviveOffers()
     } else {
-      logInfo(s"Call BlockManager Master to remove executor $executorId")
+      // When an executor is never used, we should ask BlockManagerMaster to remove this executor
+      // from its accounting.
+      logInfo(s"Call BlockManagerMaster to remove executor $executorId")
       sc.env.blockManager.master.removeExecutorAsync(executorId)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index d75243957c4b4..a3355d91ed45c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -271,8 +271,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
           // SPARK-15262: If an executor is still alive even after the scheduler has removed
           // its metadata, we may receive a heartbeat from that executor and tell its block
           // manager to reregister itself. If that happens, the block manager master will know
-          // about the executor, but the scheduler will not. Therefore, we should remove the
-          // executor from the block manager when we hit this case.
+          // about the executor, but the scheduler will not. (The same state will occur if an
+          // executor has been dynamically allocated but never assigned any task: SPARK-21876.)
+          // Therefore, we should remove the executor from the block manager when we hit this case.
           scheduler.sc.env.blockManager.master.removeExecutorAsync(executorId)
           logInfo(s"Asked to remove non-existent executor $executorId")
       }