[SPARK-27112][CORE]: Create a resource ordering between threads to resolve the deadlocks encountered when trying to kill executors either due to dynamic allocation or blacklisting

Closes apache#24072 from pgandhi999/SPARK-27112-2.

Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>

## What changes were proposed in this pull request?

There are two deadlocks as a result of the interplay between three different threads:

- **task-result-getter thread**
- **spark-dynamic-executor-allocation thread**
- **dispatcher-event-loop thread (`makeOffers()`)**

Each deadlock arises when one thread acquires the `TaskSchedulerImpl` monitor and then blocks on the `CoarseGrainedSchedulerBackend` monitor while another thread has acquired the same two locks in the opposite order. The fix enforces a lock-ordering constraint: the lock on `TaskSchedulerImpl` is always acquired before the lock on `CoarseGrainedSchedulerBackend`, in `makeOffers()` as well as in the `killExecutors()` method. This consistent resource ordering between the threads eliminates the circular wait and thus fixes the deadlocks.
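To make the ordering argument concrete, here is a minimal, self-contained sketch (plain Scala; the `scheduler` and `backend` monitors and the `unsafeKillPath`/`safeKillPath` helpers are illustrative stand-ins, not the actual Spark code). As long as every path that needs both monitors goes through the `withLock`-style helper, no circular wait between the two locks can form:

```scala
object LockOrderingSketch {
  // Placeholder monitors standing in for the two Spark objects.
  private val scheduler = new Object  // TaskSchedulerImpl stand-in
  private val backend = new Object    // CoarseGrainedSchedulerBackend stand-in

  // Deadlock-prone shape: this thread takes `backend` first, then waits on
  // `scheduler`. If another thread holds `scheduler` and is waiting on
  // `backend`, both block forever -- a circular wait.
  def unsafeKillPath(): Unit = backend.synchronized {
    scheduler.synchronized {
      // ... kill executors / make offers ...
    }
  }

  // The fix: a single helper that always acquires `scheduler` before
  // `backend`. If every multi-lock path uses it, the cycle cannot form.
  private def withLock[T](fn: => T): T = scheduler.synchronized {
    backend.synchronized { fn }
  }

  def safeKillPath(): Unit = withLock {
    // ... kill executors / make offers ...
  }
}
```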

## How was this patch tested?

Manual Tests

Closes apache#24134 from pgandhi999/branch-2.4-SPARK-27112.

Authored-by: pgandhi <pgandhi@verizonmedia.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
pgandhi authored and kai-chi committed Jul 23, 2019
1 parent 54c34ee commit e329e1f
Showing 1 changed file with 10 additions and 3 deletions.
@@ -237,7 +237,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on all executors
     private def makeOffers() {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
         val workOffers = activeExecutors.map {
@@ -263,7 +263,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make fake resource offers on just one executor
     private def makeOffers(executorId: String) {
       // Make sure no executor is killed while some task is launching on it
-      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
+      val taskDescs = withLock {
         // Filter out executors under killing
         if (executorIsAlive(executorId)) {
           val executorData = executorDataMap(executorId)
@@ -607,7 +607,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       force: Boolean): Seq[String] = {
     logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
 
-    val response = synchronized {
+    val response = withLock {
       val (knownExecutors, unknownExecutors) = executorIds.partition(executorDataMap.contains)
       unknownExecutors.foreach { id =>
         logWarning(s"Executor to kill $id does not exist!")
@@ -685,6 +685,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
+
+  // SPARK-27112: We need to ensure that there is ordering of lock acquisition
+  // between TaskSchedulerImpl and CoarseGrainedSchedulerBackend objects in order to fix
+  // the deadlock issue exposed in SPARK-27112
+  private def withLock[T](fn: => T): T = scheduler.synchronized {
+    CoarseGrainedSchedulerBackend.this.synchronized { fn }
+  }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
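A detail worth noting, as an observation about JVM monitors rather than something stated in the commit: `synchronized` locks are reentrant, so a thread that already holds the `TaskSchedulerImpl` monitor through `withLock` can safely re-enter scheduler-synchronized code further down the call chain, which is what happens when `makeOffers()` calls into the scheduler's `resourceOffers()`. A small sketch with a placeholder `resourceOffers`:

```scala
object ReentrancySketch {
  private val scheduler = new Object  // TaskSchedulerImpl monitor stand-in

  // Placeholder for a scheduler method that synchronizes internally,
  // the way TaskSchedulerImpl.resourceOffers does.
  private def resourceOffers(): Seq[String] = scheduler.synchronized {
    Seq("taskDescription-1")
  }

  // Same shape as the helper added in this commit, minus the backend lock.
  private def withLock[T](fn: => T): T = scheduler.synchronized { fn }

  def main(args: Array[String]): Unit = {
    // The nested scheduler.synchronized inside resourceOffers() succeeds
    // because JVM intrinsic locks are reentrant for the owning thread.
    val tasks = withLock { resourceOffers() }
    println(tasks)  // List(taskDescription-1)
  }
}
```

Without that reentrancy, taking the scheduler lock up front would itself deadlock on the first offer round.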
