
Flink CDC monitoring a MySQL table: updates fail with "Encountered change event for table test.student whose schema isn't known to this connector" #50

Closed
yangxusun9 opened this issue Nov 2, 2020 · 7 comments

Comments

@yangxusun9

I tried the official Flink CDC example; the code is as follows:

```java
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Properties;

public class MySqlBinlogSourceExample {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.setProperty("database.history.kafka.bootstrap.servers", "kafka1:9092");
        properties.setProperty("database.history.kafka.topic", "demo1");
        properties.setProperty("snapshot.new.tables", "parallel");
        properties.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
        properties.setProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter");
        properties.setProperty("decimal.handling.mode", "string");
        // properties.setProperty("inconsistent.schema.handling.mode", "warn"); // no exception, but binlog events are not printed
        // properties.setProperty("snapshot.mode", "schema_only_recovery");

        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .debeziumProperties(properties)
                .hostname("localhost")
                .port(3306)
                // .databaseList("Test")
                .tableList("Test.student") // monitor the student table under the Test database
                .username("root")
                .password("123")
                .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(sourceFunction)
                .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }
}
```
The dependencies are as follows:

```xml
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.1.0</version>
</dependency>
```

The Flink version is 1.11.1.
When the job first starts, the log output is:
[screenshot: startup log showing snapshot records being read]
so the data is read successfully. But as soon as I run an update against the MySQL table, inserting one row, the following exception is thrown:

```
Caused by: org.apache.kafka.connect.errors.ConnectException: Encountered change event for table test.student whose schema isn't known to this connector
    at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
    at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:207)
    at io.debezium.connector.mysql.BinlogReader.handleEvent(BinlogReader.java:600)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1130)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:978)
    at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
    at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
    at java.lang.Thread.run(Thread.java:748)
```

PS: the MySQL server version is 5.7.31.
As far as I understand, Debezium depends on Kafka. Do I need to correctly configure a local Kafka Connect for this to succeed?

@shizhengchao
Contributor

shizhengchao commented Nov 3, 2020

properties.setProperty("database.history.kafka.bootstrap.servers", "kafka1:9092");
properties.setProperty("database.history.kafka.topic", "demo1");
properties.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
properties.setProperty("value.converter", "org.apache.kafka.connect.json.JsonConverter");

The properties above can be left unset, but they should not make a difference either way. I ran your code and could not reproduce the problem; it works fine on my side. You can narrow it down as follows:

  1. First, make sure MySQL has the following configuration:
server-id         = 123
log_bin           = mysql-bin
binlog_format     = ROW
binlog_row_image  = FULL
expire_logs_days  = 10
  2. If the MySQL configuration is correct, then confirm the user has sufficient privileges:
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user' IDENTIFIED BY 'password';

@yangxusun9 yangxusun9 reopened this Nov 3, 2020
@yangxusun9
Author

It was a MySQL database-name case issue. My guess is that Debezium converts the 'Test' I passed in to 'test' and then looks up the corresponding schema, which is why this exception is thrown.

@shizhengchao
Contributor

> It was a MySQL database-name case issue. My guess is that Debezium converts the 'Test' I passed in to 'test' and then looks up the corresponding schema, which is why this exception is thrown.

Correct. When Debezium fetches all the databases in MySQL, it compares them with the database name you passed in, and the passed-in name is lower-cased during that comparison, so the TableId cannot be found.
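The mismatch diagnosed above can be illustrated with a minimal sketch. The class, method, and set below are hypothetical illustrations, not Debezium's actual API: the point is simply that a case-sensitive lookup fails once one side of the comparison has been lower-cased.

```java
import java.util.Locale;
import java.util.Set;

public class TableIdCaseDemo {
    // Hypothetical stand-in for the connector's known-schema registry,
    // keyed by the table names captured with their original case.
    static final Set<String> KNOWN_SCHEMAS = Set.of("Test.student");

    // Case-sensitive membership check, like a map/set lookup by TableId.
    static boolean schemaIsKnown(String tableId) {
        return KNOWN_SCHEMAS.contains(tableId);
    }

    public static void main(String[] args) {
        // The configured name "Test.student" is lower-cased somewhere in
        // the comparison path, so the lookup misses and the connector
        // reports "schema isn't known".
        String lowered = "Test.student".toLowerCase(Locale.ROOT);
        System.out.println(schemaIsKnown(lowered));        // false
        System.out.println(schemaIsKnown("Test.student")); // true
    }
}
```

Using the database name in exactly the case MySQL reports (here, all-lowercase `test.student`) makes both sides of the comparison agree, which matches the fix found above.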

@lykpass

lykpass commented May 15, 2021

My database name is all lowercase, but I still run into this problem.
[screenshot: the same exception]

@zZxiaowang

> My database name is all lowercase, but I still run into this problem.

I tried it. The CDC earliest-offset mode does not support underscores here.

@lcchaoliu

The MySQL CDC earliest-offset mode does not support table names containing underscores. When will this be fixed officially?

5 participants