Skip to content

Commit

Permalink
[SPARK-45227][CORE] Fix an issue with CoarseGrainedExecutorBackend wh…
Browse files Browse the repository at this point in the history
…ere an executor process randomly gets stuck
  • Loading branch information
Bo Xiong committed Sep 29, 2023
1 parent 48138eb commit 9798678
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.mutable
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

Expand Down Expand Up @@ -71,9 +71,12 @@ private[spark] class CoarseGrainedExecutorBackend(
/**
* Map each taskId to the information about the resource allocated to it, Please refer to
* [[ResourceInformation]] for specifics.
* CHM is used to ensure thread-safety (https://issues.apache.org/jira/browse/SPARK-45227)
* Exposed for testing only.
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
private[executor] val taskResources = new ConcurrentHashMap[
Long, Map[String, ResourceInformation]
]

private var decommissioned = false

Expand Down Expand Up @@ -184,7 +187,7 @@ private[spark] class CoarseGrainedExecutorBackend(
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
taskResources(taskDesc.taskId) = taskDesc.resources
taskResources.put(taskDesc.taskId, taskDesc.resources)
executor.launchTask(this, taskDesc)
}

Expand Down Expand Up @@ -261,7 +264,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit = {
val resources = taskResources.getOrElse(taskId, Map.empty[String, ResourceInformation])
val resources = taskResources.getOrDefault(taskId, Map.empty[String, ResourceInformation])
val msg = StatusUpdate(executorId, taskId, state, data, resources)
if (TaskState.isFinished(state)) {
taskResources.remove(taskId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
resourceProfile = ResourceProfile.getOrCreateDefaultProfile(conf))
assert(backend.taskResources.isEmpty)

val taskId = 1000000
val taskId = 1000000L
// We don't really verify the data, just pass it around.
val data = ByteBuffer.wrap(Array[Byte](1, 2, 3, 4))
val taskDescription = new TaskDescription(taskId, 2, "1", "TASK 1000000", 19,
Expand All @@ -314,14 +314,14 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
backend.self.send(LaunchTask(new SerializableBuffer(serializedTaskDescription)))
eventually(timeout(10.seconds)) {
assert(backend.taskResources.size == 1)
val resources = backend.taskResources(taskId)
val resources = backend.taskResources.get(taskId)
assert(resources(GPU).addresses sameElements Array("0", "1"))
}

// Update the status of a running task shall not affect `taskResources` map.
backend.statusUpdate(taskId, TaskState.RUNNING, data)
assert(backend.taskResources.size == 1)
val resources = backend.taskResources(taskId)
val resources = backend.taskResources.get(taskId)
assert(resources(GPU).addresses sameElements Array("0", "1"))

// Update the status of a finished task shall remove the entry from `taskResources` map.
Expand Down

0 comments on commit 9798678

Please sign in to comment.