From 642a9a326e593f8ab4cc660b51678431d8d97920 Mon Sep 17 00:00:00 2001 From: Leonard Xu Date: Wed, 7 Jun 2023 17:49:41 +0800 Subject: [PATCH] [mysql-cdc] Optimize the error msg when binlog expire or server id conflict This closes #2010. --- .../cdc/debezium/DebeziumSourceFunction.java | 19 +- .../task/context/StatefulTaskContext.java | 2 +- .../MySqlStreamingChangeEventSource.java | 4 + .../mysql/util/ErrorMessageUtils.java | 55 ++++++ .../cdc/connectors/mysql/MySqlTestUtils.java | 16 ++ .../reader/BinlogSplitReaderTest.java | 167 ++++++++++++++++-- .../mysql/source/MySqlSourceTestBase.java | 6 +- .../mysql/table/MySqlConnectorITCase.java | 83 ++++++++- .../mysql/util/ErrorMessageUtilsTest.java | 59 +++++++ .../docker/server-gtids/expire-seconds/my.cnf | 64 +++++++ 10 files changed, 454 insertions(+), 21 deletions(-) create mode 100644 flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java create mode 100644 flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/util/ErrorMessageUtilsTest.java create mode 100644 flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/expire-seconds/my.cnf diff --git a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java index 9149685500b..2b5d0be0384 100644 --- a/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java +++ b/flink-connector-debezium/src/main/java/com/ververica/cdc/debezium/DebeziumSourceFunction.java @@ -442,7 +442,24 @@ public void run(SourceContext sourceContext) throws Exception { "sourceIdleTime", (Gauge) () -> debeziumChangeFetcher.getIdleTime()); // start the real debezium consumer - debeziumChangeFetcher.runFetchLoop(); + try { + debeziumChangeFetcher.runFetchLoop(); + } catch (Throwable t) { + if (t.getMessage() != null + && t.getMessage() + .contains( + "A slave with the 
same server_uuid/server_id as this slave has connected to the master")) { + throw new RuntimeException( + "The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n" + + "The server id conflict may happen in the following situations: \n" + + "1. The server id has been used by other mysql cdc table in the current job.\n" + + "2. The server id has been used by the mysql cdc table in other jobs.\n" + + "3. The server id has been used by other sync tools like canal, debezium and so on.\n", + t); + } else { + throw t; + } + } } @Override diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java index 80c10bd45d9..41dd3501cbb 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java @@ -176,7 +176,7 @@ private void validateAndLoadDatabaseHistory( } /** Loads the connector's persistent offset (if present) via the given loader. 
*/ - private MySqlOffsetContext loadStartingOffsetState( + protected MySqlOffsetContext loadStartingOffsetState( OffsetContext.Loader loader, MySqlSplit mySqlSplit) { BinlogOffset offset = mySqlSplit.isSnapshotSplit() diff --git a/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java index c43e62dd73b..f7ac0bded8e 100644 --- a/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java +++ b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlStreamingChangeEventSource.java @@ -36,6 +36,7 @@ import io.debezium.config.Configuration; import io.debezium.connector.mysql.MySqlConnectorConfig.GtidNewChannelPosition; import io.debezium.connector.mysql.MySqlConnectorConfig.SecureConnectionMode; +import io.debezium.connector.mysql.util.ErrorMessageUtils; import io.debezium.data.Envelope.Operation; import io.debezium.function.BlockingConsumer; import io.debezium.pipeline.ErrorHandler; @@ -88,6 +89,8 @@ *

Line 268 ~ 270: Clean cache on rotate event to prevent it from growing indefinitely. We should * remove this class after we bumped a higher debezium version where the * https://issues.redhat.com/browse/DBZ-5126 has been fixed. + * + *

Line 1386 : Add more error details for some exceptions. */ public class MySqlStreamingChangeEventSource implements StreamingChangeEventSource { @@ -1380,6 +1383,7 @@ protected DebeziumException wrap(Throwable error) { + e.getSQLState() + "."; } + msg = ErrorMessageUtils.optimizeErrorMessage(msg); return new DebeziumException(msg, error); } diff --git a/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java new file mode 100644 index 00000000000..0ebb908d26b --- /dev/null +++ b/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java @@ -0,0 +1,55 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql.util; + +import java.util.regex.Pattern; + +/** This util tries to optimize error message for some exceptions. 
/**
 * Appends troubleshooting hints to well-known MySQL replication error messages.
 *
 * <p>Two families of errors are recognized: a server id conflict, and a read from a binlog
 * position / GTID set that the server has already purged. Any other message is returned
 * untouched.
 */
public final class ErrorMessageUtils {

    // The patterns are applied with Matcher#find(), so they need no leading/trailing ".*".
    // The previous ".*<text>.*" + matches() form silently failed on messages containing a line
    // terminator, because "." does not match line terminators unless DOTALL is set.
    private static final Pattern SERVER_ID_CONFLICT =
            Pattern.compile(
                    "A slave with the same server_uuid/server_id as this slave has connected to the master");
    private static final Pattern MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE =
            Pattern.compile(
                    "The connector is trying to read binlog.*but this is no longer available on the server",
                    Pattern.DOTALL);
    private static final Pattern MISSING_TRANSACTION_WHEN_BINLOG_EXPIRE =
            Pattern.compile("Cannot replicate because the master purged required binary logs");

    /** Hint appended when another replication client already uses the same server id. */
    private static final String SERVER_ID_CONFLICT_HINT =
            "\nThe 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n"
                    + "The server id conflict may happen in the following situations: \n"
                    + "1. The server id has been used by other mysql cdc table in the current job.\n"
                    + "2. The server id has been used by the mysql cdc table in other jobs.\n"
                    + "3. The server id has been used by other sync tools like canal, debezium and so on.\n";

    /** Hint appended when the requested binlog position or transactions were purged. */
    private static final String BINLOG_UNAVAILABLE_HINT =
            "\nThe required binary logs are no longer available on the server. This may happen in following situations:\n"
                    + "1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n"
                    + "2. The job runs normally, but something happens in the database and lead to the binlog cleanup. You can try to check why this cleanup happens from MySQL side.";

    /** Static utility holder; not meant to be instantiated. */
    private ErrorMessageUtils() {}

    /**
     * Adds more error details for some exceptions.
     *
     * @param msg the original error message, may be {@code null}
     * @return {@code null} if {@code msg} is {@code null}; {@code msg} followed by a hint if it
     *     contains one of the recognized error texts; otherwise {@code msg} unchanged
     */
    public static String optimizeErrorMessage(String msg) {
        if (msg == null) {
            return null;
        }
        if (SERVER_ID_CONFLICT.matcher(msg).find()) {
            // Optimize the error msg when server id conflict
            return msg + SERVER_ID_CONFLICT_HINT;
        }
        if (MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE.matcher(msg).find()
                || MISSING_TRANSACTION_WHEN_BINLOG_EXPIRE.matcher(msg).find()) {
            // Optimize the error msg when binlog is unavailable
            return msg + BINLOG_UNAVAILABLE_HINT;
        }
        return msg;
    }
}
com.ververica.cdc.connectors.mysql.table.StartupOptions; +import com.ververica.cdc.connectors.mysql.testutils.MySqlContainer; +import com.ververica.cdc.connectors.mysql.testutils.MySqlVersion; import com.ververica.cdc.connectors.mysql.testutils.RecordsFormatter; import com.ververica.cdc.connectors.mysql.testutils.UniqueDatabase; import io.debezium.connector.mysql.MySqlConnection; import io.debezium.connector.mysql.MySqlConnectorConfig; +import io.debezium.connector.mysql.MySqlOffsetContext; import io.debezium.jdbc.JdbcConnection; +import io.debezium.pipeline.spi.OffsetContext; import io.debezium.relational.TableId; import io.debezium.relational.history.TableChanges; import io.debezium.relational.history.TableChanges.TableChange; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import org.testcontainers.lifecycle.Startables; +import java.sql.Connection; import java.sql.SQLException; +import java.sql.Statement; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -64,7 +73,10 @@ import java.util.Properties; import java.util.function.Predicate; import java.util.stream.Collectors; +import java.util.stream.Stream; +import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.assertContainsErrorMsg; +import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo; import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit; import static org.junit.Assert.assertEquals; @@ -74,15 +86,35 @@ /** Tests for {@link BinlogSplitReader}. 
*/ public class BinlogSplitReaderTest extends MySqlSourceTestBase { - + private static final String TEST_USER = "mysqluser"; + private static final String TEST_PASSWORD = "mysqlpw"; private static final Duration DEFAULT_TIMEOUT = Duration.ofSeconds(30); private final UniqueDatabase customerDatabase = - new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw"); + new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD); + + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); + private final UniqueDatabase inventoryDatabase8 = + new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); private BinaryLogClient binaryLogClient; private MySqlConnection mySqlConnection; + @BeforeClass + public static void beforeClass() { + LOG.info("Starting MySql8 containers..."); + Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join(); + LOG.info("Container MySql8 is started."); + } + + @AfterClass + public static void afterClass() { + LOG.info("Stopping MySql8 containers..."); + MYSQL8_CONTAINER.stop(); + LOG.info("Container MySql8 is stopped."); + } + @Test public void testReadSingleBinlogSplit() throws Exception { customerDatabase.createAndInitialize(); @@ -689,7 +721,7 @@ public void testHeartbeatEvent() throws Exception { // Create config and initializer client and connections MySqlSourceConfig sourceConfig = - getConfigFactory(new String[] {"customers"}) + getConfigFactory(MYSQL_CONTAINER, customerDatabase, new String[] {"customers"}) .startupOptions(StartupOptions.latest()) .heartbeatInterval(heartbeatInterval) .debeziumProperties(dbzProps) @@ -721,9 +753,75 @@ public void testHeartbeatEvent() throws Exception { "Timeout waiting for heartbeat event"); } + @Test + public void testReadBinlogFromUnavailableBinlog() throws Exception { + // Preparations + inventoryDatabase8.createAndInitialize(); + MySqlSourceConfig connectionConfig = + 
getConfig(MYSQL8_CONTAINER, inventoryDatabase8, new String[] {"products"}); + binaryLogClient = DebeziumUtils.createBinaryClient(connectionConfig.getDbzConfiguration()); + mySqlConnection = DebeziumUtils.createMySqlConnection(connectionConfig); + + // Capture the current binlog offset, and we will start the reader from here + BinlogOffset startingOffset = DebeziumUtils.currentBinlogOffset(mySqlConnection); + + // Create a new config to start reading from the offset captured above + MySqlSourceConfig sourceConfig = + getConfig( + MYSQL8_CONTAINER, + inventoryDatabase8, + StartupOptions.specificOffset(startingOffset.getGtidSet()), + new String[] {"products"}); + + // Create some binlog events and expire the binlog + try (Connection connection = inventoryDatabase8.getJdbcConnection(); + Statement statement = connection.createStatement()) { + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute( + "UPDATE products SET description='new water resistent white wind breaker', weight='0.5' WHERE id=110;"); + statement.execute("UPDATE products SET weight='5.17' WHERE id=111;"); + statement.execute("DELETE FROM products WHERE id=111;"); + statement.execute("FLUSH LOGS;"); + Thread.sleep(3000); + statement.execute( + "INSERT INTO products VALUES (default,'jacket','water resistent white wind breaker',0.2);"); + statement.execute( + "INSERT INTO products VALUES (default,'scooter','Big 2-wheel scooter ',5.18);"); + statement.execute("FLUSH LOGS;"); + Thread.sleep(3000); + } + + // Create reader and submit splits + MySqlBinlogSplit split = createBinlogSplit(sourceConfig); + BinlogSplitReader reader = createBinlogReader(sourceConfig, true); + + try { + reader.submitSplit(split); + reader.pollSplitRecords(); + } catch (Throwable t) { + assertContainsErrorMsg( + t, + "The required binary logs are no 
longer available on the server. This may happen in following situations:\n" + + "1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n" + + "2. The job runs normally, but something happens in the database and lead to the binlog cleanup. You can try to check why this cleanup happens from MySQL side."); + } + } + private BinlogSplitReader createBinlogReader(MySqlSourceConfig sourceConfig) { + return createBinlogReader(sourceConfig, false); + } + + private BinlogSplitReader createBinlogReader( + MySqlSourceConfig sourceConfig, boolean skipValidStartingOffset) { return new BinlogSplitReader( - new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection), 0); + skipValidStartingOffset + ? new TestStatefulTaskContext( + sourceConfig, binaryLogClient, mySqlConnection) + : new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection), + 0); } private MySqlBinlogSplit createBinlogSplit(MySqlSourceConfig sourceConfig) throws Exception { @@ -1016,28 +1114,44 @@ private List getMySqlSplits( } private MySqlSourceConfig getConfig(StartupOptions startupOptions, String[] captureTables) { - return getConfigFactory(captureTables).startupOptions(startupOptions).createConfig(0); + return getConfig(MYSQL_CONTAINER, customerDatabase, startupOptions, captureTables); + } + + private MySqlSourceConfig getConfig( + MySqlContainer container, + UniqueDatabase database, + StartupOptions startupOptions, + String[] captureTables) { + return getConfigFactory(container, database, captureTables) + .startupOptions(startupOptions) + .createConfig(0); } private MySqlSourceConfig getConfig(String[] captureTables) { - return getConfigFactory(captureTables).createConfig(0); + return getConfig(MYSQL_CONTAINER, customerDatabase, captureTables); } - private MySqlSourceConfigFactory getConfigFactory(String[] 
captureTables) { + private MySqlSourceConfig getConfig( + MySqlContainer container, UniqueDatabase database, String[] captureTables) { + return getConfigFactory(container, database, captureTables).createConfig(0); + } + + private MySqlSourceConfigFactory getConfigFactory( + MySqlContainer container, UniqueDatabase database, String[] captureTables) { String[] captureTableIds = Arrays.stream(captureTables) - .map(tableName -> customerDatabase.getDatabaseName() + "." + tableName) + .map(tableName -> database.getDatabaseName() + "." + tableName) .toArray(String[]::new); return new MySqlSourceConfigFactory() - .databaseList(customerDatabase.getDatabaseName()) + .databaseList(database.getDatabaseName()) .tableList(captureTableIds) - .hostname(MYSQL_CONTAINER.getHost()) - .port(MYSQL_CONTAINER.getDatabasePort()) - .username(customerDatabase.getUsername()) + .hostname(container.getHost()) + .port(container.getDatabasePort()) + .username(database.getUsername()) .splitSize(4) .fetchSize(2) - .password(customerDatabase.getPassword()); + .password(database.getPassword()); } private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception { @@ -1045,4 +1159,31 @@ private void addColumnToTable(JdbcConnection connection, String tableId) throws "ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213"); connection.commit(); } + + /** This stateful task context will skip valid the starting offset. */ + private static class TestStatefulTaskContext extends StatefulTaskContext { + + public TestStatefulTaskContext( + MySqlSourceConfig sourceConfig, + BinaryLogClient binaryLogClient, + MySqlConnection connection) { + super(sourceConfig, binaryLogClient, connection); + } + + @Override + protected MySqlOffsetContext loadStartingOffsetState( + OffsetContext.Loader loader, MySqlSplit mySqlSplit) { + BinlogOffset offset = + mySqlSplit.isSnapshotSplit() + ? 
BinlogOffset.ofEarliest() + : initializeEffectiveOffset( + mySqlSplit.asBinlogSplit().getStartingOffset(), + getConnection()); + + LOG.info("Starting offset is initialized to {}", offset); + + MySqlOffsetContext mySqlOffsetContext = loader.load(offset.getOffset()); + return mySqlOffsetContext; + } + } } diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 36901a0d98b..99ec4b858e8 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -76,9 +76,13 @@ public static void stopContainers() { } protected static MySqlContainer createMySqlContainer(MySqlVersion version) { + return createMySqlContainer(version, "docker/server-gtids/my.cnf"); + } + + protected static MySqlContainer createMySqlContainer(MySqlVersion version, String configPath) { return (MySqlContainer) new MySqlContainer(version) - .withConfigurationOverride("docker/server-gtids/my.cnf") + .withConfigurationOverride(configPath) .withSetupSQL("docker/setup.sql") .withDatabaseName("flink-test") .withUsername("flinkuser") diff --git a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java index a75b5fb618e..d340551e5d7 100644 --- a/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java +++ b/flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlConnectorITCase.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableResult; import 
org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.api.bridge.java.StreamStatementSet; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; @@ -63,11 +64,13 @@ import java.util.stream.Stream; import static com.ververica.cdc.connectors.mysql.LegacyMySqlSourceTest.currentMySqlLatestOffset; +import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.assertContainsErrorMsg; import static com.ververica.cdc.connectors.mysql.MySqlTestUtils.waitForJobStatus; import static org.apache.flink.api.common.JobStatus.RUNNING; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** Integration tests for MySQL Table source. */ @RunWith(Parameterized.class) @@ -78,7 +81,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { private static final String TEST_USER = "mysqluser"; private static final String TEST_PASSWORD = "mysqlpw"; - private static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0); + private static final MySqlContainer MYSQL8_CONTAINER = + createMySqlContainer(MySqlVersion.V8_0, "docker/server-gtids/expire-seconds/my.cnf"); private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); @@ -99,8 +103,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase { private final UniqueDatabase userDatabase2 = new UniqueDatabase(MYSQL_CONTAINER, "user_2", TEST_USER, TEST_PASSWORD); - private final UniqueDatabase charsetTestDatabase = - new UniqueDatabase(MYSQL_CONTAINER, "charset_test", TEST_USER, TEST_PASSWORD); + private final UniqueDatabase inventoryDatabase8 = + new UniqueDatabase(MYSQL8_CONTAINER, "inventory", TEST_USER, TEST_PASSWORD); private final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment(); @@ -1257,7 +1261,6 @@ public void testReadingWithDotTableName() throws Exception { + " 'password' = '%s'," + " 'database-name' = '%s'," + " 'table-name' = '%s'," - + " 'debezium.internal.implementation' = '%s'," + " 'scan.incremental.snapshot.enabled' = '%s'," + " 'server-id' = '%s'," + " 'scan.incremental.snapshot.chunk.size' = '%s'" @@ -1268,7 +1271,6 @@ public void testReadingWithDotTableName() throws Exception { customer3_0Database.getPassword(), customer3_0Database.getDatabaseName(), "customers3.0", - getDezImplementation(), incrementalSnapshot, getServerId(), getSplitSize()); @@ -2011,6 +2013,70 @@ public void testReadingWithMultiMaxValue() throws Exception { result.getJobClient().get().cancel().get(); } + @Test + public void testServerIdConflict() { + try { + env.setRestartStrategy(RestartStrategies.noRestart()); + customerDatabase.createAndInitialize(); + int base = 5400; + for (int i = 0; i < 2; i++) { + String sourceDDL = + String.format( + "CREATE TABLE debezium_source%d (" + + " `id` INTEGER NOT NULL," + + " `name` STRING," + + " `address` STRING," + + " `phone_name` STRING," + + " primary key (`id`) not enforced" + + ") WITH (" + + " 'connector' = 'mysql-cdc'," + + " 'hostname' = '%s'," + + " 'port' = '%s'," + + " 'username' = '%s'," + + " 'password' = '%s'," + + " 'database-name' = '%s'," + + " 'table-name' = '%s'," + + " 'scan.incremental.snapshot.enabled' = '%s'," + + " 'server-id' = '%s'," + + " 'scan.incremental.snapshot.chunk.size' = '%s'" + + ")", + i, + MYSQL_CONTAINER.getHost(), + MYSQL_CONTAINER.getDatabasePort(), + customerDatabase.getUsername(), + customerDatabase.getPassword(), + customerDatabase.getDatabaseName(), + "customers", + incrementalSnapshot, + getServerId(base), + getSplitSize()); + String sinkDDL = + String.format( + "CREATE TABLE blackhole_table%d WITH ('connector' = 'blackhole')\n" + + " LIKE debezium_source%d (EXCLUDING ALL)", + i, i); + tEnv.executeSql(sourceDDL); + 
tEnv.executeSql(sinkDDL); + } + + StreamStatementSet statementSet = tEnv.createStatementSet(); + statementSet.addInsertSql( + "Insert into blackhole_table0 select * from debezium_source0"); + statementSet.addInsertSql( + "Insert into blackhole_table1 select * from debezium_source1"); + statementSet.execute().await(); + fail(); + } catch (Throwable t) { + assertContainsErrorMsg( + t, + "The 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n" + + "The server id conflict may happen in the following situations: \n" + + "1. The server id has been used by other mysql cdc table in the current job.\n" + + "2. The server id has been used by the mysql cdc table in other jobs.\n" + + "3. The server id has been used by other sync tools like canal, debezium and so on.\n"); + } + } + // ------------------------------------------------------------------------------------ private String getServerId() { @@ -2022,6 +2088,13 @@ private String getServerId() { return String.valueOf(serverId); } + protected String getServerId(int base) { + if (incrementalSnapshot) { + return base + "-" + (base + DEFAULT_PARALLELISM); + } + return String.valueOf(base); + } + private int getSplitSize() { if (incrementalSnapshot) { // test parallel read diff --git a/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/util/ErrorMessageUtilsTest.java b/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/util/ErrorMessageUtilsTest.java new file mode 100644 index 00000000000..62df7f69921 --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/java/io/debezium/connector/mysql/util/ErrorMessageUtilsTest.java @@ -0,0 +1,59 @@ +/* + * Copyright 2022 Ververica Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.debezium.connector.mysql.util; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +/** The tests for {@link ErrorMessageUtils}. */ +public class ErrorMessageUtilsTest { + @Test + public void testOptimizeErrorMessageWhenServerIdConflict() { + assertEquals( + "A slave with the same server_uuid/server_id as this slave has connected to the master Error code: 1236; SQLSTATE: HY000." + + "\nThe 'server-id' in the mysql cdc connector should be globally unique, but conflicts happen now.\n" + + "The server id conflict may happen in the following situations: \n" + + "1. The server id has been used by other mysql cdc table in the current job.\n" + + "2. The server id has been used by the mysql cdc table in other jobs.\n" + + "3. The server id has been used by other sync tools like canal, debezium and so on.\n", + ErrorMessageUtils.optimizeErrorMessage( + "A slave with the same server_uuid/server_id as this slave has connected to the master Error code: 1236; SQLSTATE: HY000.")); + } + + @Test + public void testOptimizeErrorMessageWhenMissingBinlogPosition() { + assertEquals( + "Cannot replicate because the master purged required binary logs. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID set sent by the slave is 'b9d6f3df-79e7-11ed-9a81-0242ac110004:1-33', and the missing transactions are 'b9d6f3df-79e7-11ed-9a81-0242ac110004:34'" + + "\nThe required binary logs are no longer available on the server. 
This may happen in following situations:\n" + + "1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n" + + "2. The job runs normally, but something happens in the database and lead to the binlog cleanup. You can try to check why this cleanup happens from MySQL side.", + ErrorMessageUtils.optimizeErrorMessage( + "Cannot replicate because the master purged required binary logs. Replicate the missing transactions from elsewhere, or provision a new slave from backup. Consider increasing the master's binary log expiration period. The GTID set sent by the slave is 'b9d6f3df-79e7-11ed-9a81-0242ac110004:1-33', and the missing transactions are 'b9d6f3df-79e7-11ed-9a81-0242ac110004:34'")); + } + + @Test + public void testOptimizeErrorMessageWhenMissingTransaction() { + assertEquals( + "The connector is trying to read binlog starting at Struct{version=1.6.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1670826084012,db=,server_id=0,file=mysql-bin.000005,pos=3845,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed." + + "\nThe required binary logs are no longer available on the server. This may happen in following situations:\n" + + "1. The speed of CDC source reading is too slow to exceed the binlog expired period. You can consider increasing the binary log expiration period, you can also to check whether there is back pressure in the job and optimize your job.\n" + + "2. The job runs normally, but something happens in the database and lead to the binlog cleanup. 
You can try to check why this cleanup happens from MySQL side.", + ErrorMessageUtils.optimizeErrorMessage( + "The connector is trying to read binlog starting at Struct{version=1.6.4.Final,connector=mysql,name=mysql_binlog_source,ts_ms=1670826084012,db=,server_id=0,file=mysql-bin.000005,pos=3845,row=0}, but this is no longer available on the server. Reconfigure the connector to use a snapshot when needed.")); + } +} diff --git a/flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/expire-seconds/my.cnf b/flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/expire-seconds/my.cnf new file mode 100644 index 00000000000..0b6ad13c836 --- /dev/null +++ b/flink-connector-mysql-cdc/src/test/resources/docker/server-gtids/expire-seconds/my.cnf @@ -0,0 +1,64 @@ +# Copyright 2022 Ververica Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# For advice on how to change settings please see +# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html + +[mysqld] +# +# Remove leading # and set to the amount of RAM for the most important data +# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%. +# innodb_buffer_pool_size = 128M +# +# Remove leading # to turn on a very important data integrity option: logging +# changes to the binary log between backups. +# log_bin +# +# Remove leading # to set options mainly useful for reporting servers. +# The server defaults are faster for transactions and fast SELECTs. 
+# Adjust sizes as needed, experiment to find the optimal values. +# join_buffer_size = 128M +# sort_buffer_size = 2M +# read_rnd_buffer_size = 2M +skip-host-cache +skip-name-resolve +#datadir=/var/lib/mysql +#socket=/var/lib/mysql/mysql.sock +secure-file-priv=/var/lib/mysql +user=mysql + +# Disabling symbolic-links is recommended to prevent assorted security risks +symbolic-links=0 + +#log-error=/var/log/mysqld.log +#pid-file=/var/run/mysqld/mysqld.pid + +# ---------------------------------------------- +# Enable the binlog for replication & CDC +# ---------------------------------------------- + +# Enable binary replication log and set the prefix, expiration, and log format. +# The prefix is arbitrary, expiration can be short for integration tests but would +# be longer on a production system. Row-level info is required for ingest to work. +# Server ID is required, but this will vary on production systems. +server-id = 223344 +log_bin = mysql-bin +binlog_format = row +# Make binlog_expire_logs_seconds = 1 and max_binlog_size = 4096 to test the exception +# message when the binlog expires in the server. +binlog_expire_logs_seconds = 1 +max_binlog_size = 4096 + +# enable gtid mode +gtid_mode = on +enforce_gtid_consistency = on \ No newline at end of file