Standalone Mode: Master and Worker Startup Flow #12

teeyog opened this issue Feb 2, 2018 · 0 comments

This article is based on Spark 2.1.

Preface

As a distributed computing framework, Spark supports multiple run modes:

  • Local mode (single machine)
  • Local pseudo-cluster mode (a single machine simulating a cluster)
  • Standalone Client mode (cluster)
  • Standalone Cluster mode (cluster)
  • YARN Client mode (cluster)
  • YARN Cluster mode (cluster)

Standalone mode is Spark's built-in cluster manager; it requires starting the Master and Worker daemon processes, and this article walks through the startup flow of both at the source level. Master and Worker communicate via Spark's Netty-based RPC; for background on Spark RPC, see the companion article 深入解析Spark中的RPC (an in-depth analysis of RPC in Spark).

Master Startup

The Master is started via the start-master.sh script, which ultimately invokes the class:

org.apache.spark.deploy.master.Master

Its main method:

def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    // create the RpcEnv and start the RPC service
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    // block awaiting termination
    rpcEnv.awaitTermination()
  }

The main method first builds a SparkConf from the configuration arguments, starts an RpcEnv and creates an Endpoint via startRpcEnvAndEndpoint, and then calls awaitTermination so the server blocks while listening for and handling requests. Let's look closer at startRpcEnvAndEndpoint:

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    // create the RpcEnv
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    // create an Endpoint through the rpcEnv
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }

An RpcEnv is created first; the RpcEnv is the core of Spark's RPC layer. An RpcEndpoint defines the message-handling logic and, once created, is managed by its RpcEnv. Its lifecycle runs onStart, receive, onStop; receive may be invoked concurrently, while ThreadSafeRpcEndpoint guarantees a thread-safe receive that only one thread accesses at a time.
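
As a rough illustration of this lifecycle, here is a minimal endpoint sketch (illustrative only: RpcEndpoint and ThreadSafeRpcEndpoint are private[spark], so real code of this shape has to live under an org.apache.spark package, and EchoEndpoint is a made-up name):

import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}

class EchoEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {

  // called once by the RpcEnv before any message is delivered
  override def onStart(): Unit = println("endpoint started")

  // one-way messages (sent via RpcEndpointRef.send) are dispatched here
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"got one-way message: $msg")
  }

  // messages sent via ask land here and are answered through the context
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }

  // called once when the endpoint is unregistered or the RpcEnv shuts down
  override def onStop(): Unit = println("endpoint stopped")
}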

The Endpoint registered with the rpcEnv here is the Master itself (which extends ThreadSafeRpcEndpoint); the Master's constructor creates the variables that hold its bookkeeping state.

  ...
  // a HashSet holding WorkerInfo entries
  val workers = new HashSet[WorkerInfo]
  // a HashSet holding applications submitted by clients (SparkSubmit)
  val apps = new HashSet[ApplicationInfo]
  // apps waiting to be scheduled
  val waitingApps = new ArrayBuffer[ApplicationInfo]
  // DriverInfo entries
  val drivers = new HashSet[DriverInfo]
  ...

Because the Master is an Endpoint managed by the RpcEnv, the lifecycle's onStart method runs first:

override def onStart(): Unit = {
   ...
    checkForWorkerTimeOutTask = forwardMessageThread.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(CheckForWorkerTimeOut)
      }
    }, 0, WORKER_TIMEOUT_MS, TimeUnit.MILLISECONDS)
   ...
  }

This submits a task to a thread pool that fires every WORKER_TIMEOUT_MS (60 seconds by default) to check whether any Worker has timed out; it simply sends a CheckForWorkerTimeOut message to the Master itself. We'll come back to the handling of that message later.
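
For reference, WORKER_TIMEOUT_MS is derived from the spark.worker.timeout setting (in seconds); in Spark 2.1's Master.scala it is declared along these lines:

// Master.scala (Spark 2.1): a Worker is considered timed out after
// spark.worker.timeout seconds without a heartbeat (default 60)
private val WORKER_TIMEOUT_MS = conf.getLong("spark.worker.timeout", 60) * 1000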

Worker Startup

The Workers on each node are started via the start-slaves.sh script, which ultimately invokes the class:

org.apache.spark.deploy.worker.Worker

Its main method:

def main(argStrings: Array[String]) {
    Utils.initDaemon(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir, conf = conf)
    rpcEnv.awaitTermination()
  }

As with the Master, a SparkConf is first built from the configuration arguments, then startRpcEnvAndEndpoint starts an RpcEnv and creates an Endpoint, and awaitTermination blocks the server while it listens for and handles requests.

 def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
    rpcEnv
  }

Here a new Worker instance is created as the Endpoint and registered with the RpcEnv. The Worker's constructor initializes, among other variables, a heartbeat interval set to 1/4 of the Master-side timeout.
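
That "1/4 of the Master-side timeout" comes from the Worker's heartbeat-interval declaration, which in Spark 2.1's Worker.scala reads roughly as follows:

// Worker.scala (Spark 2.1): send a heartbeat every (timeout) / 4 milliseconds
// so that several heartbeats fit inside one Master-side timeout window
private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4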

Worker Registers with the Master

Following its lifecycle, the Worker's onStart() method runs:

override def onStart() {
   ...
    registerWithMaster()
   ...
  }

Inside onStart(), registerWithMaster is called to register the Worker with the Master:

private def registerWithMaster() {
    // onDisconnected may be triggered multiple times, so don't attempt registration
    // if there are outstanding registration attempts scheduled.
    registrationRetryTimer match {
      case None =>
        // not registered yet
        registered = false
        // try to register with all Masters
        registerMasterFutures = tryRegisterAllMasters()
        // number of connection attempts so far
        connectionAttemptCount = 0
        // re-register when the network or the Master fails;
        // exit once the retry count exceeds the threshold
        registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate(
          new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReregisterWithMaster))
            }
          },
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
          TimeUnit.SECONDS))
      case Some(_) =>
        logInfo("Not spawning another attempt to register with the master, since there is an" +
          " attempt scheduled already.")
    }
  }

On the first call, registrationRetryTimer is necessarily None, so the Worker registers itself with every Master via tryRegisterAllMasters, and then starts a timer that retries registration a bounded number of times (re-registration is needed when the network or the Master fails).
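
That retry budget is bounded by constants in Worker.scala; paraphrasing the Spark 2.1 source (check your own version for the exact values):

// Worker.scala (Spark 2.1), paraphrased: a few quick retries first,
// then slower ones, giving up once the total budget is exhausted
private val INITIAL_REGISTRATION_RETRIES = 6
private val TOTAL_REGISTRATION_RETRIES = INITIAL_REGISTRATION_RETRIES + 10

With that bound in mind, let's first see how tryRegisterAllMasters performs the registration: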

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
    masterRpcAddresses.map { masterAddress =>
      registerMasterThreadPool.submit(new Runnable {
        override def run(): Unit = {
          try {
            logInfo("Connecting to master " + masterAddress + "...")
            val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            registerWithMaster(masterEndpoint)
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        }
      })
    }
  }

Here rpcEnv.setupEndpointRef is called. An RpcEndpointRef is a reference to an RpcEndpoint living in some RpcEnv; it is a serializable entity, so it can be sent over the network or saved for later use. An RpcEndpointRef has an address and a name, and calling its send method delivers an asynchronous one-way message to the corresponding RpcEndpoint.
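
A quick sketch of the two messaging styles an RpcEndpointRef supports (endpointRef and the "ping" payload are illustrative names, not Spark code):

import scala.concurrent.Future

// fire-and-forget: a one-way message, no reply expected
endpointRef.send("ping")

// request/response: ask returns a Future completed with the endpoint's reply
val reply: Future[String] = endpointRef.ask[String]("ping")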

So this block simply iterates over all masterRpcAddresses and, for each, calls registerWithMaster with the RpcEndpointRef of the Master-side endpoint. On to registerWithMaster:

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

Through the RpcEndpointRef the Worker opens communication with the Master and sends it a RegisterWorker message carrying the workerId, host, port, cores, memory, and so on, together with success and failure callbacks that we'll discuss shortly.
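
The RegisterWorker message itself is a plain case class in DeployMessages; paraphrasing the Spark 2.1 source, it carries exactly the fields the Master needs to build a WorkerInfo:

// DeployMessages.scala (Spark 2.1), paraphrased
case class RegisterWorker(
    id: String,
    host: String,
    port: Int,
    worker: RpcEndpointRef,
    cores: Int,
    memory: Int,
    workerWebUiUrl: String)
  extends DeployMessage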

Master Receives the Worker Registration

The Master handles all messages that require a reply in receiveAndReply (one-way messages go through receive). Its handling logic for a Worker's RegisterWorker message:

case RegisterWorker(
        id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      // this Master is currently STANDBY
      if (state == RecoveryState.STANDBY) {
        context.reply(MasterInStandby)
      // this Worker has already registered
      } else if (idToWorker.contains(id)) {
        context.reply(RegisterWorkerFailed("Duplicate worker ID"))
      } else {
        // build a WorkerInfo from the Worker's registration info
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          workerRef, workerWebUiUrl)
        if (registerWorker(worker)) {
          // persist the Worker info
          persistenceEngine.addWorker(worker)
          // reply to the Worker that registration succeeded
          context.reply(RegisteredWorker(self, masterWebUiUrl))
          // a new Worker means new resources; schedule the waiting apps
          schedule()
        } else {
          val workerAddress = worker.endpoint.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          // reply to the Worker that registration failed
          context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress))
        }
      }

  1. If the Master is currently in the STANDBY state, reply MasterInStandby immediately.
  2. If the Worker has already registered, reply RegisterWorkerFailed immediately.
  3. Otherwise, build a WorkerInfo from the Worker's registration info and call registerWorker to register it:
  • On success, persist the Worker info and reply to the Worker that registration succeeded; additionally, an extra Worker means extra resources, so schedule() is invoked to schedule the waiting apps.
  • On failure, reply to the Worker that registration failed.

So how is registration success decided? Step into registerWorker:

private def registerWorker(worker: WorkerInfo): Boolean = {
    // There may be one or more refs to dead workers on this same node (w/ different ID's),
    // remove them.
    workers.filter { w =>
      (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
    }.foreach { w =>
      workers -= w
    }
    // get the new Worker's workerAddress
    val workerAddress = worker.endpoint.address
    if (addressToWorker.contains(workerAddress)) {
      // look up the previously registered Worker at this workerAddress
      val oldWorker = addressToWorker(workerAddress)
      // UNKNOWN means the Master is in recovery and this Worker is being recovered
      if (oldWorker.state == WorkerState.UNKNOWN) {
        // remove the old Worker and accept the newly registered one
        removeWorker(oldWorker)
      } else {
        logInfo("Attempted to re-register worker at same address: " + workerAddress)
        return false
      }
    }
    // update bookkeeping
    workers += worker
    idToWorker(worker.id) = worker
    addressToWorker(workerAddress) = worker
    true
  }

First, any managed Worker that shares the new Worker's host and port and is in the DEAD (timed-out) state is removed from workers. Then, if addressToWorker already has an entry for the new Worker's workerAddress, the old Worker is looked up: if its state is UNKNOWN, the Master is in recovery and the Worker is re-registering as part of that recovery, so the old Worker is removed, the new one is added, and registration succeeds; any other state means a duplicate registration, and the method returns false.
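
The idToWorker and addressToWorker structures updated at the end are the Master's two lookup indexes over the same WorkerInfo objects held in workers; in Master.scala they are declared roughly as:

// Master.scala (Spark 2.1), paraphrased: indexes over registered workers,
// keyed by worker ID and by RPC address respectively
val idToWorker = new HashMap[String, WorkerInfo]
val addressToWorker = new HashMap[RpcAddress, WorkerInfo]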

Worker Receives the Master's Registration Response

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
    masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
      workerId, host, port, self, cores, memory, workerWebUiUrl))
      .onComplete {
        // This is a very fast action so we can use "ThreadUtils.sameThread"
        case Success(msg) =>
          Utils.tryLogNonFatalError {
            handleRegisterResponse(msg)
          }
        case Failure(e) =>
          logError(s"Cannot register with master: ${masterEndpoint.address}", e)
          System.exit(1)
      }(ThreadUtils.sameThread)
  }

This is the registerWithMaster method the Worker called when registering with the Master; its completion callback passes the result to handleRegisterResponse, which handles each kind of response message:

private def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = synchronized {
    msg match {
      // registration succeeded
      case RegisteredWorker(masterRef, masterWebUiUrl) =>
        logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
        // mark ourselves as registered
        registered = true
        // update master references and cancel the other registration retries
        changeMaster(masterRef, masterWebUiUrl)
        // send heartbeats to the Master
        forwordMessageScheduler.scheduleAtFixedRate(new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(SendHeartbeat)
          }
        }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
       ...
      // registration failed: exit the process
      case RegisterWorkerFailed(message) =>
        if (!registered) {
          logError("Worker registration failed: " + message)
          System.exit(1)
        }
      // this Master is not the active one; ignore
      case MasterInStandby =>
        // Ignore. Master not yet ready.
    }
  }

  1. On a RegisterWorkerFailed message (registration failed), the Worker exits.
  2. If the target Master is in Standby state, the message is simply ignored.
  3. On a RegisteredWorker message (registration succeeded), the Worker first marks itself as registered, then changeMaster updates variables such as activeMasterUrl, master, and connected, and cancels the other registration retries still in flight. A task is then submitted to the thread pool that sends the Worker itself a SendHeartbeat message every HEARTBEAT_MILLIS; the handler in receive then forwards a heartbeat to the Master:

case SendHeartbeat =>
      if (connected) { sendToMaster(Heartbeat(workerId, self)) }

Master Receives Heartbeats

case Heartbeat(workerId, worker) =>
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          if (workers.map(_.id).contains(workerId)) {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " Asking it to re-register.")
            worker.send(ReconnectWorker(masterUrl))
          } else {
            logWarning(s"Got heartbeat from unregistered worker $workerId." +
              " This worker was never registered, so ignoring the heartbeat.")
          }
      }

On the Master side, the corresponding WorkerInfo is looked up; if it exists, its lastHeartbeat is updated to the current time. If it does not, but the workerId still appears among the known workers (a DEAD Worker not yet culled), the Master asks the Worker to re-register via ReconnectWorker; a heartbeat from a completely unknown Worker is ignored.
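
The heartbeat itself carries just the worker's ID plus a reference back to the Worker endpoint, which is what lets the Master answer with ReconnectWorker; paraphrasing DeployMessages in Spark 2.1:

// DeployMessages.scala (Spark 2.1), paraphrased: sent Worker -> Master
// every HEARTBEAT_MILLIS; the endpoint ref lets the Master reply
case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage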

Master Detects Worker Heartbeat Timeouts

As noted above, the Master's onStart lifecycle method starts a dedicated task to check whether Workers have timed out. Here is how the Master handles it:

case CheckForWorkerTimeOut =>
      timeOutDeadWorkers()

private def timeOutDeadWorkers() {
    // Copy the workers into an array so we don't modify the hashset while iterating through it
    val currentTime = System.currentTimeMillis()
    val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT_MS).toArray
    for (worker <- toRemove) {
      if (worker.state != WorkerState.DEAD) {
        logWarning("Removing %s because we got no heartbeat in %d seconds".format(
          worker.id, WORKER_TIMEOUT_MS / 1000))
        removeWorker(worker)
      } else {
        if (worker.lastHeartbeat < currentTime - ((REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS)) {
          workers -= worker // we've seen this DEAD worker in the UI, etc. for long enough; cull it
        }
      }
    }
  }

All managed Workers are scanned; any whose last heartbeat is older than WORKER_TIMEOUT_MS is considered timed out and removed via removeWorker (which marks it DEAD). A Worker already marked DEAD is finally dropped from the workers list once it has been dead for longer than (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS.
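
With the defaults this works out as follows (a worked example assuming spark.worker.timeout = 60 and REAPER_ITERATIONS = 15, which I believe are the 2.1 defaults):

// a Worker is marked DEAD after 60s without a heartbeat
val WORKER_TIMEOUT_MS = 60L * 1000
val REAPER_ITERATIONS = 15
// a DEAD Worker stays visible (e.g. in the UI) for another
// (15 + 1) * 60s = 960s before being culled from the workers list
val cullAfterMs = (REAPER_ITERATIONS + 1) * WORKER_TIMEOUT_MS  // 960000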
