[Bug] Unsupported table metadata field type 216 when parsing binlog event #2192

Closed
1 of 2 tasks
leonardBang opened this issue Jun 8, 2023 · 7 comments
@leonardBang
Contributor

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.15.2

Flink CDC version

2.3.0

Database and its version

mysql version = 8.0.18

Minimal reproduce step

This error occurs when parsing the binlog.

What did you expect to see?

Support for this kind of event type.

What did you see instead?

Caused by: io.debezium.DebeziumException: Failed to deserialize data of EventHeaderV4{timestamp=1679640836000, eventType=TABLE_MAP, serverId=1704062425, headerLength=19, dataLength=89, nextPosition=359034816, flags=0}
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.wrap(MySqlStreamingChangeEventSource.java:1154)
... 5 more
Caused by: com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException: Failed to deserialize data of EventHeaderV4{timestamp=1679640836000, eventType=TABLE_MAP, serverId=1704062425, headerLength=19, dataLength=89, nextPosition=359034816, flags=0}
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:309)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeTableMapEventData(EventDeserializer.java:281)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.nextEvent(EventDeserializer.java:228)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource$1.nextEvent(MySqlStreamingChangeEventSource.java:230)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:952)
... 3 more
Caused by: java.io.IOException: Unsupported table metadata field type 216
at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventMetadataDeserializer.deserialize(TableMapEventMetadataDeserializer.java:52)
at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:59)
at com.github.shyiko.mysql.binlog.event.deserialization.TableMapEventDataDeserializer.deserialize(TableMapEventDataDeserializer.java:37)
at com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer.deserializeEventData(EventDeserializer.java:303)
... 7 more
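
For context, the innermost exception is raised while the optional metadata of a TABLE_MAP event is read field by field. The following is a minimal sketch of that pattern, assuming each field is encoded as a type byte, a length, and a payload. The class name, the method name, the single-byte length, and the upper bound of 12 known field types are illustrative assumptions, not the library's actual code.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class MetadataFieldSketch {

    // Highest field type this sketch recognizes (assumed for illustration).
    static final int MAX_KNOWN_TYPE = 12;

    /** Reads (type, length, payload) fields and returns the field types seen. */
    static List<Integer> readFieldTypes(byte[] metadata) throws IOException {
        ByteArrayInputStream in = new ByteArrayInputStream(metadata);
        List<Integer> types = new ArrayList<>();
        while (in.available() > 0) {
            int type = in.read();   // unsigned value, 0..255
            int length = in.read(); // simplified: one length byte per field
            if (length < 0) {
                throw new IOException("Truncated metadata field");
            }
            if (type < 1 || type > MAX_KNOWN_TYPE) {
                // An unknown type aborts the whole event, as in the trace above.
                throw new IOException("Unsupported table metadata field type " + type);
            }
            if (in.skip(length) != length) {
                throw new IOException("Truncated metadata field");
            }
            types.add(type);
        }
        return types;
    }

    public static void main(String[] args) throws IOException {
        // Two well-formed fields: type 1 (1 payload byte), type 4 (2 payload bytes).
        byte[] ok = {1, 1, (byte) 0x80, 4, 2, 'i', 'd'};
        System.out.println(readFieldTypes(ok));

        // A field whose type byte is 216 is rejected.
        byte[] bad = {(byte) 216, 0};
        try {
            readFieldTypes(bad);
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In a reader of this shape, a type byte outside the known range can mean either that the server wrote a field the client does not know about, or that an earlier length was misread and the stream is no longer aligned. The sketch cannot distinguish the two cases.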

#2001 (comment)

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@leonardBang leonardBang added the bug Something isn't working label Jun 8, 2023
@leonardBang leonardBang added this to the V2.4.0 milestone Jun 8, 2023
@ruanhang1993
Contributor

I added a test but could not reproduce the error.

MySQL: docker mysql:8.0
Table SQL:

CREATE TABLE multi_value_index (
    id bigint NOT NULL,
    f1 json,
    PRIMARY KEY (id)
);

insert into multi_value_index(id, f1)
values (1, cast('{"inner":[1,3]}' as json)),
       (2, cast('{"inner":[1,3,9,8]}' as json));

Test:

@Test
public void testMultiValueIndex() throws Throwable {
    fullTypesMySql8Database.createAndInitialize();
    // Flink source table backed by the MySQL table above; the JSON column is read as STRING.
    String sourceDDL =
            String.format(
                    "CREATE TABLE multi_value_index (\n"
                            + "    `id` INT NOT NULL,\n"
                            + "    f1 STRING,\n"
                            + "    primary key (`id`) not enforced"
                            + ") WITH ("
                            + " 'connector' = 'mysql-cdc',"
                            + " 'hostname' = '%s',"
                            + " 'port' = '%s',"
                            + " 'username' = '%s',"
                            + " 'password' = '%s',"
                            + " 'database-name' = '%s',"
                            + " 'table-name' = '%s',"
                            + " 'scan.incremental.snapshot.enabled' = '%s',"
                            + " 'server-id' = '%s',"
                            + " 'server-time-zone' = 'UTC',"
                            + " 'scan.incremental.snapshot.chunk.size' = '%s'"
                            + ")",
                    MYSQL8_CONTAINER.getHost(),
                    MYSQL8_CONTAINER.getDatabasePort(),
                    fullTypesMySql8Database.getUsername(),
                    fullTypesMySql8Database.getPassword(),
                    fullTypesMySql8Database.getDatabaseName(),
                    "multi_value_index",
                    incrementalSnapshot,
                    getServerId(),
                    getSplitSize());
    tEnv.executeSql(sourceDDL);

    // async submit job
    TableResult result = tEnv.executeSql("SELECT id, f1 FROM multi_value_index");

    CloseableIterator<Row> iterator = result.collect();
    waitForSnapshotStarted(iterator);
    // Give the snapshot phase time to finish before changing the table.
    Thread.sleep(10000L);

    // Add a multi-valued index on the JSON column, then insert a row so that
    // a new TABLE_MAP event is written to the binlog.
    try (Connection connection = fullTypesMySql8Database.getJdbcConnection();
         Statement statement = connection.createStatement()) {
        statement.execute(
                "ALTER TABLE `multi_value_index` ADD INDEX `k_source`  ( (CAST(f1->'$.inner' AS UNSIGNED ARRAY)) );");
        statement.execute(
                "insert into multi_value_index(id, f1) values (3, cast('{\"inner\":[1,4,5,8]}' as json));");
    }

    // Keep the job alive so the binlog events can be inspected in a debugger.
    while (true) {

    }
    // assertEqualsInAnyOrder(Arrays.asList(), fetchRows(iterator, 2));
    // result.getJobClient().get().cancel().get();
}
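
The test above holds the job open with an empty `while (true)` loop, which is fine for a debugging session but never terminates. If the test were turned into a regular one, a bounded wait could take its place. The following is a rough sketch using only the JDK; `BoundedWaitSketch` and `waitUntil` are hypothetical names, not helpers from the Flink CDC test base.

```java
import java.util.function.BooleanSupplier;

public class BoundedWaitSketch {

    /**
     * Polls the condition until it holds or the timeout elapses.
     * Returns true if the condition was observed to hold.
     */
    static boolean waitUntil(BooleanSupplier condition, long timeoutMillis, long pollMillis)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() - deadline < 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollMillis);
        }
        // One final check after the deadline has passed.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Hypothetical condition: becomes true roughly 200 ms after start.
        boolean seen = waitUntil(() -> System.currentTimeMillis() - start >= 200, 2000, 20);
        System.out.println(seen);
    }
}
```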

@ruanhang1993
Contributor

Debug

[Screenshot: 2023-06-08, 6:16:55 PM]

The value is:

{"source":{"file":"mysql-bin.000005","pos":1001,"server_id":223344},"position":{"transaction_id":null,"ts_sec":1686219039,"file":"mysql-bin.000005","pos":1240,"gtids":"9444d53c-05e4-11ee-b649-0242ac110004:1-17","server_id":223344},"databaseName":"column_type_test_mysql8_1gppbph","ddl":"ALTER TABLE `multi_value_index` ADD INDEX `k_source`  ( (CAST(f1->'$.inner' AS UNSIGNED ARRAY)) )","tableChanges":[]}

@racoon945

racoon945 commented Jun 9, 2023

I noticed that you are using the docker mysql:8.0 image.
Can you try with mysql:8.0.18?
docker pull mysql:8.0.18
The problem occurred again recently. It is reproducible in MySQL 8.0.18 but not in 8.0.22-13.

@ruanhang1993
Contributor

Hi, @racoon945.
I changed the version to mysql:8.0.18, but the error still does not occur.
[Screenshot: 2023-06-13, 2:30:25 PM]
[Screenshot: 2023-06-13, 2:27:02 PM]

@ruanhang1993 ruanhang1993 removed this from the V2.4.0 milestone Jun 16, 2023
@ruanhang1993 ruanhang1993 added this to the V2.4.1 milestone Jun 28, 2023
@ruanhang1993 ruanhang1993 removed this from the V2.4.1 milestone Jul 6, 2023
@hanpi14

hanpi14 commented Aug 16, 2023

I was using Flink 1.16.2 and MySQL 5.6 for data capture, and the same error occurred. Flink CDC versions 2.4.0 and 2.4.1 both show it.
image

@ruanhang1993
Contributor

Hi, @hanpi14.
Which MySQL version were you using when the error occurred?
Do you know which SQL statement causes this error? Thanks.

@ruanhang1993
Contributor

Closing this issue as no reply was received.

If the problem still exists, please re-create your issue in English on Apache Jira under project Flink with component tag Flink CDC. Thank you!
