Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

请教个问题:这个connector可以在生产环境用吗? 主要是关于Debezium的快照,FlinkDatabaseHistory是否对机器的内存有大小要求? #47

Closed
jindyliu opened this issue Oct 19, 2020 · 10 comments

Comments

@jindyliu
Copy link

假如在生产环境中,mysql中的单表比较大,比如10亿条,并且在不断增长。
FlinkDatabaseHistory中使用了ConcurrentLinkedQueue records来保存快照数据,
当快照数据较大时,是不是有跑不起来的风险或对机器有特殊要求?

`public class FlinkDatabaseHistory extends AbstractDatabaseHistory {

public static final String DATABASE_HISTORY_INSTANCE_NAME = "database.history.instance.name";

/**
 * We will synchronize the records into Flink's state during snapshot.
 * We have to use a global variable to communicate with Flink's source function,
 * because Debezium will construct the instance of {@link DatabaseHistory} itself.
 * Maybe we can improve this in the future.
 *
 * <p>NOTE: we just use Flink's state as a durable persistent storage as a replacement of
 * {@link FileDatabaseHistory} and {@link KafkaDatabaseHistory}. It doesn't need to guarantee
 * the exactly-once semantic for the history records. The history records shouldn't be super
 * large, because we only monitor the schema changes for one single table.
 *
 * @see com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction#snapshotState(FunctionSnapshotContext)
 */
public static final Map<String, ConcurrentLinkedQueue<HistoryRecord>> ALL_RECORDS = new HashMap<>();

private ConcurrentLinkedQueue<HistoryRecord> records;
private String instanceName;

   ………………

}
`

@jindyliu
Copy link
Author

@wuchong @PatrickRen 麻烦解答下?谢谢~

@wuchong
Copy link
Member

wuchong commented Oct 19, 2020

FlinkDatabaseHistory 存储的是 schema 变更,不是 records 数据,不会那么大。 不过这块确实可以想办法优化一下,换成非内存模式。

@jindyliu
Copy link
Author

FlinkDatabaseHistory 存储的是 schema 变更,不是 records 数据,不会那么大。 不过这块确实可以想办法优化一下,换成非内存模式。
@wuchong 谢谢jark,对Debezium还不是很了解,我再去补补课。

感觉这个connector很神奇,感觉比接kafka方便很多,我试用了下这个cdc-connector,实验环境里mysql两个表join打成一个宽表,看了下实验效果确实是会把历史数据与实时变更的数据都能打到宽表里。但看代码,在原理上有点没想通,为啥历史数据也会被join到,难道是我的实验环境里binlog是有全部的历史行数据和历史ddl变更导致的?

请问下:
1、这个connector在功能上就是支持历史存量数据join操作,不依赖binlog里存在全部的历史行数据变更吗(因为生产环境里binlog里不存储所有历史数据变更比较常见)?

2、如果能做,想问下,他们全量的历史数据(启动的时候,mysql表里的所有数据快照,select * from table)是怎么保存的或者说全量的历史数据怎么能输入到这个cdc conector里的,并能参与到后续的flink sql join操作里的。

问这个两个的主要原因是我的场景里,锁拿不到,业务表数据量也比较大。
1、这个cdc connector需要一个全局锁,目前在生产环境中应用的时候,锁控制的很严格,拿不到全局读锁或表锁。
2、历史全量表会比较大,并且还在不停的增长,如果fail over的时候,存量历史数据是从哪里恢复过来的,从而还能保证正确的join结果输出?

所以想着能不能把存量数据与实时变更数据结合起来,想看看能不能改一下?(我们的场景可以不需要精确一次的语义的)
再次麻烦jark了~

@wuchong
Copy link
Member

wuchong commented Oct 19, 2020

  1. 因为这个 connector 会先扫描全表读历史数据,然后再切换到对应的binlog 位点读增量的更新数据。
  2. source 不用保存啊,直接下发给下游就行拉。
  3. 拿不到全局读锁是因为 DBA 不肯给么?
  4. 如果再读历史全量的过程中 fail 了,那么恢复的时候会重新读历史全量。也就是说读历史全量过程中不会做 checkpoint。

@jindyliu
Copy link
Author

  1. 因为这个 connector 会先扫描全表读历史数据,然后再切换到对应的binlog 位点读增量的更新数据。
  2. source 不用保存啊,直接下发给下游就行拉。
  3. 拿不到全局读锁是因为 DBA 不肯给么?
  4. 如果再读历史全量的过程中 fail 了,那么恢复的时候会重新读历史全量。也就是说读历史全量过程中不会做 checkpoint。

1、锁权限(表锁或全局锁)dba有一套规范,不肯给。我感觉我们的场景下,表的schema基本不会做变更(频率很低,并且只要做变更都要做通知),应该也是可以直接跳过去获取锁??
2、结合你的讲解,带着疑问去看代码,理解又更深了一点。看这个 cdc connector的source是核心是通过debezium connector的engine来获取历史数据与binlog增量数据的,source是先emitRecords历史全量数据,再发emitRecords增量数据发送至下游。全量的过程,结合flink打的日志,DebeziumEngine 是有一个debezium/connector/mysql/SnapshotReader.java 获取锁,做表的扫描,每10000行还会输出下时间消耗。

这里还有一个关于内存不懂的地方,当数据往事下游流的时候,基本上,就是过flink的各种算子或者是写的sql转成的算子了进行运算了。像我们这种几个表做join的打成宽表的场景,例如

// 输入表test
CREATE TABLE test ( idINT,nameVARCHAR(255),timeTIMESTAMP(3),status` INT,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_ask',
'table-name' = 'test'
);

// 输入表status
CREATE TABLE status (
id INT,
name VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = '1',
'database-name' = 'ai_task',
'table-name' = 'status'
);

// 输出宽表test_status
CREATE TABLE test_status (
id INT,
name VARCHAR(255),
time TIMESTAMP(3),
status INT,
status_name VARCHAR(255)
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://localhost:3306/ai_task',
'connector.table' = 'test_status',
'connector.username' = 'root',
'connector.password' = '1',
'connector.write.flush.max-rows' = '1'
);

// 计算出宽表
INSERT into test_status
SELECT t.*, s.name
FROM test AS t
LEFT JOIN status AS s ON t.status = s.id;
`
),

想问下jack, 数据往flink计算引擎流入的过程中,会做join打成宽表的逻辑,这里做join就会依赖的数据,比如status 一个id的name变化了,test_status相应的会有关联N条数据变化,这个join过程关联的N条test表数据是会怎么在flink计算引擎里保存的?
量大的时候会不会有内存不足的问题?不理解flink cdc sql语句和这些数据的保存之间会有什么关系,数据保存是依赖于flink 的statebackend吗?

@wuchong 望帮忙解答下,有没有哪方面的资源可以加深下理解的~

@wuchong
Copy link
Member

wuchong commented Oct 20, 2020

  1. 如果表 schema 不做变更,那可以尝试跳过拿锁阶段。
  2. join 算子会用 state 来保存输入流的数据,生产上一般会用 rocksdb statebackend,所以不会有内存 OOM 的问题。 flink 提供的 cdc connector 其实只是去接了 binlog 的数据,和 state 没有什么关系,source 也只是记一个位点,不会存数据。就是是普通的数据流 join 也是会有 state 的。

@jindyliu
Copy link
Author

  1. 如果表 schema 不做变更,那可以尝试跳过拿锁阶段。
  2. join 算子会用 state 来保存输入流的数据,生产上一般会用 rocksdb statebackend,所以不会有内存 OOM 的问题。 flink 提供的 cdc connector 其实只是去接了 binlog 的数据,和 state 没有什么关系,source 也只是记一个位点,不会存数据。就是是普通的数据流 join 也是会有 state 的。

好的,结合你的讲解,再看看代码,感觉理解更深入了一步。整理下,拿上面的宽表场景,flink cdc join操作来看,串起来后,应该是这样?
1、cdc connector 先将存量数据发完(scan存量数据期间持有checkpoint操作需要的锁,所以无法做checkpoint动作,完后再释放锁),开始发送增量数据时后,才开始cdc connector开始checkpoint binlog的分区与offsize。当在发送增量数据阶段,job失败重启,cdc connector从checkpoint点重启,只消费增量数据,不会再scan全量数据了;join算子从state checkpoint中拿到join的中间表结果,继续恢复计算。join算子里的state大小会跟表的数据量级相关,但用了rocksdb,数据都在硬盘上,没OOM问题。

2、若cdc connector 没有scan完,scan部分存量数据就失败了,因为没有checkpoint,所以任务重启的时候会重新scan数据。
3、锁问题可以通过配置配debezium参数跳过。 debezium 有个开关,MySqlConnectorConfig.SnapshotLockingMode.NONE
if (!snapshotLockingMode.equals(MySqlConnectorConfig.SnapshotLockingMode.NONE) && useGlobalLock) { //Snapshot 锁获取 …… }
@wuchong jark,麻烦看看正确不?

另外目前试了下,cdc connector 在scan存量数据时好像效率有点低,4百万的存量数据,做这个宽表计算, 然后再存sql,实测要1个多小时。感觉有点慢,这里好像无解,看cdc connector 依赖的debezium单线程在读全表??
jark有啥优化思路或用法上的建议提升效率不?

@wuchong
Copy link
Member

wuchong commented Oct 20, 2020

  1. 正确。
  2. 正确。
  3. 是的。你可以在 DDL 直接通过 'debezium.snapshot.mode' = 'never' 参数就可以控制,见文档
  4. 这个应该是你下游效率太低,导致 source 被反压了,你可以看看下游为什么这么慢? 并发低?磁盘性能不行?

@jindyliu
Copy link
Author

  1. 正确。
  2. 正确。
  3. 是的。你可以在 DDL 直接通过 'debezium.snapshot.mode' = 'never' 参数就可以控制,见文档
  4. 这个应该是你下游效率太低,导致 source 被反压了,你可以看看下游为什么这么慢? 并发低?磁盘性能不行?

关于4, 用blackhole验证试了下,确实快了很多。在这个测试场景里,应该是下游慢反压了。
CREATE TABLE test_status (
id INT,
name VARCHAR(255),
time TIMESTAMP(3),
status INT,
status_name VARCHAR(255),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'blackhole'
);

但总感觉这里的全量scan操作要是能并行就好了,就像jdbc connector一样,能依据某个column设置并行扫描('scan.partition.column' = '%s)会更快些~~
要是能并行描,可以加速下job上线时间。

@wuchong
Copy link
Member

wuchong commented Oct 21, 2020

这里主要是要保证 snapshot 和 binlog offset 的一致性,所以只能单并发运行 :(

@wuchong wuchong closed this as completed Oct 23, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants