[Bug] Flink CDC 2.3.0: a job started with startupOptions = specificOffset (specificOffsetFile and specificOffsetPos set) cannot restart from a checkpoint #1944
Comments
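Besides the mysql-cdc package, which other packages need to be included? My environment is the same as yours, but mine reports a missing Kafka class, org.apache.kafka.connect.data.Struct. What other jars do you have in your lib directory?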
The bug is caused by the invalid GTID set saved in the new checkpoint after the job starts from the specific offset. In this issue, the starting point of the checkpoint is at
To find the cause of this problem, we can walk through how CDC handles GTIDs:
1. Obtain the available GTIDs, i.e., the output of show master status.
2. Obtain the checkpoint GTIDs.
3. Obtain the purged GTIDs, i.e., @@global.gtid_purged.
4. Obtain the GTIDs to replicate, i.e., the difference between the available GTIDs and the checkpoint GTIDs.
5. Obtain the non-purged GTIDs to replicate, i.e., the difference between the GTIDs to replicate and the purged GTIDs.
6. Finally, compare the GTIDs to replicate with the non-purged GTIDs to replicate. If they are not the same, some of the GTIDs that still need to be synchronized are considered to have been purged.
In this issue, the two sets are clearly not equal because the GTIDs recorded in the checkpoint are wrong. Only
This is not what we expected. In this scenario, the range of GTIDs that the user expects to be synchronized should only include:
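To make the six-step check described above concrete, here is a minimal Java sketch that replays it with the GTID values from the log in this issue. It is an illustration only, not the Flink CDC or Debezium code: the class GtidCheckSketch and its helpers parse, subtract, format, and isBinlogAvailable are hypothetical names, and GTID sets are modeled simply as sorted, disjoint intervals per server UUID.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative sketch only; all names here are hypothetical, not the connector's API. */
class GtidCheckSketch {

    /** Parses "uuid:1-5:8,uuid2:1-3" into uuid -> list of inclusive [start, end] intervals. */
    static Map<String, List<long[]>> parse(String gtids) {
        Map<String, List<long[]>> result = new TreeMap<>();
        for (String part : gtids.replaceAll("\\s", "").split(",")) {
            if (part.isEmpty()) continue;
            String[] fields = part.split(":");
            List<long[]> intervals = result.computeIfAbsent(fields[0], k -> new ArrayList<>());
            for (int i = 1; i < fields.length; i++) {
                String[] bounds = fields[i].split("-");
                long start = Long.parseLong(bounds[0]);
                long end = bounds.length > 1 ? Long.parseLong(bounds[1]) : start;
                intervals.add(new long[] {start, end});
            }
        }
        return result;
    }

    /** Returns a minus b per UUID. Assumes each UUID's intervals are sorted and disjoint. */
    static Map<String, List<long[]>> subtract(
            Map<String, List<long[]>> a, Map<String, List<long[]>> b) {
        Map<String, List<long[]>> result = new TreeMap<>();
        for (Map.Entry<String, List<long[]>> e : a.entrySet()) {
            List<long[]> remaining = new ArrayList<>(e.getValue());
            for (long[] cut : b.getOrDefault(e.getKey(), new ArrayList<>())) {
                List<long[]> next = new ArrayList<>();
                for (long[] iv : remaining) {
                    if (cut[1] < iv[0] || cut[0] > iv[1]) {
                        next.add(iv); // no overlap, keep the interval unchanged
                        continue;
                    }
                    if (cut[0] > iv[0]) next.add(new long[] {iv[0], cut[0] - 1});
                    if (cut[1] < iv[1]) next.add(new long[] {cut[1] + 1, iv[1]});
                }
                remaining = next;
            }
            if (!remaining.isEmpty()) result.put(e.getKey(), remaining);
        }
        return result;
    }

    /** Renders a set back to text so that two sets can be compared as strings. */
    static String format(Map<String, List<long[]>> set) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, List<long[]>> e : set.entrySet()) {
            if (sb.length() > 0) sb.append(',');
            sb.append(e.getKey());
            for (long[] iv : e.getValue()) {
                sb.append(':').append(iv[0]);
                if (iv[1] != iv[0]) sb.append('-').append(iv[1]);
            }
        }
        return sb.toString();
    }

    /** Steps 4 to 6: true only if nothing that must be replicated has been purged. */
    static boolean isBinlogAvailable(String available, String checkpoint, String purged) {
        Map<String, List<long[]>> toReplicate = subtract(parse(available), parse(checkpoint));
        Map<String, List<long[]>> nonPurged = subtract(toReplicate, parse(purged));
        return format(toReplicate).equals(format(nonPurged));
    }

    public static void main(String[] args) {
        // Values copied from the log lines in this issue.
        String available = "106a4bb6-ec0d-11ec-a2d4-00163e279211:1-204182899,"
                + "7aec1281-719c-11eb-afcf-00163e06a35c:1-147359662";
        String checkpoint = "106a4bb6-ec0d-11ec-a2d4-00163e279211:203495054-204182173";
        String purged = "106a4bb6-ec0d-11ec-a2d4-00163e279211:1-203495053,"
                + "7aec1281-719c-11eb-afcf-00163e06a35c:1-147359662";

        System.out.println("to replicate: "
                + format(subtract(parse(available), parse(checkpoint))));
        System.out.println("binlog available: "
                + isBinlogAvailable(available, checkpoint, purged));
    }
}
```

Because the checkpoint holds only the range 203495054-204182173, subtracting it from the available set leaves the already purged range 1-203495053 (and the whole second UUID) in the set to replicate, so the comparison in step 6 fails even though the binlog position itself is still on the server.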
Hi @PatrickRen, could you take a look at this issue? If you don't have time, I am willing to submit a PR to fix it.
@wallkop Thanks a lot for the analysis. I found that the earliest offset has this problem too.
@wallkop Please feel free to fix this issue. I have assigned it to you. Thanks ~
Got it, thanks.
@ruanhang1993 Hi, I have submitted a PR and would appreciate it if you could review it when you have time. PR: #2220
Search before asking
Flink version
1.16.0
Flink CDC version
2.3.0
Database and its version
mysql5.7
Minimal reproduce step
If I restart the job from a checkpoint, it does not work:
2023-02-21 14:08:29,713 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - MySQL current GTID set 106a4bb6-ec0d-11ec-a2d4-00163e279211:1-204182899,7aec1281-719c-11eb-afcf-00163e06a35c:1-147359662 does contain the GTID set 106a4bb6-ec0d-11ec-a2d4-00163e279211:203495054-204182173 required by the connector.
2023-02-21 14:08:29,801 INFO com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Server has already purged 106a4bb6-ec0d-11ec-a2d4-00163e279211:1-203495053,7aec1281-719c-11eb-afcf-00163e06a35c:1-147359662 GTIDs
2023-02-21 14:08:29,802 WARN com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext [] - Some of the GTIDs needed to replicate have been already purged
2023-02-21 14:08:29,803 ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager [] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150) ~[flink-connector-files-1.16.1.jar:1.16.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) [flink-connector-files-1.16.1.jar:1.16.1]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_362]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_362]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_362]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_362]
at java.lang.Thread.run(Thread.java:750) [?:1.8.0_362]
Caused by: java.lang.IllegalStateException: The connector is trying to read binlog starting at Struct{version=1.6.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1676959709803,db=,server_id=0,file=mysql-bin.005893,pos=15052069,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.loadStartingOffsetState(StatefulTaskContext.java:194) ~[blob_p-384c1d850f997d73acba83fc72dc8cfb8bead162-07d02e9eebcf14d5793c784b1d89c6cc:?]
at com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext.configure(StatefulTaskContext.java:117) ~[blob_p-384c1d850f997d73acba83fc72dc8cfb8bead162-07d02e9eebcf14d5793c784b1d89c6cc:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:103) ~[blob_p-384c1d850f997d73acba83fc72dc8cfb8bead162-07d02e9eebcf14d5793c784b1d89c6cc:?]
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.submitSplit(BinlogSplitReader.java:71) ~[blob_p-384c1d850f997d73acba83fc72dc8cfb8bead162-07d02e9eebcf14d5793c784b1d89c6cc:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:159) ~[blob_p-384c1d850f997d73acba83fc72dc8cfb8bead162-07d02e9eebcf14d5793c784b1d89c6cc:?]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:71) ~[blob_p-384c1d850f997d73acba83fc72dc8cfb8bead162-07d02e9eebcf14d5793c784b1d89c6cc:?]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-files-1.16.1.jar:1.16.1]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-files-1.16.1.jar:1.16.1]
... 6 more
gtids in checkpoint:"file":"mysql-bin.005893","pos":"15052069","kind":"SPECIFIC","gtids":"106a4bb6-ec0d-11ec-a2d4-00163e279211:203495054-204182173"
show master status gtids :106a4bb6-ec0d-11ec-a2d4-00163e279211:1-204479617,
7aec1281-719c-11eb-afcf-00163e06a35c:1-147359662
show binary logs:
What did you expect to see?
If the job is first started with specificOffset, it should be able to restart from a checkpoint.
What did you see instead?
If the job is first started with specificOffset, it cannot restart from a checkpoint.
Anything else?
No response
Are you willing to submit a PR?