Skip to content

Latest commit

 

History

History
145 lines (137 loc) · 5.78 KB

任务提交流程.md

File metadata and controls

145 lines (137 loc) · 5.78 KB

Task提交流程

在划分Stage之后,在对Task进行封装成为TaskSet然后提交给TaskScheduler。

提交流程源码解析

提交TaskSet

查看TaskSchedulerImpl的160行,可以看到submitTasks()方法,主要代码如下:


//TODO 该方法提交TaskSet
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
      }
      hasReceivedTask = true
    }
    //TODO  CoarseGrainedSchedulerBackend类的reviveOffers()
    backend.reviveOffers()

这里主要的的方法是CoarseGrainedSchedulerBackend类的reviveOffers()。

CoarseGrainedSchedulerBackend的reviveOffers()


这里主要是向向DriverActor发送消息
//TODO 向DriverActor发送消息
  override def reviveOffers() {
    driverActor ! ReviveOffers
  }

CoarseGrainedSchedulerBackend中DriverActor的receiveWithLogging()

DriverActor类中的receiveWithLogging()进行模式匹配


//TODO 进行模式匹配,调用makeOffers()向Executor提交Task
case ReviveOffers =>
     makeOffers()

makeOffers()方法向Executor提交Task

Executor运行Task

makeOffers()方法的主要代码如下:


//TODO 调用launchTasks向Executor提交Task
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

这里调用launchTasks(),代码主要的流程是:


 //TODO
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      //TODO Task是一个一个发送给Executor的
      for (task <- tasks.flatten) {
        //TODO 首先拿到序列化器
        val ser = SparkEnv.get.closureSerializer.newInstance()
        //TODO 将Task序列化用来进行网络传输
        val serializedTask = ser.serialize(task)
        //TODO  进行大小判断
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          //TODO 这是一个HashMap
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //TODO 向Executor发送序列化好的Task
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

这里做的工作主要是迭代TaskSet然后一个一个的取出Task进行序列化之后向Executor发送序列化好的Task

Executor执行Task

CoarseGrainedExecutorBackend的模式匹配,主要是DriverActor发送数据给Executor的信息


 //TODO Driver 发送数据给Executor的信息
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        //TODO 拿到序列化器
        val ser = env.closureSerializer.newInstance()
        //TODO 将Task反序列化
        val taskDesc = ser.deserialize[TaskDescription](data.value)
        logInfo("Got assigned task " + taskDesc.taskId)
        //TODO 将反序列化的Task放入线程池执行
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

这里做的工作是拿到序列化器,将Task反序列化,将反序列化的Task放入线程池执行

下面是Executor的launchTask()方法,主要的逻辑是将创建一个TaskRunner对象将Task的信息封装信息然后使用线程池执行


//TODO Executor执行Task
  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    //TODO 创建一个TaskRunner对象将Task的信息封装信息
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    //TODO runningTasks是一个ConcurrentHashMap保证线程安全
    runningTasks.put(taskId, tr)
    //TODO 使用线程池执行
    threadPool.execute(tr)
  }

总结

1.提交Task主要是迭代TaskSet一个一个的取出Task进行序列化之后向Executor发送序列化好的Task

2.Executor执行Task,创建一个TaskRunner对象将Task的信息封装信息然后使用线程池执行