// NOTE(review): removed GitHub page chrome ("You signed in with another tab...") accidentally captured in this paste.
/**
 * Schedule the currently available resources among waiting apps. This method is
 * invoked every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state == RecoveryState.ALIVE) {
    // Drivers take strict precedence over executors.
    val aliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val aliveCount = aliveWorkers.size
    var cursor = 0
    // Iterate over a snapshot of waitingDrivers so removal during the loop is safe.
    for (driver <- waitingDrivers.toList) {
      // Round-robin placement: resume from the worker after the one that received the
      // previous driver, and give up once every alive worker has been considered.
      var placed = false
      var visited = 0
      while (visited < aliveCount && !placed) {
        val candidate = aliveWorkers(cursor)
        visited += 1
        if (candidate.memoryFree >= driver.desc.mem &&
            candidate.coresFree >= driver.desc.cores) {
          launchDriver(candidate, driver)
          waitingDrivers -= driver
          placed = true
        }
        cursor = (cursor + 1) % aliveCount
      }
    }
    startExecutorsOnWorkers()
  }
}
/** Schedule and launch executors on workers. */
private def startExecutorsOnWorkers(): Unit = {
  // Very simple FIFO scheduler: keep trying to satisfy the first app in the
  // queue, then the second, and so on.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Keep only alive workers with enough free memory and cores for at least one
    // executor, ordered by most free cores first.
    val candidates = workers.toArray
      .filter(w =>
        w.state == WorkerState.ALIVE &&
          w.memoryFree >= app.desc.memoryPerExecutorMB &&
          w.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree)
      .reverse
    val assignedCores = scheduleExecutorsOnWorkers(app, candidates, spreadOutApps)
    // Now that the per-worker core counts are decided, allocate them.
    for (idx <- candidates.indices if assignedCores(idx) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(idx), coresPerExecutor, candidates(idx))
    }
  }
}
// Decides how many CPU cores to assign to executors of the current application on each worker.
/**
 * Schedule executors to be launched on the workers.
 * Returns an array containing the number of cores assigned to each worker.
 *
 * There are two modes of launching executors. The first attempts to spread out an
 * application's executors on as many workers as possible, while the second does the
 * opposite (i.e. launch them on as few workers as possible). The former is usually
 * better for data locality purposes and is the default.
 *
 * The number of cores per executor is configurable. When it is explicitly set,
 * multiple executors from the same application may be launched on the same worker if
 * the worker has enough cores and memory. Otherwise, each executor grabs all the cores
 * available on the worker by default, in which case only one executor may be launched
 * on each worker.
 *
 * It is important to allocate coresPerExecutor on each worker at a time (instead of
 * 1 core at a time). Consider the following example: cluster has 4 workers with 16
 * cores each. User requests 3 executors (spark.cores.max = 48,
 * spark.executor.cores = 16). If 1 core is allocated at a time, 12 cores from each
 * worker would be assigned to each executor. Since 12 < 16, no executors would
 * launch [SPARK-8881].
 */
private def scheduleExecutorsOnWorkers(
    app: ApplicationInfo,
    usableWorkers: Array[WorkerInfo],
    spreadOutApps: Boolean): Array[Int] = {
  val coresPerExecutor = app.desc.coresPerExecutor
  val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
  val oneExecutorPerWorker = coresPerExecutor.isEmpty
  val memoryPerExecutor = app.desc.memoryPerExecutorMB
  val numUsable = usableWorkers.length
  val assignedCores = new Array[Int](numUsable)     // cores granted to each worker
  val assignedExecutors = new Array[Int](numUsable) // new executors planned per worker
  var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

  /** Return whether the worker at `pos` can launch an executor for this app. */
  def canLaunchExecutor(pos: Int): Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
    // If multiple executors per worker are allowed, we can always launch new
    // executors. Otherwise, once an executor exists on this worker we only grow it.
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
    if (launchingNewExecutor) {
      val plannedMemory = assignedExecutors(pos) * memoryPerExecutor
      val enoughMemory = usableWorkers(pos).memoryFree - plannedMemory >= memoryPerExecutor
      val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
      keepScheduling && enoughCores && enoughMemory && underLimit
    } else {
      // Adding cores to an existing executor: memory and the executor-count limit
      // are unaffected, so only core availability matters.
      keepScheduling && enoughCores
    }
  }

  // Keep launching executors until no worker can accommodate any more, or this
  // application's limits are reached.
  var eligible = (0 until numUsable).filter(canLaunchExecutor)
  while (eligible.nonEmpty) {
    eligible.foreach { pos =>
      var continueHere = true
      while (continueHere && canLaunchExecutor(pos)) {
        coresToAssign -= minCoresPerExecutor
        assignedCores(pos) += minCoresPerExecutor
        // In one-executor-per-worker mode each iteration adds one core to the single
        // executor; otherwise each iteration plans a brand-new executor.
        if (oneExecutorPerWorker) {
          assignedExecutors(pos) = 1
        } else {
          assignedExecutors(pos) += 1
        }
        // Spreading out means allocating once on this worker and moving on; when
        // packing, we keep filling this worker until its resources are exhausted.
        if (spreadOutApps) {
          continueHere = false
        }
      }
    }
    eligible = eligible.filter(canLaunchExecutor)
  }
  assignedCores
}
// Allocates a worker's resources to executors:
//  - sends the worker a LaunchExecutor message to start each executor
//  - sends the driver an ExecutorAdded message to report each new executor
/**
 * Allocate a worker's resources to one or more executors.
 *
 * @param app the info of the application which the executors belong to
 * @param assignedCores number of cores on this worker reserved for this application
 * @param coresPerExecutor number of cores per executor, if explicitly configured
 * @param worker the worker receiving the executors
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // When cores-per-executor is specified, split the assigned cores evenly among the
  // executors, dropping any remainder. Otherwise launch a single executor that grabs
  // all the assignedCores on this worker.
  val executorCount = coresPerExecutor.map(assignedCores / _).getOrElse(1)
  val coresForEach = coresPerExecutor.getOrElse(assignedCores)
  (1 to executorCount).foreach { _ =>
    val exec = app.addExecutor(worker, coresForEach)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}