Skip to content

1.3_深入Task

elevenqq edited this page Sep 30, 2018 · 6 revisions

Task概述

DataLink真正执行数据同步任务的是每一个具体的Task,由Task从某一个固定类型的数据源读取数据,并同步到若干个目标端数据源,即为一对多的关系。我们将源端数据源类型规定为Task的类型,系统目前支持的Task类型有:MYSQL, FLEXIBLEQ, HBASE,支持同步到的目标端数据源类型有:Rdbms、ElasticSearch、Hdfs、HBase、FlexibleQ、SDDL。

Task生命周期

Task

上图展示了Task进行一次数据同步的流程。

  • 【类拥有关系】
    > WorkerTaskContainer为Task的执行入口类,负责管理WorkerTaskReader和WorkerCombinedTaskWriter的生命周期;WorkerCombinedTaskWriter负责管理多个WorkerTaskWriter;WorkerTaskReader和WorkerTaskWriter分别管理TaskReader和TaskWriter的生命周期。
  • 【Task同步流程】
    > 同步流程从WorkerTaskReader发起,由TaskReader从某个类型的数据源拉取数据,或者收到数据源推送的数据,然后将数据put到队列中,进行callback等待;WorkerCombinedTaskWriter负责从队列take数据,再分别存储到与它所管理的各个WorkerTaskWriter所共享的队列中,进行callback等待;然后,每个WorkerTaskWriter从队列中take数据,由各个TaskWriter根据数据的Record类型加载对应的Handler,等所有Handler处理完数据后,不论同步成功或失败都进行callback通知,结束所有队列的等待;最后,TaskReader进行判断,如果成功则执行commit,然后发起下一轮同步,如果失败则执行rollback,然后重试。
    【注】在Task的Reader和Writer为一对多的情况下,除了第一个Writer用Reader直接传过来的RecordChunk,其他Writer要copy一份RecordChunk来用,而copyRecordChunk使用反射实现,比较耗费系统资源,所以,如果是数据量较大且性能要求较高的同步场景,不建议使用一对多。
  • 【Task生命周期】
    > TargetState
    Task的目标状态,包括STARTED和PAUSED两种,用于在在Manager端控制Task的运行状态。
    > TaskStatus

    Task的实际运行状态类,包括UNASSIGNED、PREPARING、RUNNING、PAUSED、FAILED五种状态以及executionId、generation、startTime、workerId等运行属性,会随Task的启动注册到ZK,用于监控Task的当前运行状态信息。
    例,id=10的Task在ZK上的Status节点路径为“/datalink/tasks/10/status”,节点内容如下图所示:

    zk-status
    > Task生命周期
    Task的实际运行状态默认值设置为UNASSIGNED。Task启动时,首先进入Prepare阶段,即在ZK添加Task的status节点,并将运行状态更新为PREPARING。Prepare成功之后,判断Task的目标状态,若目标状态为PAUSED,则status节点的运行状态更新为PAUSED,同时Task线程阻塞;若目标状态为STARTED,则status节点的运行状态更新为RUNNING,同时Task正常运行。如果Task在运行过程中出现异常,且Task未被cancelled,则status节点的运行状态更新为FAILED,同时将异常堆栈发送到ZK,阻塞等待直到触发stopTask。stopTask完成之后,便会remove掉Task的status节点。在上述过程中,由TaskStatusListener负责监听并通知系统对ZK的status节点做相应的状态变更

  • 【Task状态监控报警】
    在Task生命周期中,对其运行状态的监控与报警有如下几种情况:
    > 在Task启动之前的Prepare阶段,在添加status节点时,若发现ZK上已存在该节点,说明该Task正在其他地方运行,则抛出TaskConflictException异常,并每秒进行一次重试,如果重试时间已经超过了ZK的会话超时时间,则触发报警,并继续重试,直到Task重复运行的情况消失。
    > 除了Task运行冲突异常监控报警,TaskStatusMonitor还对Task的实际运行状态进行监控,当出现以下三种情况之一时,触发报警:
    (1)Task的目标状态运行状态不一致(正常情况下,目标状态STARTED对应运行状态RUNNING,目标状态PAUSED对应运行状态PAUSED);
    (2)Task的status节点消失,运行状态变为UNASSIGNED;
    (3)Task出现异常,运行状态变为FAILED。
    并且,如果Task的目标状态为STARTED而运行状态为FAILED时,会重启Task,即结束线程,重新启动

TaskContext&TaskSession

  • 【TaskContext】
    > 每个Task都会绑定一个TaskContext,通过context可以实现Task和Runtime之间的交互,可以供TaskReader和TaskWriter随时获取Task的Id、ExecutionId、Service、Global-Session、Global-Attributes与开启Session功能。TaskContext的生命周期和Task的运行生命周期保持一致,当Task启动时TaskContext随之创建,当Task关闭时TaskContext也随之销毁。TaskAttributes和TaskContext的生命周期是一致的,主要用来支持Task整个生命周期内的数据共享。而TaskSession是会话级的数据共享机制,完成一次RecordChunk数据同步的过程定义为一次会话,TaskSession的生命周期即为一次数据同步。
  • 【TaskSession】
    > 如上图所示,TaskSession主要用来支持一次数据同步过程中的数据共享,即TaskReader和TaskWriter之间可以通过TaskSession共享数据,而TaskReaderSession和TaskWriterSession则分别供TaskReader和TaskWriter用于其内部共享数据。一次会话的起始和结束均为TaskReader,所以,在此触发TaskSession的begin操作(首先进行reset)。
  • 【TaskSession应用示例】
    > MySQL-Reader插件dump获取到的原始数据:存入TaskReaderSession中的key为MESSAGE_KEY,value为本次同步fetch到的原始数据,并根据需要进行dump。
    > Reader端和Writer端各自的数据同步性能统计:存入TaskReaderSession中的key为ReaderStatistic,TaskWriterSession中的key为WriterStatistic,value分别为各自的统计指标,通过在程序中按需存取,计算到本次同步的Record数量、延迟时间、load时间、TPS等,为Task的性能监控提供基础数据。
    > 映射拦截器OrderEntRecordInterceptor:该拦截器的作用是拦截非企业订单,只同步企业订单。存入TaskWriterSession中的key为PREFIX+id,value为企业订单id的值,当同步订单子表数据时,只有子表的订单id在session中存在,才进行同步。这样利用TaskSession,并依赖主表子表关系,可以实现企业订单的快速过滤,提高了同步效率。

Task消费位点管理机制

  • 【TaskPositionManager】
    PositionManager是Datalink提供的一个公用的数据同步位点管理组件,负责查询、更新消费位点。如果TaskReader有自己的位点管理机制,用自己的机制即可。TaskPositionManager是PositionManager的默认实现,启动和停止跟随Worker主线程,其主要功能如下:
    > 查询消费位点主要用于实时监控Task的同步情况(当前消费位点);
    > 更新消费位点有定时更新和实时更新两种。正常同步过程中采用定时更新,首先将每个Task的位点变更信息存入内存,然后每隔1秒将内存中Task的最新位点信息持久化到ZK节点(多次变更只刷一次),表示当前已经消费到的位置。在重启Task时可以重置消费位点,因此采用实时更新,使其启动之后能够从正确的位点开始消费。
    > 例,id=10的Task在ZK上的Position节点路径为“/datalink/tasks/10/position”,节点内容如下图所示:

    zk-position

  • 【位点管理机制应用示例】
    > Task的消费位点变更发生在一次同步成功之后。Reader端进行commit时,会判断是否自带commit机制,若有,则Reader插件自己进行commit和刷新位点;若没有,则使用DataLink公用的位点管理机制刷新ZK的位点信息。一般情况下,具体的Reader插件不需要实现commit功能,由DataLink框架自动记录records消费的偏移量;若需要在自己系统内部存储偏移量,则可以选择实现commit功能。
    > DataLink的MySQL-Reader插件选择使用canal自带的位点更新机制,同时自定义CanalTaskMetaManager,重写了canal自带的MetaManager,将canal的消费位点纳入统一管理。
    > DataLink的HBase-Reader插件和Fq-Reader插件均使用自己的位点管理机制。

Task配置信息管理机制

  • 【TaskConfigManager】
    > TaskConfigManager是Task配置信息管理器,负责发现Task配置变更并进行事件通知,并以组为单位进行监控管理,不属于本组的变更不予关注。

    > TaskConfigManager随Worker线程启动后,首先进行强制refresh,初始化本组的taskConfigList和version,获取最新配置。然后开始每隔500ms循环刷新Task配置信息,若version发生变化,则通过TaskConfigUpdateListener通知系统做相应的处理:若有Task新增或删除,则触发分组进行ReBalance;若有Task参数配置更新(包括Task参数、Reader参数或者Writer参数的变化),则重启Task;若有Task目标状态更新,则更新Task的目标状态,并对mustRestartWhenPause == true类型的Task进行重启(具体原因请见Task常用参数)。

LeaderTask机制

MYSQL类型数据同步,DataLink的Task和Reader端的数据源是一对一的关系,一个同步任务对应一个Task的配置信息。但是分布式集群的同步,例如HBASE,DataLink的Task模拟是HBASE的从集群,与Reader端的HBASE集群是多对一的关系,若为每个Task进行一次配置,同步将过于繁琐。因此针对集群对集群、需要多个Task的数据同步模型,引入LeaderTask机制,简化Task配置。

  • 【LeaderTask机制应用示例】
    > 每个要同步的HBASE集群对应Reader端一个Repl-ZNode-Parent,同一ZNode下的所有Task模拟的是一个HBASE从集群,每个Task模拟的是一个RegionServer,其中创建的第一个Task是LeaderTask。
    > 所有Task的相关配置,包括同步映射和Task监控等,只需要在LeaderTask上配置即可,其它FollowerTask会复用LeaderTask的配置信息。

Task常用参数

  • 【Task基本参数】

    Task基本参数是DataLink所有类型Task的通用参数。

    Task基本参数 参数描述 默认值 备注

    id

    Task在数据库中的唯一标识


    groupId

    Task所属分组的id

    每个Task必须且只能属于一个分组

    TaskType

    Task的类型

    按照Reader端类型划分为三种:MYSQL, FLEXIBLEQ, HBASE;
    mustRestartWhenPause mustRestartWhenPause==true时,
    若Task的目标状态被置为PAUSED,则必须重启Task
    HBASE TaskType中的一个公用方法属性

    TargetState

    Task的目标状态

    STARTED,PAUSED;

    TaskStatus

    Task的当前状态信息 UNASSIGNED,PREPARING,RUNNING,PAUSED,FAILED;
    当前状态与运行信息均在ZK上注册,显示Task的当前运行状况
    readerMediaSourceId Task所关联的Reader的数据源id

    taskParameter

    Task的参数 目前只有taskId

    taskReaderParameter

    Task的Reader插件的基本参数 PluginReaderParameter类型,

    不同的Reader插件可以定义各自的扩展参数

    taskWriterParameter

    Task的Writer插件的基本参数

    List<PluginWriterParameter>类型,

    不同的Writer插件可以定义各自的扩展参数

    isLeaderTask

    Task是否为LeaderTask 适用于集群类型的Task

    leaderTaskId

    是FollowerTask时,其LeaderTask的id 适用于集群类型的Task
    version Task配置的版本 即整个分组配置的版本

    【注】HBASE类型的Task被设为mustRestartWhenPause==true的原因:

    > Task被置为PAUSE之后,其ZK节点还存在,因此HBase-Master会继续往Task推送Log,但是Task收到Log之后会阻塞住,HBase-Master等待超时,继续推,再等待超时。从实际测试结果来看,此Task会导致所有Task的同步出现严重延迟。所以必须重启改Task,WorkerTaskReader检测到PAUSE状态,会阻塞在HBase-Reader启动之前,这样Task便不会在ZK注册,避免HBase-Master往Task推送Log,同时降低系统资源的占用。


  • 【插件基本参数】

    PluginParameter是DataLink所有插件的通用参数,PluginReaderParameter和PluginWriterParameter均继承了PluginParameter。

    PluginParameter参数 参数描述 默认值 备注

    pluginName

    插件的名字 插件自定义

    pluginClass

    插件的实现类 插件实现类的路径

    pluginListenerClass

    插件的Listener实现类 插件Listener实现类的路径

    supportedSourceTypes

    插件支持的数据源类型 不同的插件支持不同的数据源类型 可以为多个,例如Rdbms-Writer支持的类型有
    MYSQL、SQLSERVER、POSTGRESQL、SDDL

    perfStatistic

    是否开启性能统计

    false

    用于监控Reader端和Writer端的数据同步性能


  • 【Reader插件基本参数】

    PluginReaderParameter是DataLink所有Reader插件的通用参数,具体类型的Reader插件可以自定义参数,但是必须继承PluginReaderParameter。

    PluginReaderParameter参数 参数描述 默认值 备注

    mediaSourceId

    Reader关联的数据源的id

    冗余存储

    dump

    是否需要dump fetch到的数据

    false

    用于确认Reader端获取的Binlog数据,有助于排查问题

    ddlSync

    是否同步ddl操作

    true

    主要针对关系型数据库


  • 【Writer插件基本参数】

    PluginWriterParameter是DataLink所有Writer插件的通用参数,具体类型的Writer插件可以自定义参数,但是必须继承PluginWriterParameter。

    PluginWriterParameter参数 参数描述 默认值 备注

    poolSize

    Writer的线程池大小

    5

    根据MediaMapping的配置情况设置一个合理的值

    dryRun

    dryRun==true时不会进行实际写入操作

    false


    useBatch

    是否开启批量写入

    true

    不同的Writer对批量写入会有不同的定义

    batchSize

    批量写入时每个批次的大小

    50


    merging

    是否可以对数据进行合并

    false


    maxRetryTimes

    最大重试次数

    3


    retryMode

    重试模式

    Always

    当Task写入出现异常时,有四种处理方式:

    Always,//一直重试
    TimesOutDiscard,//超过重试次数后丢弃数据
    TimesOutError,//超过重试次数后抛异常,终止任务
    NoAndError;//不重试,直接抛异常,终止Task

    syncAutoAddColumn

    目标端缺少列时,是否自动加列

    true

    用于源端加字段时,在目标端自动补全相应字段

    【注】异常处理方式默认为一直重试,直到恢复成功为止,否则一直抛异常和报警。在一直重试的情况下,多数异常可以自动恢复正常,如mysql到mysql的同步目标表缺字段,或者目标库因自身原因导致连接失败等问题,一些特殊异常需要人工介入处理,比如暂不支持的异构数据源之间的ddl同步