Skip to content

2.5_ElasticSearch Writer

elevenqq edited this page Sep 30, 2018 · 2 revisions

一、使用场景

ES-Writer插件主要用于将业务系统其他类型数据源的增量数据同步到ES,应用场景主要有两种:

1)由于订单的海量数据(Mysql)而使查询的速度降低到无法满足业务需要时(当然慢查询,也有可能拖垮整个数据库),我们将使用ElasticSearch作为海量数据的存储介质,作为二级系统来解决查询的性能问题。

2)由于HBase的索引局限性(比较单一,不好扩展),需要将HBase的数据同步到ElasticSearch,来解决该类问题。(建设中)

二、设计原理

目前,Hdfs-Writer插件只实现了将Mysql类型的增量数据同步到ES。自定义的RdbEventRecordHandler继承自com.ucar.datalink.worker.api.handle.AbstractHandler,使用了系统提供的默认流程(详见深入领域),并根据需要自行扩展了一些功能,将RDBMS类型的数据进行转换、加工等处理,最终调用REST_client写入ES相应的索引中。具体的数据写入流程如下图所示:

ES-Writer

  • 【Records分组】
    > 将Records按表名进行分组,不同分组并发写入,保证单表内的局部有序。
    > 若ES-Writer开启了批量写入,则将每张表的Records按照设定的batchSize进行拆分,分批进行数据转换与写入。
  • 【Records转换】
    > Hdfs-Writer暂时只支持insert和update类型的Records数据同步。并且,由于涉及到多表聚合操作,所以不管原始操作是insert还是update,Action都设置为Update。(内置实现自动创建index功能??)
    > 根据Records构建批量提交ES的动作描述信息。首先设置转化json串时,保留值为null的字段,使其能将值为null的字段传递到服务端。并且设置采用前缀处理EsUsePrefix = true,提前根据原始表名构造列名前缀,用于构建列内容。构造ES动作描述具体步骤如下
    1)填充Index和Type:在Transform阶段tableName已经转化成了"index.type"形式的别名,所以从映射表别名中获取相应的字符串填充ES的Index和Type。
    2)填充主键:若指定了joinColumn,即多表聚合的情况,主表和子表均用joinColumn的值填充主键;若未指定joinColumn,则认为主键就是id,用id的值填充主键。
    3)填充列内容:将RdbEventRecord数据转化为ES数据,其中,主键信息和joinColumn列必须要同步,insert类型的Record所有列全部同步,update类型的Record只同步发生变更的列。
    4)合并地理位置信息:取出映射中配置的地理位置合并信息,将经纬度的值格式化后放到一个对象中,并赋值给合并之后的列。
  • 【ES批量更新】
    > 批量提交:将转换好的数据批量发送到ES,并对ES批量操作返回结果进行校验和解析。
    > 处理结果:若出现写入失败或者更新失败,则进行重试,超过最大重试次数还有错则直接抛异常(默认MAX_TRIES 3)。其中,当出现更新失败且状态码为404时,将update转为index操作,以写入模式重试。

三、功能介绍

  • 【按表并发】
    > 一批次的Records支持按表分组并发写入。ES-Writer按表开启了多个线程,每个线程负责一张表的ES写入操作,单表内同步是有序的,保证了数据的一致性,同时提高写数据的吞吐量。
  • 【多表聚合】
    > 在同步Mysql数据过程中,可以实现多表聚合功能,即通过设置聚合列名称,将Mysql中多张有外键关系的表,在同步过程中进行聚合,到ES端,多张表的数据合并成一条。实现了将多个相关业务表的数据聚合到ES的同一个索引里,提高了查询性能。
  • 【字段合并】
    > 地理位置信息合并:为了方便查询和展示,将业务数据中的经纬度信息合并成ES中的一个字段,来表示各个场景的地理位置信息。
    > 扩展列:指的是源端并不存在该列,而是在特定业务场景下,凭空创造出的列,如A和B两个Column合并成一个新的Column。例如,自定义拦截器YccOrder2EsInterceptor,用于将源表的若干个字段整合成ES中的一个字段。
    注:当我们既用到了[白名单]又用到了[扩展列]时,需要在Record的元数据中,把扩展列的名字放进去,否则扩展列不在白名单中,将被忽略。
  • 【ddl拦截器】
    > ES-Writer在AbstractHandler内置的基础拦截器前面增加自定义拦截器DdlEventInterceptor,对ddl类型的Record进行sql解析,实现自动同步加字段功能。
    首先将sql中新增的字段名称和类型转换为ES中的字段名称和对应类型,并序列化为columnJson(格式实例:{"properties":{"t_dl_test_source|yy":{"type":"integer"}}}),然后通过映射信息获取要同步到的ES的index和type,以及ES的配置信息,最后调用EsClient的updateMappingIndex方法连接ES集群进行更新。
  • 异常重试
    ES的批量写入,利用返回结果,采用了异常重试机制,最大限度的保证了写入数据的一致性。

四、插件参数说明

在继承Writer插件通用参数基类(PluginWriterParameter,详见深入Task)的基础上,EsWriterParameter没有扩展自己的参数类,只是实现了插件参数类PluginParameter的基本方法。

五、Mapping参数说明

在Mapping通用配置参数基础上,ES-Writer插件还有自己的专用参数:聚合列名称和列前缀配置参数,用来实现多表合一,地理位置合并配置参数,用来将经度和纬度合并成一个字段。相关Mapping配置参数如下:

Mapping参数 参数描述 默认值 备注

targetMediaSourceId

目标端数据源的id

目标数据源,这里指要同步到的ES集群

targetMediaName

ES中的表别名

支持目标端有表别名,在ES-Writer插件中表示为其自定义的“index”和“type”,即表别名格式为:"index.type"

targetMediaNamespace

目标端数据源的namespace

目标端数据源schema,对ES来说默认为空

ColumnMappingMode

列映射模式

NONE

支持列名黑白名单与列别名:

NONE,//所有列均同步到目标端

INCLUDE,//只同步白名单中的列,可以设置列别名

EXCLUDE;//黑名单中的列不同步

writePriority

同步优先级

5

数值越小优先级越高

interceptorId

拦截器id

拦截器可以对Records进行特殊处理,满足少数特定功能的需求。例如YccOrder2EsInterceptor,用于将源表的若干个字段整合成ES中的一个字段

skipIds

主键黑名单

可以通过指定主键id来过滤源端的某些异常Records

valid

是否有效

同步映射有效时,才进行同步

joinColumn

聚合列名称 Mysql多表聚合成ES的一张表时,关联各表的列,注:Mysql主表和子表必须是一对一的关系

esUsePrefix

ES中的列前缀配置 true 当使用前缀时,ES中的每个列使用 “表名|” 做前缀

geoPositionConf

地理位置合并配置

将业务中的经纬度两个字段表示的地理位置信息合并为ES的一个字段,类型为List<GeoPositionMapping>,可以配置多个

注:地理位置合并配置信息举例如下所示,将"latColumnName"和"lonColumnName"合并成为ES中的"columnName"。如专车订单详情中的"预计上/下车经纬度"、"实际上/下车经纬度"等位置信息各为两个字段,合并到ES中的字段为"预计上/下车地理位置"、"实际上/下车地理位置"。

[
{
"columnName": "estimate_board",
"latColumnName": "estimate_board_lat",
"lonColumnName": "estimate_board_lon"
},
{
"columnName": "estimate_off",
"latColumnName": "estimate_off_lat",
"lonColumnName": "estimate_off_lon"
},
{
"columnName": "actual_board",
"latColumnName": "actual_board_lat",
"lonColumnName": "actual_board_lon"
},
{
"columnName": "actual_off",
"latColumnName": "actual_off_lat",
"lonColumnName": "actual_off_lon"
},
{
"columnName": "origin_estimate_off",
"latColumnName": "origin_estimate_off_lat",
"lonColumnName": "origin_estimate_off_lon"
}
]

六、版本说明

关联技术 稳定版本 待测版本
ElasticSearch 2.X > .2X
esClient REST
reader mysql

七、注意事项:

> 配置的ES的用户如果没有index、type、列等的操作权限,需要在目标端手动添加。

八、FAQ