[SPARK-18886][CORE] Make Locality wait time measure resource underutilization due to delay scheduling

### What changes were proposed in this pull request?

[Delay scheduling](http://elmeleegy.com/khaled/papers/delay_scheduling.pdf) is an optimization that sacrifices fairness for data locality in order to improve cluster and workload throughput.

One useful definition of "delay" here is how much time has passed since the TaskSet was last using its fair share of resources.

However, it is impractical to calculate this delay exactly, as it would require simulating runs without delay scheduling: tasks would run in different orders and with different run times.

Currently, the heuristic used to estimate this delay is the time since the TaskSet last launched a task. The problem is that it essentially does not account for resource utilization, potentially leaving the cluster heavily underutilized.

This PR modifies the heuristic in an attempt to move closer to the useful definition of delay above.
The newly proposed delay is the time since the TaskSet last launched a task **and** did not reject any resources due to delay scheduling when offered its "fair share".

See the last comments of apache#26696 for more discussion.
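
As a rough illustration, the sketch below mirrors the reset decision added to `TaskSchedulerImpl.resourceOffers` in this diff. The object and function names are illustrative only (not the actual API); the parameter names mirror the variables introduced in the diff below.

```scala
object DelaySchedulingSketch {
  // Simplified sketch of the new heuristic: reset the delay-scheduling timer
  // only when this offer round launched at least one task, rejected nothing
  // due to delay scheduling, and was either a full ("fair share") offer or
  // follows a previous offer that also reset the timer.
  def shouldResetDelayTimer(
      launchedAnyTask: Boolean,
      noDelaySchedulingRejects: Boolean,
      isAllFreeResources: Boolean,
      resetOnPreviousOffer: Boolean): Boolean = {
    launchedAnyTask && noDelaySchedulingRejects &&
      (isAllFreeResources || resetOnPreviousOffer)
  }

  // Legacy behavior (spark.locality.wait.legacyResetOnTaskLaunch=true):
  // the timer resets on every task launch, regardless of rejects or offer size.
}
```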

### Why are the changes needed?

The cluster can become heavily underutilized, as described in [SPARK-18886](https://issues.apache.org/jira/browse/SPARK-18886?jql=project%20%3D%20SPARK%20AND%20text%20~%20delay).

### How was this patch tested?

TaskSchedulerImplSuite

cloud-fan
tgravescs
squito

Closes apache#27207 from bmarcott/nmarcott-fulfill-slots-2.

Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
bmarcott authored and Seongjin Cho committed Apr 14, 2020
1 parent 471b812 commit e320227
Showing 10 changed files with 506 additions and 154 deletions.
10 changes: 10 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -544,6 +544,16 @@ package object config {
.version("1.2.0")
.fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)

private[spark] val LEGACY_LOCALITY_WAIT_RESET =
ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch")
.doc("Whether to use the legacy behavior of locality wait, which resets the delay timer " +
"anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " +
"documentation for more details.")
.internal()
.version("3.0.0")
.booleanConf
.createWithDefault(false)

private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
.version("0.5.0")
.timeConf(TimeUnit.MILLISECONDS)
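
Before moving to the scheduler changes, a hypothetical snippet showing how the legacy reset-on-launch behavior could be re-enabled via the new `spark.locality.wait.legacyResetOnTaskLaunch` flag (a sketch assuming a plain `SparkConf`; the flag is internal and defaults to false, so this is an escape hatch rather than a tuning knob):

```scala
import org.apache.spark.SparkConf

// Hypothetical opt-in to the legacy behavior, where the locality wait timer
// resets on every task launch (the new heuristic is the default).
val conf = new SparkConf()
  .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
  .set("spark.locality.wait", "3s") // base per-level locality wait; 3s is the default
```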
@@ -36,7 +36,7 @@ import org.apache.spark.rpc.RpcEndpoint
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.scheduler.TaskLocality.TaskLocality
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Utils}

/**
* Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -57,11 +57,24 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
* * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
* scheduling
* * task-result-getter threads
*
* Delay Scheduling:
* Delay scheduling is an optimization that sacrifices job fairness for data locality in order to
* improve cluster and workload throughput. One useful definition of "delay" is how much time
* has passed since the TaskSet was using its fair share of resources. Since it is impractical to
* calculate this delay without a full simulation, the heuristic used is the time since the
* TaskSetManager last launched a task and has not rejected any resources due to delay scheduling
* since it was last offered its "fair share". A "fair share" offer is when [[resourceOffers]]'s
* parameter "isAllFreeResources" is set to true. A "delay scheduling reject" is when a resource
* is not utilized despite there being pending tasks (implemented inside [[TaskSetManager]]).
* The legacy heuristic only measured the time since the [[TaskSetManager]] last launched a task,
* and can be re-enabled by setting spark.locality.wait.legacyResetOnTaskLaunch to true.
*/
private[spark] class TaskSchedulerImpl(
val sc: SparkContext,
val maxTaskFailures: Int,
isLocal: Boolean = false)
isLocal: Boolean = false,
clock: Clock = new SystemClock)
extends TaskScheduler with Logging {

import TaskSchedulerImpl._
@@ -97,6 +110,11 @@ private[spark] class TaskSchedulerImpl(
// on this class. Protected by `this`
private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

// keyed by taskset
// value is true if the task set's locality wait timer was reset on the last resource offer
private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)

// Protected by `this`
private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
// Protected by `this`
@@ -125,7 +143,6 @@ private[spark] class TaskSchedulerImpl(
protected val executorIdToHost = new HashMap[String, String]

private val abortTimer = new Timer(true)
private val clock = new SystemClock
// Exposed for testing
val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]

@@ -319,20 +336,38 @@
taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
}
}
resetOnPreviousOffer -= manager.taskSet
manager.parent.removeSchedulable(manager)
logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
s" ${manager.parent.name}")
}

/**
* Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]].
*
* @param taskSet task set manager to offer resources to
* @param maxLocality max locality to allow when scheduling
* @param shuffledOffers shuffled resource offers to use for scheduling,
* remaining resources are tracked by below fields as tasks are scheduled
* @param availableCpus remaining cpus per offer,
* value at index 'i' corresponds to shuffledOffers[i]
* @param availableResources remaining resources per offer,
* value at index 'i' corresponds to shuffledOffers[i]
* @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i]
* @param addressesWithDescs tasks scheduled per host:port, used for barrier tasks
* @return tuple of (had delay schedule rejects?, option of min locality of launched task)
*/
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
availableResources: Array[Map[String, Buffer[String]]],
tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
var launchedTask = false
addressesWithDescs: ArrayBuffer[(String, TaskDescription)])
: (Boolean, Option[TaskLocality]) = {
var noDelayScheduleRejects = true
var minLaunchedLocality: Option[TaskLocality] = None
// nodes and executors that are blacklisted for the entire application have already been
// filtered out by this point
for (i <- 0 until shuffledOffers.size) {
@@ -348,11 +383,14 @@
try {
val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality,
taskResAssignments)
val (taskDescOption, didReject) =
taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments)
noDelayScheduleRejects &= !didReject
for (task <- taskDescOption) {
tasks(i) += task
val tid = task.taskId
val locality = taskSet.taskInfos(task.taskId).taskLocality
minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality))
taskIdToTaskSetManager.put(tid, taskSet)
taskIdToExecutorId(tid) = execId
executorIdToRunningTaskIds(execId).add(tid)
@@ -372,19 +410,18 @@
// The executor address is expected to be non empty.
addressesWithDescs += (shuffledOffers(i).address.get -> task)
}
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
return (noDelayScheduleRejects, minLaunchedLocality)
}
}
}
}
launchedTask
(noDelayScheduleRejects, minLaunchedLocality)
}

/**
@@ -466,12 +503,28 @@
}.sum
}

private def minTaskLocality(
l1: Option[TaskLocality],
l2: Option[TaskLocality]) : Option[TaskLocality] = {
if (l1.isEmpty) {
l2
} else if (l2.isEmpty) {
l1
} else if (l1.get < l2.get) {
l1
} else {
l2
}
}

/**
* Called by cluster manager to offer resources on slaves. We respond by asking our active task
* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
* that tasks are balanced across the cluster.
*/
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
def resourceOffers(
offers: IndexedSeq[WorkerOffer],
isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
@@ -544,18 +597,34 @@
s"number of available slots is $numBarrierSlotsAvailable.")
} else {
var launchedAnyTask = false
var noDelaySchedulingRejects = true
var globalMinLocality: Option[TaskLocality] = None
// Record all the executor IDs assigned barrier tasks on.
val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
for (currentMaxLocality <- taskSet.myLocalityLevels) {
var launchedTaskAtCurrentMaxLocality = false
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
currentMaxLocality, shuffledOffers, availableCpus,
val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus,
availableResources, tasks, addressesWithDescs)
launchedTaskAtCurrentMaxLocality = minLocality.isDefined
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
noDelaySchedulingRejects &= noDelayScheduleReject
globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
} while (launchedTaskAtCurrentMaxLocality)
}

if (!legacyLocalityWaitReset) {
if (noDelaySchedulingRejects && launchedAnyTask) {
if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
taskSet.resetDelayScheduleTimer(globalMinLocality)
resetOnPreviousOffer.update(taskSet.taskSet, true)
}
} else {
resetOnPreviousOffer.update(taskSet.taskSet, false)
}
}

if (!launchedAnyTask) {
taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
// If the taskSet is unschedulable we try to find an existing idle blacklisted
49 changes: 33 additions & 16 deletions core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -221,10 +221,11 @@ private[spark] class TaskSetManager(
private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)

// Delay scheduling variables: we keep track of our current locality level and the time we
// last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task.
// last reset the locality wait timer, and move up a level when localityWaits[curLevel] expires.
// We then move down if we manage to launch a "more local" task when resetting the timer
private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
private var lastLocalityWaitResetTime = clock.getTimeMillis() // Time we last reset locality wait

override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null

@@ -386,6 +387,14 @@ private[spark] class TaskSetManager(
None
}

private[scheduler] def resetDelayScheduleTimer(
minLocality: Option[TaskLocality.TaskLocality]): Unit = {
lastLocalityWaitResetTime = clock.getTimeMillis()
for (locality <- minLocality) {
currentLocalityIndex = getLocalityIndex(locality)
}
}

/**
* Respond to an offer of a single executor from the scheduler by finding a task
*
@@ -396,14 +405,17 @@
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
*
* @return Tuple containing:
* (TaskDescription of launched task if any, rejected resource due to delay scheduling?)
*/
@throws[TaskNotSerializableException]
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality,
taskResourceAssignments: Map[String, ResourceInformation] = Map.empty)
: Option[TaskDescription] =
: (Option[TaskDescription], Boolean) =
{
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
blacklist.isNodeBlacklistedForTaskSet(host) ||
@@ -422,7 +434,9 @@
}
}

dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
val taskDescription =
dequeueTask(execId, host, allowedLocality)
.map { case (index, taskLocality, speculative) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
@@ -433,11 +447,8 @@
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) {
resetDelayScheduleTimer(Some(taskLocality))
}
// Serialize and return the task
val serializedTask: ByteBuffer = try {
@@ -482,8 +493,14 @@
taskResourceAssignments,
serializedTask)
}
val hasPendingTasks = pendingTasks.all.nonEmpty || pendingSpeculatableTasks.all.nonEmpty
val hasScheduleDelayReject =
taskDescription.isEmpty &&
maxLocality == TaskLocality.ANY &&
hasPendingTasks
(taskDescription, hasScheduleDelayReject)
} else {
None
(None, false)
}
}

@@ -547,14 +564,14 @@
// This is a performance optimization: if there are no more tasks that can
// be scheduled at a particular locality level, there is no point in waiting
// for the locality wait timeout (SPARK-4939).
lastLaunchTime = curTime
lastLocalityWaitResetTime = curTime
logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
currentLocalityIndex += 1
} else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLaunchTime so that the next locality
// wait timer doesn't immediately expire
lastLaunchTime += localityWaits(currentLocalityIndex)
} else if (curTime - lastLocalityWaitResetTime >= localityWaits(currentLocalityIndex)) {
// Jump to the next locality level, and reset lastLocalityWaitResetTime so that the next
// locality wait timer doesn't immediately expire
lastLocalityWaitResetTime += localityWaits(currentLocalityIndex)
logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
s"${localityWaits(currentLocalityIndex)}ms")
currentLocalityIndex += 1
@@ -303,7 +303,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
(rName, rInfo.availableAddrs.toBuffer)
}, executorData.resourceProfileId)
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
scheduler.resourceOffers(workOffers, true)
}
if (taskDescs.nonEmpty) {
launchTasks(taskDescs)
@@ -331,7 +331,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
executorData.resourcesInfo.map { case (rName, rInfo) =>
(rName, rInfo.availableAddrs.toBuffer)
}, executorData.resourceProfileId))
scheduler.resourceOffers(workOffers)
scheduler.resourceOffers(workOffers, false)
} else {
Seq.empty
}
@@ -88,7 +88,7 @@ private[spark] class LocalEndpoint(
// local mode doesn't support extra resources like GPUs right now
val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
Some(rpcEnv.address.hostPort)))
for (task <- scheduler.resourceOffers(offers).flatten) {
for (task <- scheduler.resourceOffers(offers, true).flatten) {
freeCores -= scheduler.CPUS_PER_TASK
executor.launchTask(executorBackend, task)
}
@@ -510,7 +510,7 @@ class StandaloneDynamicAllocationSuite

val taskScheduler = mock(classOf[TaskSchedulerImpl])
when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
when(taskScheduler.resourceOffers(any())).thenReturn(Nil)
when(taskScheduler.resourceOffers(any(), any[Boolean])).thenReturn(Nil)
when(taskScheduler.sc).thenReturn(sc)

val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager)
@@ -248,7 +248,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
"t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
new Properties(), taskResources, bytebuffer)))
val ts = backend.getTaskSchedulerImpl()
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]])).thenReturn(taskDescs)
when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

backend.driverEndpoint.send(ReviveOffers)
