Skip to content

Latest commit

 

History

History
353 lines (294 loc) · 15.4 KB

7.4-6-how-to-use-datax-to-load-csv-data-files.md

File metadata and controls

353 lines (294 loc) · 15.4 KB

4.6 如何使用 DataX 加载 CSV 数据文件到 OceanBase

DataX 简介

DataX 是阿里云 DataWorks 数据集成的开源版本,是阿里巴巴集团内被广泛使用的离线数据同步工具/平台。DataX 实现了包括 MySQL、Oracle、SQLserver、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、Hologres、DRDS 、OceanBase 等各种异构数据源之间高效的数据同步功能。

OceanBase 企业版客户可以向 OceanBase 的技术人员索取 DataX 内部版本(RPM 包)。OceanBase 社区版客户,可以在 DataX 开源网站 内下载源码,自行编译。编译时,注意在 pom.xml 中剔除掉不用的数据库插件,否则编译出来的包会非常大。

DataX 配置文件

DataX 以任务的形式迁移数据。每个任务只处理一个表,每个任务有一个 json 格式的配置文件,配置文件里会包含 readerwriter 两节。具体的 readerwriter 都是 DataX 支持的数据库插件,可以随意搭配使用。

最新版本的 DataX 还提供了一个 WEB 管理界面。

配置文件示例:

{
  "job": {
    "content": [
      {
        "reader": {
          "name": "streamreader",
          "parameter": {
            "sliceRecordCount": 10,
            "column": [
              {
                "type": "long",
                "value": "10"
              },
              {
                "type": "string",
                "value": "hello,你好,世界-DataX"
              }
            ]
          }
        },
        "writer": {
          "name": "streamwriter",
          "parameter": {
            "encoding": "UTF-8",
            "print": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 2
       }
    }
  }
}

json 配置文件放到 DataX 的目录 job 下,或者自定义路径。执行方法如下:

$python bin/datax.py job/stream2stream.json

输出信息:

<.....>

2021-08-26 11:06:09.217 [job-0] INFO  JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO  StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes |  All Task WaitWriterTime 0.000s |  All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO  JobContainer -
任务启动时刻                    : 2021-08-26 11:05:59
任务结束时刻                    : 2021-08-26 11:06:09
任务总计耗时                    :                 10s
任务平均流量                    :               38B/s
记录写入速度                    :              2rec/s
读出记录总数                    :                  20
读写失败总数                    :                   0

DataX 任务执行结束会有个简单的任务报告,包含平均流量、写入速度和读写失败总数等。

DataX 的 job 的参数 settings 可以指定速度参数和错误记录容忍度等。

"setting": {
            "speed": {
                "channel": 10 
            },
            "errorLimit": {
                "record": 10,
                "percentage": 0.1
            }
        }

参数说明:

  • speed 限速 bytes 有 bug,不建议使用。errorLimit 表示报错记录数的容忍度,超出这个限制后任务就中断退出。

  • channel 是并发数,理论上并发越大,迁移性能越好。但实际操作中也要考虑源端的读压力、网络传输性能以及目标端写入性能。

下面介绍常用数据源(MySQL、Oracle 、CSV 和 OceanBase)的读写插件。

txtfilereader 插件

txtfilereader 插件提供了读取本地文件系统数据存储的能力。在底层实现上,txtfilereader 获取本地文件数据,并转换为 DataX 传输协议传递给 Writer。

本地文件内容存放的是一张逻辑意义上的二维表,例如 CSV 格式的文本信息。

txtfilereader 有一些功能限制和参数,请先参考官方说明:https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md

下面是 txtfilereader 的配置示例。

"reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                }

参数说明:

  • path 指定到路径即可。

  • fileName 是生成的文件前缀,完整的文件名很长,有随机字符串(避免重复)。

  • column 可以指定为 *,此时所有字段值均作为字符串。该操作虽然方便,但不能保证完全没有问题,目前测试常用数据类型是可以的。

  • nullFormat 指定空值的记录,默认是 null,这个读入 Oracle 时会出现问题。建议导出文件时指定为 "\\N",即空值和。

  • fieldDelimiter 指定 CSV 文件的列分隔符,这个和导出的时候指定的列分隔符保持一致。通常导出的列内容中如果含有列分隔符,会用双引号("")进行封装。用逗号(,)也可以,只是太过常见,建议用生僻一点的单字符,如 |^ 等。

mysqlreader 插件

mysqlreader 插件实现了从 MySQL 读取数据。在底层实现上,MysqlReader 通过 JDBC 连接远程 MySQL 数据库,并执行相应的 SQL 语句将数据从 MySQL 库中 SELECT 出来。

不同于其他关系型数据库,MysqlReader 不支持 FetchSize。

实现原理方面,简而言之,MysqlReader 通过 JDBC 连接器连接到远程的 MySQL 数据库,并根据用户配置的信息生成查询语句,然后发送到远程 MySQL 数据库,并将该 SQL 执行返回结果使用 DataX 自定义的数据类型拼装为抽象的数据集,并传递给下游 Writer 处理。

详细功能和参数说明请参考官方说明:https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md

下面是 mysqlreader 的配置示例。

"reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],                        
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                }

参数说明:

  • 如果表的主键是单列主键(比如 id),那么可以在 parameter 下加一个配置: "splitPk": "db_id",。如果加在最后,则需去掉后面的逗号(,)。

  • column 指定读取的列。通常建议写具体的列,可以在列上用函数做逻辑转换。使用 * 就是要时刻确保列跟目标端写入的列能对应上。

oceanbasev10writer 插件

oceanbasev10writer 插件实现了写入数据到 OceanBase 主库的目标表的功能。在底层实现上,OceanbaseV10Writer 通过 Java 客户端(底层 MySQL JDBC 或 OceanBase Client)连接以 obproxy 远程 OceanBase 数据库,并执行相应的 insert sql 语句将数据写入 OceanBase 数据库,内部会分批次提交入库。

实现原理

Oceanbasev10Writer 通过 DataX 框架获取 Reader 生成的协议数据,生成 insert 语句。对于 MySQL 租户,在主键或唯一键冲突时,可以选择 replace 模式更新表中的所有字段。对于 Oracle 租户,目前只有 insert 行为。 出于性能考虑,写入采用 batch 方式批量写,当行数累计到预定阈值时,才发起写入请求。

下面是 oceanbasev10writer 的配置示例。

"writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "batchSize": 512,
                        "writerThreadCount":10,
                        "memstoreThreshold": "0.9"
                    }
                }

参数说明:

仅供参考,以后可能会发生改变,详细情况请关注 DataX 开源地址:https://github.com/alibaba/DataX/tree/master/oceanbasev10writer

参数 是否必选 默认值 参数说明
jdbcUrl 目的数据库的连接信息,包含了 OceanBase 的集群、租户、obproxy 的地址和端口以及库名。
使用域名可能会报错,建议使用 ip
table 目的表的表名称。支持写入一个或者多个表。当配置为多张表时,必须确保所有表结构保持一致,表名中一般不含库名。
column 目的表需要写入数据的字段,字段之间用英文逗号分隔。例如:"column": ["id","name","age"]
column 配置项必须指定,不能留空。
preSql 写入数据到目的表之前,会先执行这里的标准语句。如果 SQL 中包含您需要操作的表名称,请使用 @table 表示,这样在实际执行 SQL 语句时,会对变量按照实际表名称进行替换。
比如:您的任务是要写入到目的端 100 个同构分表(表名称为 datax_00,datax01, ... datax_98,datax_99),并且你希望导入数据前,先对表中数据进行删除操作。那么您可以采用配置 "preSql":\["delete from @table"\]。这样在执行到每个表写入数据前,会先执行对应的 delete from 对应表名称。
说明
该配置只支持 delete 语句。
batchSize 1000 一次性批量提交的记录数大小,该值可以极大减少 DataX 与 OceanBase 的网络交互次数,并提升整体吞吐量。但是该值设置过大可能会造成 DataX 运行进程 OOM。
memstoreThreshold 0 OceanBase 租户的 MemStore 使用率,当达到这个阀值时暂停导入,等释放内存后继续导入,防止租户内存溢出。
username 访问 oceanbase1.0 的用户名,需要注意的是,此处不配置 OceanBase 的集群名和租户名。
password 访问 oceanbase1.0 的密码。
writerThreadCount 1 每个通道(channel)中写入使用的线程数。

MySQL 数据导出为 CSV 文件

将 MySQL 数据导出为 CSV 文件。

配置文件如下:

$cat job/bmsql_oorder_mysql2csv.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": "tpcc",
                        "password": "********",
                        "column": [
                            "*"
                        ],
                        "connection": [
                            {
                                "table": [
                                    "bmsql_oorder"
                                ],
                                "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "txtfilewriter",
                    "parameter": {
                        "path": "/tmp/tpcc/bmsql_oorder",
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "writeMode": "truncate",
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fileFormat": "csv" ,
                        "fieldDelimiter": ","
                    }
                }
            }
        ]
    }
}

CSV 文件导入到 OceanBase

将源端导出的 CSV 文件复制到目标端的 DataX 服务器上,然后导入到目标端 OceanBase 数据库中。

配置文件如下:

$cat job/bmsql_oorder_csv2ob.json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss" ,
                        "nullFormat": "\\N" ,
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": [
                            "*"
                        ],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password":"********",
                        "writerThreadCount":10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}