Flink CDC PostgreSQL: setting 'debezium.heartbeat.interval.ms' causes an error #97

Closed
William-Kaiser opened this issue Feb 5, 2021 · 7 comments
Labels: bug (Something isn't working), fixed (The issue has been fixed)

Comments

@William-Kaiser

William-Kaiser commented Feb 5, 2021

Flink 1.11.0
PostgreSQL 12
Setting 'debezium.heartbeat.interval.ms' = '30000' causes the following error:

Caused by: java.lang.NullPointerException
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractBeforeRow(RowDataDebeziumDeserializeSchema.java:130) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:111) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:97) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]

@eric3zhao
Contributor

This is a known bug (#92). I submitted a PR today, #107, which you can refer to.

@wuchong
Member

wuchong commented Feb 24, 2021

@William-Kaiser Have you set REPLICA IDENTITY to FULL on the PG tables?
See this documentation: https://debezium.io/documentation/reference/1.2/connectors/postgresql.html#postgresql-replica-identity
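For reference, the REPLICA IDENTITY setting asked about here is changed per table with PostgreSQL's `ALTER TABLE ... REPLICA IDENTITY FULL`, which makes the WAL record the old values of every column for UPDATE and DELETE. Below is a minimal Java sketch that builds that statement. The class name, method names, and the example table `public.orders` are hypothetical; the resulting string would be executed against the source database, for example over JDBC.

```java
/** Hypothetical helper: builds the PostgreSQL statement that sets REPLICA IDENTITY FULL. */
final class ReplicaIdentitySql {

    private ReplicaIdentitySql() {}

    /** Quotes a PostgreSQL identifier: wraps it in double quotes and doubles embedded quotes. */
    static String quoteIdent(String ident) {
        return "\"" + ident.replace("\"", "\"\"") + "\"";
    }

    /** With FULL, the old values of all columns are logged for UPDATE and DELETE. */
    static String alterFull(String schema, String table) {
        return "ALTER TABLE " + quoteIdent(schema) + "." + quoteIdent(table)
                + " REPLICA IDENTITY FULL";
    }

    public static void main(String[] args) {
        // Prints: ALTER TABLE "public"."orders" REPLICA IDENTITY FULL
        System.out.println(alterFull("public", "orders"));
    }
}
```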

@eric3zhao
Contributor

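The cause of this problem is that once heartbeat is enabled, every time the heartbeat interval elapses Debezium emits a message that carries only an offset and no actual row record. It looks roughly like this: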
SourceRecord{sourcePartition={server=postgres_binlog_source}, sourceOffset={transaction_id=null, lsn=343798710272, txId=1358232, ts_usec=1613800003904000}} ConnectRecord{topic='__debezium-heartbeat.postgres_binlog_source', kafkaPartition=0, key=Struct{serverName=postgres_binlog_source}, keySchema=Schema{io.debezium.connector.common.ServerNameKey:STRUCT}, value=Struct{ts_ms=1613800003905}, valueSchema=Schema{io.debezium.connector.common.Heartbeat:STRUCT}, timestamp=null, headers=ConnectHeaders(headers=)}
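This record carries no meaningful data; it exists only to trigger an offset commit. So in my fix I added a filter that drops these records that contain no actual data: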

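import java.io.Serializable;

import org.apache.kafka.connect.source.SourceRecord;

// Hook that lets the deserializer skip records: returning true means
// "drop this record instead of deserializing it".
public interface ValueValidator extends Serializable {
	default boolean filter(SourceRecord sourceRecord) {
		return false;
	}
}

public final class PostgresValueValidator implements RowDataDebeziumDeserializeSchema.ValueValidator {
	// Assumes Debezium's default heartbeat.topics.prefix, as in '__debezium-heartbeat.postgres_binlog_source'
	private static final String HEARTBEAT_PREFIX = "__debezium-heartbeat";

	@Override
	public boolean filter(SourceRecord sourceRecord) {
		String topic = sourceRecord.topic();
		// Heartbeat records carry only an offset and no row data, so they are filtered out.
		// A null or blank topic cannot start with the prefix, so it is not treated as a heartbeat.
		return topic != null && topic.startsWith(HEARTBEAT_PREFIX);
	}
}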
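One way to read the stack trace: the deserializer treats every record as a change event, and a heartbeat value (`{ts_ms=...}`) has none of the row fields it expects, so a lookup comes back null. The self-contained sketch below models that failure and the filter-first fix. `SimpleRecord` and `HeartbeatFilterSketch` are hypothetical stand-ins (not Kafka Connect or Flink CDC classes), record values are modeled as plain maps, and the data topic `postgres_binlog_source.public.dbd_xxx` is made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for Kafka Connect's SourceRecord: a topic plus the value's fields. */
final class SimpleRecord {
    final String topic;
    final Map<String, Object> value;

    SimpleRecord(String topic, Map<String, Object> value) {
        this.topic = topic;
        this.value = value;
    }
}

/** Sketch: drop heartbeat records before any row-data field is read. */
final class HeartbeatFilterSketch {
    // Debezium's default heartbeat.topics.prefix
    static final String HEARTBEAT_PREFIX = "__debezium-heartbeat";

    static boolean isHeartbeat(SimpleRecord record) {
        return record.topic != null && record.topic.startsWith(HEARTBEAT_PREFIX);
    }

    /** Collects the "after" row of each change event in a batch, skipping heartbeats. */
    static List<Map<String, Object>> afterRows(List<SimpleRecord> batch) {
        List<Map<String, Object>> rows = new ArrayList<>();
        for (SimpleRecord record : batch) {
            if (isHeartbeat(record)) {
                continue; // offset-only record: no "before"/"after" to read
            }
            @SuppressWarnings("unchecked")
            Map<String, Object> after = (Map<String, Object>) record.value.get("after");
            // A heartbeat value ({ts_ms=...}) has no "after" field; without the check above,
            // after would be null here and requireNonNull would throw a NullPointerException.
            rows.add(Objects.requireNonNull(after, "record has no 'after' image"));
        }
        return rows;
    }

    public static void main(String[] args) {
        SimpleRecord change = new SimpleRecord("postgres_binlog_source.public.dbd_xxx",
                Map.of("op", "c", "after", Map.of("id", 1L, "name", "a")));
        SimpleRecord heartbeat = new SimpleRecord("__debezium-heartbeat.postgres_binlog_source",
                Map.of("ts_ms", 1613800003905L));
        System.out.println(afterRows(List.of(heartbeat, change)).size()); // prints 1
    }
}
```

Filtering on the topic keeps the check cheap and independent of the value's schema, at the cost of assuming the heartbeat prefix has not been reconfigured.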

@William-Kaiser
Author

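Hi, I'm using Flink SQL to create two tables, join them, capture the changes via CDC, and insert the results into Elasticsearch.
The table definitions look like this: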
CREATE TABLE dbd_xxx (
id BIGINT,
name STRING
) WITH (
'connector' = 'postgres-cdc',
'hostname' = 'xxx',
'port' = '5432',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'xxxx',
'schema-name' = 'xxx',
'table-name' = 'xxxx',
'decoding.plugin.name' = 'wal2json',
'debezium.slot.name' = 'xxxx'
);
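Options written with the `debezium.` prefix in the WITH clause (such as `'debezium.slot.name'` above, or `'debezium.heartbeat.interval.ms'`) are passed through to the embedded Debezium engine with the prefix removed, so `'debezium.heartbeat.interval.ms'` becomes Debezium's `heartbeat.interval.ms`. Below is a minimal sketch of that mapping; the class and method names are hypothetical and this is not the connector's actual code.

```java
import java.util.Map;
import java.util.Properties;

/** Hypothetical sketch: forward 'debezium.*' table options to Debezium with the prefix stripped. */
final class DebeziumOptions {
    static final String PREFIX = "debezium.";

    static Properties extract(Map<String, String> tableOptions) {
        Properties props = new Properties();
        for (Map.Entry<String, String> e : tableOptions.entrySet()) {
            if (e.getKey().startsWith(PREFIX)) {
                // 'debezium.heartbeat.interval.ms' -> 'heartbeat.interval.ms'
                props.setProperty(e.getKey().substring(PREFIX.length()), e.getValue());
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "connector", "postgres-cdc",
                "decoding.plugin.name", "wal2json",
                "debezium.slot.name", "xxxx",
                "debezium.heartbeat.interval.ms", "3000");
        Properties props = extract(options);
        System.out.println(props.getProperty("heartbeat.interval.ms")); // prints 3000
        System.out.println(props.getProperty("slot.name"));             // prints xxxx
    }
}
```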
After running it, I saw this WARN in the TaskManager log:

2021-02-25 15:27:13,636 WARN io.debezium.connector.postgresql.PostgresStreamingChangeEventSource [] - Received 10001 events which were all filtered out, so no offset could be committed. This prevents the replication slot from acknowledging the processed WAL offsets, causing a growing backlog of non-removeable WAL segments on the database server. Consider to either adjust your filter configuration or enable heartbeat events (via the heartbeat.interval.ms option) to avoid this situation.
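The warning says that when every received event is filtered out, no offset can be committed, so the replication slot cannot acknowledge the processed WAL and non-removable WAL segments pile up on the server. A heartbeat interval addresses this by periodically emitting an offset-only record (like the heartbeat record quoted earlier in this thread) even when no data records pass the filter. The sketch below illustrates only that timing rule; the class and method names are hypothetical, the one-event-per-millisecond arrival rate is made up, and this is not Debezium's implementation.

```java
/** Hypothetical sketch of heartbeat timing: emit an offset-only record once per interval. */
final class HeartbeatTimer {
    private final long intervalMs;
    private long lastHeartbeatMs;

    HeartbeatTimer(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastHeartbeatMs = startMs;
    }

    /** Returns true when, at time nowMs, a heartbeat (an offset-only record) is due. */
    boolean shouldEmit(long nowMs) {
        if (intervalMs <= 0) {
            return false; // heartbeats disabled (Debezium's default interval is 0)
        }
        if (nowMs - lastHeartbeatMs >= intervalMs) {
            lastHeartbeatMs = nowMs;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        HeartbeatTimer timer = new HeartbeatTimer(3000, 0); // 'debezium.heartbeat.interval.ms' = '3000'
        int emitted = 0;
        // 10001 events, all filtered out, arriving one per millisecond
        for (long t = 1; t <= 10001; t++) {
            if (timer.shouldEmit(t)) {
                emitted++;
            }
        }
        System.out.println(emitted); // prints 3 (at t = 3000, 6000, 9000)
    }
}
```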

So I added the setting 'debezium.heartbeat.interval.ms' = '3000', but then it fails with a NullPointerException:

Caused by: java.lang.NullPointerException
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.extractBeforeRow(RowDataDebeziumDeserializeSchema.java:130) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at com.alibaba.ververica.cdc.debezium.table.RowDataDebeziumDeserializeSchema.deserialize(RowDataDebeziumDeserializeSchema.java:111) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at com.alibaba.ververica.cdc.debezium.internal.DebeziumChangeConsumer.handleBatch(DebeziumChangeConsumer.java:97) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at io.debezium.embedded.ConvertingEngineBuilder.lambda$notifying$2(ConvertingEngineBuilder.java:81) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:812) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:170) ~[flink-sql-connector-postgres-cdc-1.1.0.jar:1.1.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_202]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_202]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_202]

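I set REPLICA IDENTITY to FULL on the PG tables, but the problem is the same.
I see that you filter the records in code. Can this be done through a setting in Flink SQL instead? Thanks.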

@eric3zhao
Contributor

This is fixed in version 1.2. Download the 1.2 dependency jar and try running it again.

leonardBang added the bug (Something isn't working) and fixed (The issue has been fixed) labels on Aug 20, 2021
@leonardBang
Contributor

leonardBang commented Aug 20, 2021

Please try a CDC version of 1.2 or above.
