Preface
Data locality in Spark means moving the computation to the data rather than moving the data to the computation. Reality, however, is harsh: the node that holds a data block does not always have enough free resources to run the computation there. To let each task launch at the best possible locality level, Spark uses delay scheduling: when resources are insufficient, the launch is retried within the wait time associated with that locality level; if the task still cannot launch after the wait time expires, the locality level is lowered and the launch is attempted again, and so on.
Locality Levels
A task's locality level describes the positional relationship between the computation and its data. How is this final relationship produced? The following sections walk through the whole story in detail.
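For reference, the levels are defined, from best to worst, in Spark's TaskLocality enumeration; a simplified sketch:
object TaskLocality extends Enumeration {
  // PROCESS_LOCAL: the data is cached in the same executor JVM that runs the task
  // NODE_LOCAL:    the data is on the same node (e.g. an HDFS block or another executor on that host)
  // NO_PREF:       the task has no location preference
  // RACK_LOCAL:    the data is on a node in the same rack
  // ANY:           the data is anywhere else in the cluster
  val PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY = Value
}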
DAGScheduler submits tasks
After the DAGScheduler has split a job into stages, it submits each Stage to the TaskScheduler as a TaskSet via the submitMissingTasks method. Let's look at the part of that method that deals with preferred locations.
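The relevant part looks roughly like the sketch below (based on DAGScheduler.submitMissingTasks; the exact shape varies across Spark versions):
// For every partition that still has to be computed, look up its preferred locations
// (for a ResultStage the id is first mapped to the corresponding output partition)
val taskIdToLocations: Map[Int, Seq[TaskLocation]] =
  partitionsToCompute.map { id =>
    (id, getPreferredLocs(stage.rdd, id))
  }.toMap
// The locations are later handed to each ShuffleMapTask / ResultTask when the TaskSet is built,
// so every Task object carries its own preferredLocations.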
Note that each Task in the submitted TaskSet already carries its preferred locations, which are obtained via the getPreferredLocs method. A quick look at its implementation follows.
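A simplified sketch of the lookup order (the real method is DAGScheduler.getPreferredLocsInternal, which additionally guards against revisiting the same (rdd, partition) pair):
private def getPreferredLocsInternal(rdd: RDD[_], partition: Int): Seq[TaskLocation] = {
  // 1. If the partition has been cached (persist/cache), use the cache locations tracked by the DAGScheduler
  val cached = getCacheLocs(rdd)(partition)
  if (cached.nonEmpty) return cached
  // 2. Otherwise ask the RDD itself, e.g. a HadoopRDD returns the hosts holding its HDFS block
  val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
  if (rddPrefs.nonEmpty) return rddPrefs.map(TaskLocation(_))
  // 3. Otherwise walk up the narrow dependencies and reuse the first parent preference found
  rdd.dependencies.foreach {
    case n: NarrowDependency[_] =>
      for (inPart <- n.getParents(partition)) {
        val locs = getPreferredLocsInternal(n.rdd, inPart)
        if (locs.nonEmpty) return locs
      }
    case _ =>
  }
  Nil
}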
Whichever path is taken to obtain an RDD partition's preferred locations, for the first computation they ultimately come from the RDD's own preferredLocations method. Different RDDs implement preferredLocations differently, but the data can only live in three kinds of places: cached in memory, in HDFS, or on disk, and each of these has a concrete TaskLocation implementation.
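Sketches of the three concrete case classes in org.apache.spark.scheduler.TaskLocation (toString and companion helpers omitted):
// data block cached in the memory of a specific executor
private [spark] case class ExecutorCacheTaskLocation(override val host: String, executorId: String)
  extends TaskLocation
// data on a specific host, e.g. an HDFS block replica or a file on local disk
private [spark] case class HostTaskLocation(override val host: String) extends TaskLocation
// data block held in HDFS's in-memory cache on a specific host
private [spark] case class HDFSCacheTaskLocation(override val host: String) extends TaskLocation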
So the preferred location passed in when a Task is instantiated is one of these three kinds.
Generating the locality levels
After the DAGScheduler submits the TaskSet to the TaskScheduler, the TaskScheduler creates a TaskSetManager for each TaskSet to manage its tasks. When the TaskSetManager is initialized, it computes the locality levels contained in the TaskSet via computeValidLocalityLevels.
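A sketch of TaskSetManager.computeValidLocalityLevels (the exact code differs slightly across Spark versions; ANY is always appended at the end):
private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
  import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
  val levels = new ArrayBuffer[TaskLocality.TaskLocality]
  if (!pendingTasksForExecutor.isEmpty && getLocalityWait(PROCESS_LOCAL) != 0 &&
      pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    levels += PROCESS_LOCAL
  }
  if (!pendingTasksForHost.isEmpty && getLocalityWait(NODE_LOCAL) != 0 &&
      pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
    levels += NODE_LOCAL
  }
  if (!pendingTasksWithNoPrefs.isEmpty) {
    levels += NO_PREF
  }
  if (!pendingTasksForRack.isEmpty && getLocalityWait(RACK_LOCAL) != 0 &&
      pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
    levels += RACK_LOCAL
  }
  levels += ANY
  logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
  levels.toArray
}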
The method checks, in order, whether this TaskSetManager contains each level; the logic is similar for all of them, so let's look at the first one in detail, starting with how pendingTasksForExecutor is defined and populated:
// key: executorId; value: the indexes of the tasks whose data blocks are cached on that executor
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
...
// iterate over every task in this TaskSet and register it
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
...
private def addPendingTask(index: Int) {
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
            ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }
  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }
  allPendingTasks += index  // No point scanning this whole list to find the old task there
}
Note the addPendingTask method: it iterates over the preferred locations of every Task managed by this TaskSetManager (derived as described above). If a location is an ExecutorCacheTaskLocation (the block is cached in an executor's memory), the executorId and the task index are added to pendingTasksForExecutor; the index is also added to pendingTasksForHost and pendingTasksForRack, which the lower locality levels rely on. In other words, if a task's best locality level is X, the task also possesses every locality level worse than X.
Now back to the locality-level checks in computeValidLocalityLevels above.
The key part is the third condition of the PROCESS_LOCAL check, pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_)). Here pendingTasksForExecutor.keySet is, as explained above, the set of executorIds on which some task's data blocks are cached, and sched.isExecutorAlive() checks whether the given executor id is currently active. The whole condition therefore asks whether at least one of the executors caching a task's data is still active; if so, the PROCESS_LOCAL level is added to the TaskSet's locality levels.
The remaining locality levels follow the same logic and are not detailed here; the only difference is what is checked, e.g. whether any of the hosts that hold data blocks needed by some task are still alive, and so on.
At this point, the locality levels contained in the TaskSet have been fully computed.
Delay scheduling strategy
If Spark runs on YARN, there are effectively two layers of delay scheduling: the first layer is YARN trying to allocate Spark's executors onto the NodeManagers that hold the data. If this first layer fails to achieve data locality, then achieving it later, at the Spark scheduling stage, becomes even less likely.
The purpose of delay scheduling is to reduce network and I/O overhead; the benefit is most noticeable when the data volume is large and the computation is simple (i.e. task execution time is shorter than data transfer time).
Spark scheduling always tries to launch each task at the highest possible locality level. When a task should launch at level X but none of the nodes at that level has free resources, the scheduler does not immediately downgrade; instead, within a certain wait time it keeps trying to launch the task at level X, and only after that time is exceeded does it downgrade and launch at a lower level.
For each of the locality levels contained in the TaskSet, the TaskSetManager walks over every offered executor resource and tries to launch the tasks it manages on that executor. So how does it decide whether a given task may launch on a given executor? It always starts by computing, via getAllowedLocalityLevel(curTime), the highest locality level among the tasks of the current TaskSetManager that have not yet run:
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
  // Remove the scheduled or finished tasks lazily
  def tasksNeedToBeScheduledFrom(pendingTaskIds: ArrayBuffer[Int]): Boolean = {
    var indexOffset = pendingTaskIds.size
    while (indexOffset > 0) {
      indexOffset -= 1
      val index = pendingTaskIds(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return true
      } else {
        pendingTaskIds.remove(indexOffset)
      }
    }
    false
  }
  // Walk through the list of tasks that can be scheduled at each location and returns true
  // if there are any tasks that still need to be scheduled. Lazily cleans up tasks that have
  // already been scheduled.
  def moreTasksToRunIn(pendingTasks: HashMap[String, ArrayBuffer[Int]]): Boolean = {
    val emptyKeys = new ArrayBuffer[String]
    val hasTasks = pendingTasks.exists {
      case (id: String, tasks: ArrayBuffer[Int]) =>
        if (tasksNeedToBeScheduledFrom(tasks)) {
          true
        } else {
          emptyKeys += id
          false
        }
    }
    // The key could be executorId, host or rackId
    emptyKeys.foreach(id => pendingTasks.remove(id))
    hasTasks
  }
  while (currentLocalityIndex < myLocalityLevels.length - 1) {
    val moreTasks = myLocalityLevels(currentLocalityIndex) match {
      case TaskLocality.PROCESS_LOCAL => moreTasksToRunIn(pendingTasksForExecutor)
      case TaskLocality.NODE_LOCAL => moreTasksToRunIn(pendingTasksForHost)
      case TaskLocality.NO_PREF => pendingTasksWithNoPrefs.nonEmpty
      case TaskLocality.RACK_LOCAL => moreTasksToRunIn(pendingTasksForRack)
    }
    if (!moreTasks) {
      // This is a performance optimization: if there are no more tasks that can
      // be scheduled at a particular locality level, there is no point in waiting
      // for the locality wait timeout (SPARK-4939).
      lastLaunchTime = curTime
      logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
        s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
      currentLocalityIndex += 1
    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
      // Jump to the next locality level, and reset lastLaunchTime so that the next locality
      // wait timer doesn't immediately expire
      lastLaunchTime += localityWaits(currentLocalityIndex)
      logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
        s"${localityWaits(currentLocalityIndex)}ms")
      currentLocalityIndex += 1
    } else {
      return myLocalityLevels(currentLocalityIndex)
    }
  }
  myLocalityLevels(currentLocalityIndex)
}
In the loop condition, currentLocalityIndex is the index in myLocalityLevels of the level returned by the previous call to getAllowedLocalityLevel (its initial value is 0), and myLocalityLevels is the array of locality levels contained by all tasks of this TaskSetManager.
At this point we have the highest locality level among the not-yet-run tasks of this TaskSetManager (the higher of this level and maxLocality is taken as the final allowedLocality).
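For reference, a sketch of how TaskSetManager.resourceOffer combines the two levels (details vary slightly by Spark version):
// inside TaskSetManager.resourceOffer(execId, host, maxLocality):
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
  allowedLocality = getAllowedLocalityLevel(curTime)
  if (allowedLocality > maxLocality) {
    // delay scheduling may not permit a level looser than what this offer allows
    allowedLocality = maxLocality
  }
}
// allowedLocality is then passed on to dequeueTask(execId, host, allowedLocality)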
The method that ultimately decides whether a given task is launched on a given executor is dequeueTask(execId, host, allowedLocality).
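A simplified sketch of dequeueTask (the real method also dequeues speculative tasks and additionally returns whether the dequeued task is speculative):
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value)] = {
  // Always try PROCESS_LOCAL first: tasks whose data is cached on this very executor
  for (index <- dequeueTaskFromList(execId, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL))
  }
  // Fall back to worse levels only if the allowed level permits it
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL))
    }
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    for (index <- dequeueTaskFromList(execId, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL))  // NO_PREF tasks are reported as PROCESS_LOCAL
    }
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for (rack <- sched.getRackForHost(host);
         index <- dequeueTaskFromList(execId, getPendingTasksForRack(rack))) {
      return Some((index, TaskLocality.RACK_LOCAL))
    }
  }
  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, allPendingTasks)) {
      return Some((index, TaskLocality.ANY))
    }
  }
  None
}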
TaskLocality.isAllowed ensures that a task is only launched with a locality level at least as good as allowedLocality (equal is allowed); this works because a task possesses every locality level worse than its best one. This guarantees that each task is launched at the highest locality level currently possible.
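isAllowed itself is a simple ordering check, since the TaskLocality enumeration is declared from best to worst; a sketch:
// "allowed" simply means the candidate level is not worse than the constraint
def isAllowed(constraint: TaskLocality.TaskLocality, condition: TaskLocality.TaskLocality): Boolean = {
  condition <= constraint
}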
Tuning suggestions
The locality level of a job's tasks can be inspected in the Spark UI; if most of them are NODE_LOCAL or ANY, consider tuning the data-locality wait times.
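The relevant settings are the per-level locality waits (defaults shown as commonly documented; verify against your Spark version's configuration docs):
spark.locality.wait          // default 3s: base wait before downgrading one locality level
spark.locality.wait.process  // wait for PROCESS_LOCAL, defaults to spark.locality.wait
spark.locality.wait.node     // wait for NODE_LOCAL, defaults to spark.locality.wait
spark.locality.wait.rack     // wait for RACK_LOCAL, defaults to spark.locality.wait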