Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

【有奖征文】Linkis 新引擎实现分享 #11

Open
wForget opened this issue Nov 12, 2020 · 0 comments
Open

【有奖征文】Linkis 新引擎实现分享 #11

wForget opened this issue Nov 12, 2020 · 0 comments

Comments

@wForget
Copy link

wForget commented Nov 12, 2020

Linkis 新引擎实现分享

在社区大佬的帮助下,我们完成了 0.11 版本的开发,实现了 ElasticSearch 和 Presto 引擎。具体的开发文档可以参考: Linkis引擎开发文档

执行引擎架构的选择

目前 Linkis 的架构可以分为两种,一种是 Entrance-EngineManger-Engine 的模式,一种是 Entrance 模式。统一执行服务的架构可以参考官方文档: Linkis-UJES设计文档

Entrance 服务作为执行的入口,主要负责任务的持久化工作,日志的输出,进行脚本的校验和变量替换,并与 Engine、EngineManager 服务交互,向可用的 Engine 发送执行任务的请求,或者向 EngineManager 发送启动 Engine 的请求。

EngineManager 服务主要负责 Engine 的启动,进行 Engine 请求资源的请求与释放,并持续监控 Engine 的状态。

Engine 服务负责任务的具体执行,包括了任务执行的一些初始化操作、任务脚本的切分、任务的执行、任务的进度监控和结果集的保存等工作。

Spark、Hive 引擎是 Entrance-EngineManger-Engine 模式实现,在这个模式中 Engine 作为 Spark 、Hive 任务的 Driver 端,向外暴露接口可持续的接受 Entrance 发来的请求,完成任务的执行。这个模式中不仅实现了多租户的任务隔离,还提供了单用户的引擎复用,尽量减少 Engine 的启动,大大提高了执行的效率。

上面的各个服务可以看到每个服务的职责非常的明确,不过多个服务也让整个的架构变的比较重,有一些轻量的执行没有必要通过 Entrance-EngineManger-Engine 模式进行实现。例如 Linkis JDBC 引擎的实现就是通过 Entrance 的模式。JDBC 引擎的职责就是作为 JDBC 连接的客户端向服务端发送请求,并进行连接的维护。JDBC 连接的维护是比较轻量级的,而且 JDBC 连接的复用也不是根据平台用户进行区分的,所以单独为每个用户启动一个引擎是没有必要的。

ElasticSearch 和 Presto 的客户端实际上就是 Http Client,所以 ElasticSearch 和 Presto 引擎的实现也应该是比较轻量的,最终我们实现的 ElasticSearch 和 Presto 引擎也是通过 Entrance 的模式实现的。

引擎资源控制

Linkis 的资源管理服务,用来管理用户、系统的资源和并发的控制,实现新的引擎需要考虑到引擎资源相关接口的实现。具体架构可参考:Linkis RM设计文档

Entrance-EngineManger-Engine 模式资源控制

Entrance-EngineManger-Engine 的模式,资源相关主要需要下面两个实现:

  1. EngineManger 注册资源
    Linkis RM设计文档中可以看到,EngineManger 作为 Engine 资源的管理者,需要先向 ResourceManger 进行管理资源的注册。
    Linkis 已经将 EngineManger 注册资源的逻辑进行了抽象,实现的时候只需要在 SpringConfiguration 中进行配置创建 resources 的 spring bean 对象,可以参考 SparkEngineManagerSpringConfiguration 的实现。
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineManagerSpringConfiguration

@Configuration
class SparkEngineManagerSpringConfiguration {

  @Bean(Array("resources"))
  def createResource(): ModuleInfo = {
    val totalResource = new DriverAndYarnResource(
      new LoadInstanceResource(ENGINE_MANAGER_MAX_MEMORY_AVAILABLE.getValue.toLong,
        ENGINE_MANAGER_MAX_CORES_AVAILABLE.getValue, ENGINE_MANAGER_MAX_CREATE_INSTANCES.getValue),
      null
    )

    val protectedResource = new DriverAndYarnResource(
      new LoadInstanceResource(ENGINE_MANAGER_PROTECTED_MEMORY.getValue.toLong, ENGINE_MANAGER_PROTECTED_CORES.getValue,
        ENGINE_MANAGER_PROTECTED_INSTANCES.getValue),
      null
    )

    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectedResource, ResourceRequestPolicy.DriverAndYarn)
  }
  // ...
}
  1. EngineResourceFactory 实现
    EngineManager 创建 Engine 的时候需要先向 ResourceManger 去请求资源,所以新引擎需要提供 EngineResourceFactory 的实现,用来初始化创新 Engine 所需要的资源,再向 ResourceManger 进行请求。
    Linkis 中提供了 AbstractEngineResourceFactory 的抽象,实现的时候只需要从 AbstractEngineResourceFactory 继承。具体可参考 SparkEngineResourceFactory 的实现:
// com.webank.wedatasphere.linkis.enginemanager.configuration.SparkEngineResourceFactory

@Component("engineResourceFactory")
class SparkEngineResourceFactory extends AbstractEngineResourceFactory {

  override protected def getRequestResource(properties: java.util.Map[String, String]): DriverAndYarnResource = {
    val executorNum = DWC_SPARK_EXECUTOR_INSTANCES.getValue(properties)
    new DriverAndYarnResource(
      new LoadInstanceResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_DRIVER_MEMORY.getValue(properties) + "G"),
        DWC_SPARK_DRIVER_CORES,
        1),
      new YarnResource(ByteTimeUtils.byteStringAsBytes(DWC_SPARK_EXECUTOR_MEMORY.getValue(properties) * executorNum + "G"),
        DWC_SPARK_EXECUTOR_CORES.getValue(properties) * executorNum,
        0,
        DWC_QUEUE_NAME.getValue(properties))
    )
  }
}

Entrance 模式并发控制

Lnkis 中将 Engine 的实例数作为资源的一种,目前用户请求的并发是通过 Engine 的实例数进行控制的,那么在 Entrance 的模式下,就没有很好的对用户的并发进行控制。

在 ElasticSearch 和 Presto 的实现中,我们参考了 EngineManager 的资源控制,将并发数作为资源的一种,在 Entrance 启动时进行模块资源注册。将每个执行作为一个实例,执行发生时先进行资源的请求和锁定,执行完成后进行资源的释放,从而达到用户并发的控制。

主要包括了以下步骤:

  1. Entrance 注册并发资源
    Entrance 注册并发资源,需要创建资源实例,将并发作为资源的一部分,然后配合 @EnableResourceManager 和 @RegisterResource 注解进行资源注册。
  // 定义资源
  @Bean(Array("resources"))
  def createResource(): ModuleInfo = {
    // 创建并发资源实例,分为总资源和受保护的资源
    val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
    val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
    info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
  }
  
  // 注册资源
  @RegisterResource
  def registerResources(): ModuleInfo = resources
  1. 执行前请求锁定资源
    执行实例初始化前,先通过 ResourceManagerClient#requestResource 方法请求锁定并发实例资源。
rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
  case NotEnoughResource(reason) =>
    // 没有请求到资源,抛出异常
    throw EsEngineException(LogUtils.generateWarn(reason))
  case AvailableResource(ticketId) => {
    // 请求到资源,创建执行实例,并保存 ticketId 用于释放资源
    // ...
    // 当资源被实例化后,返回实际占用的资源总量
    rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
  }
}
  1. 执行完成释放资源
    执行完成后销毁执行实例,并通过 ResourceManagerClient#resourceReleased 方法释放锁定的资源。
// 使用 ticketId 释放对应的资源
rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))

ElasticSearch 引擎的实现

下面是微众王和平大佬帮忙画的 ElasticSearch 引擎整体的架构图:

ElasticSearch引擎架构图

Linkis 新引擎的实现还是比较容易的,ElasticSearch 引擎的代码结构如下,整体的代码量也是比较少。主要包括了资源的配置、执行器的实例化和ElasticSearch请求与结果解析的相关代码。

Es引擎代码结构

  1. 资源注册
    ElasticSearch 引擎需要考虑到用户请求的并发和 Entrance 整体并发的控制。
    Entrance 启动时,需要对 Entrance 可用资源进行注册,主要包括了最大实例数和保护的阈值。在 EsSpringConfiguration 中生成资源的 bean 对象,并传入 EsEngineManager 进行注册,配置 @EnableResourceManager 和 @RegisterResource 就会自动进行注册。
// com.webank.wedatasphere.linkis.entrance.conf.EsSpringConfiguration
class EsSpringConfiguration extends Logging{

  @Bean(Array("resources"))
  def createResource(@Autowired rmClient: ResourceManagerClient): ModuleInfo = {
    // Clean up resources before creating resources to prevent dirty data when exiting abnormally (创造资源之前进行资源清理,防止异常退出时产生了脏数据)
    Utils.tryQuietly(rmClient.unregister())
    Utils.addShutdownHook({
      info("rmClient shutdown, unregister resource...")
      rmClient.unregister
    })
    val totalResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_MAX_JOB_INSTANCE.getValue)
    val protectResource = new InstanceResource(EsEntranceConfiguration.ENTRANCE_PROTECTED_JOB_INSTANCE.getValue)
    info(s"create resource for es engine totalResource is $totalResource, protectResource is $protectResource")
    ModuleInfo(Sender.getThisServiceInstance, totalResource, protectResource, ResourceRequestPolicy.Instance)
  }

}

// com.webank.wedatasphere.linkis.entrance.execute.EsEngineManager
@EnableResourceManager
class EsEngineManager(resources: ModuleInfo) extends EngineManager with Logging {

  @RegisterResource
  def registerResources(): ModuleInfo = resources

}
  1. 请求执行器
    EsEngineRequester 启动一个执行器,用于任务的执行,通过 request 方法对传入的 job 生成一个执行的 EsEntranceEngine,请求时先向 ResourceManager 请求并锁定一个实例的资源,在 EsEntranceEngine 执行结束后会进行释放。
// com.webank.wedatasphere.linkis.entrance.execute.EsEngineRequester
class EsEngineRequester(groupFactory: GroupFactory, rmClient: ResourceManagerClient) extends EngineRequester {
  override def request(job: Job): Option[EntranceEngine] = job match {
    case entranceJob: EntranceJob => {
      val requestEngine = createRequestEngine(job);
      // request resource manager
      rmClient.requestResource(requestEngine.user, requestEngine.creator, new InstanceResource(1)) match {
        case NotEnoughResource(reason) =>
          throw EsEngineException(LogUtils.generateWarn(reason))
        case AvailableResource(ticketId) => {
          val engine = new EsEntranceEngine(idGenerator.incrementAndGet(), new util.HashMap[String, String](requestEngine.properties)
            , () => {rmClient.resourceReleased(UserResultResource(ticketId, requestEngine.user))})
          engine.setGroup(groupFactory.getOrCreateGroup(getGroupName(requestEngine.creator, requestEngine.user)))
          engine.setUser(requestEngine.user)
          engine.setCreator(requestEngine.creator)
//          engine.updateState(ExecutorState.Starting, ExecutorState.Idle, null, null)
          engine.setJob(entranceJob)
          engine.init()
          executorListener.foreach(_.onExecutorCreated(engine))
          rmClient.resourceInited(UserResultResource(ticketId, requestEngine.user), new InstanceResource(1))
          Option(engine)
        }
      }
    }
    case _ => None
  }
}
// com.webank.wedatasphere.linkis.entrance.execute.EsEntranceEngine
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
  override def close(): Unit = {
    try {
      this.job.setResultSize(0)
      this.engineExecutor.close
      // 释放资源
      resourceRelease()
      // ......
}
  1. 任务执行
    EsEntranceEngine 是 com.webank.wedatasphere.linkis.entrance.execute.EntranceEngine 的实现,进行脚本的执行。在这里抽出一层 EsEngineExecutor 作为 Es 任务的具体执行。EsEntranceEngine 则负责 EsEngineExecutor 的初始化、脚本解析切分等实现。
class EsEntranceEngine(id: Long, properties: JMap[String, String], resourceRelease: () => Unit) extends EntranceEngine(id) with SingleTaskOperateSupport with SingleTaskInfoSupport {
  private var engineExecutor: EsEngineExecutor = _
  // ...
  override def execute(executeRequest: ExecuteRequest): ExecuteResponse   // ...
  protected def executeLine(code: String): ExecuteResponse = this.engineExecutor.executeLine(code, storePath, s"_$codeLine")
  
}
  1. ElasticSearch 脚本执行
    entrance.executor 包中就是 ElasticSearch 客户端的封装、请求的封装和结果的解析等相关代码。
    ElasticSearch 客户端封装在 EsClient 中,通过 EsClientFactory 进行实例化,并将 datasourceName 作为唯一 Key 进行缓存。
    EsEngineExecutorImpl 是 EsEngineExecutor 的实现,用于任务的执行。
    ResponseHandlerImpl 用于结果的处理,会根据 ElasticSearch 的返回类型进行反序列化,并保存为 Linkis 的 ResultSet。

DataSource 路由

在与微众大佬的讨论交流中得知后面 Linkis 的架构将会引入 DataSource 的概念,DataSource 模块维护引擎的连接信息和集群等信息,可以减少一些数据源运行配置,方便数据源配置和权限管理,为数据平台提供元数据信息,并可根据 DataSource 进行路由实现多集群的路由。

在 Linkis-0.11.0版本中添加了 linkis-gateway-ujes-datasource-ruler 模块,作为一个 Gateway 插件的形式简单实现了,请求和 Entrance 的路由。

linkis-gateway-ujes-datasource-ruler 模块的实现

抽象出 EntranceGatewayRouterRuler 接口用于执行路由规则,在 Gateway 模块的 EntranceGatewayRouter 中注入 EntranceGatewayRouterRuler 实例。

@Component
class EntranceGatewayRouter extends AbstractGatewayRouter {

  @Autowired(required = false)
  private var rules: Array[EntranceGatewayRouterRuler] = _
  
  override def route(gatewayContext: GatewayContext): ServiceInstance = {
    gatewayContext.getGatewayRoute.getRequestURI match {
      case EntranceGatewayRouter.ENTRANCE_REGEX(_) =>
        // ...
        serviceId.map(applicationName => {
          rules match {
            case array: Array[EntranceGatewayRouterRuler] => array.foreach(_.rule(applicationName, gatewayContext))
            case _ =>
          }
          ServiceInstance(applicationName, gatewayContext.getGatewayRoute.getServiceInstance.getInstance)
        }).orNull
      case _ => null
    }
  }
  
}

linkis-gateway-ujes-datasource-ruler 模块,主要是做了一个 DataSource 和 Entrance Instance 的映射,并保存在 Mysql 中。DatasourceGatewayRouterRuler 实现了具体的路由策略,DatasourceMapService 接口维护 DataSource 映射。

// 维护 DataSource 映射的接口
public interface DatasourceMapService {

    String getInstanceByDatasource(String datasourceName);

    long countByInstance(String instance);

    String insertDatasourceMap(String datasourceName, String instance, String serviceId);

}

// EntranceGatewayRouterRuler 的实现类,执行具体的路由逻辑
class DatasourceGatewayRouterRuler extends EntranceGatewayRouterRuler with Logging {

  // 路由的方法
  override def rule(serviceId: String, gatewayContext: GatewayContext): Unit = if(StringUtils.isNotBlank(gatewayContext.getRequest.getRequestBody)) {
    // 从请求中获取 datasourceName
    val datasourceName = getDatasourceName(gatewayContext.getRequest.getRequestBody)
    if (StringUtils.isBlank(datasourceName)) return
    debug(s"datasourceName: $datasourceName")
    // 通过 datasourceName 获取映射
    datasourceMapService.getInstanceByDatasource(datasourceName) match {
      case i: String if StringUtils.isNotBlank(i) => 
        // 存在映射直接返回 Instance
        gatewayContext.getGatewayRoute.getServiceInstance.setInstance(i)
      case _ => {
        // 不存在映射时,先获取 Instance 列表,并根据已经存在映射的数据按照从小到大排序,获取最少映射的 Instance,插入 DataSource 映射并返回
        val newInstance = ServiceInstanceUtils.getRPCServerLoader.getServiceInstances(serviceId)
          .map(item => (item, datasourceMapService.countByInstance(item.getInstance)))
          .sortBy(_._2).map(_._1.getInstance).headOption match {
            case Some(item) => datasourceMapService.insertDatasourceMap(datasourceName, item, serviceId)
            case None => null
          }
        debug(s"newInstance: $newInstance")
        if (StringUtils.isNotBlank(newInstance)) {
          gatewayContext.getGatewayRoute.getServiceInstance.setInstance(newInstance)
        }
      }
    }
  }
  
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant