Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[connector-mysql] Allow user to pass custom JDBC URL parameters used … #921

Merged
merged 1 commit into from Mar 15, 2022

Conversation

paul8263
Copy link
Contributor

…by MySQL data source.

This issue is linked to #674 and #622 .

Added 'connection.pool.url.properties' for mysql table source in order to allow user to customize MySQL JDBC URL parameters.

Change log:

docs/content/connectors/mysql-cdc.md
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/MySqlSourceBuilder.java
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/config/MySqlSourceOptions.java
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/connection/PooledDataSourceFactory.java
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSource.java
flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactory.java
flink-connector-mysql-cdc/src/test/java/com/ververica/cdc/connectors/mysql/table/MySqlTableSourceFactoryTest.java


config.setPoolName(CONNECTION_POOL_PREFIX + hostName + ":" + port);
config.setJdbcUrl(String.format(JDBC_URL_PATTERN, hostName, port));
config.setJdbcUrl(formatJdbcUrlPattern(hostName, port, customProperties));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we pass the jdbc url properties like Kafka connector properties?

jdbc.properties.useInformationSchema=ture
jdbc.properties.characterEncoding=UTF-8
....

https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/#properties-bootstrap-servers
In this way we can override all parameters in the JDB_URL

/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public class PooledDataSourceFactory {

public static final String JDBC_URL_PATTERN =
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we let user can also override characterEncoding parameter? we can save these default parameter in a default properties and then merge user passed custom properties as the final properties. HDYT?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @leonardBang ,
Thanks for reviewing. I will do it.

@leonardBang
Copy link
Contributor

@paul8263 The CI failed, please have a look.

/** A connection pool factory to create pooled DataSource {@link HikariDataSource}. */
public class PooledDataSourceFactory {

public static final String JDBC_URL_PATTERN =
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterEncoding=UTF-8&characterSetResults=UTF-8&zeroDateTimeBehavior=CONVERT_TO_NULL";
"jdbc:mysql://%s:%s/?useInformationSchema=true&nullCatalogMeansCurrent=false&useUnicode=true&characterSetResults=UTF-8";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let parameters&useUnicode=true&characterSetResults=UTF-8 can also be overwrite?

Comment on lines 38 to 42
static {
DEFAULT_JDBC_PROPERTIES = new Properties();
DEFAULT_JDBC_PROPERTIES.setProperty("zeroDateTimeBehavior", "CONVERT_TO_NULL");
DEFAULT_JDBC_PROPERTIES.setProperty("characterEncoding", "UTF-8");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A static method is more readable than static code piece

import java.util.Map;
import java.util.Properties;

/** Option utils for JDBC options. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** Option utils for JDBC options. */
/** Option utils for JDBC URL properties. */

import java.util.Properties;

/** Option utils for JDBC options. */
public class JDBCOptions {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public class JDBCOptions {
public class JdbcUrlUtils {

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er... Should the name keep consistent with DebeziumOptions?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we do not need to follow them as this class is a utils class instead of a set of options.

<td>optional</td>
<td style="word-wrap: break-word;">20</td>
<td>String</td>
<td>Custom JDBC URL properties. Default value is 'zeroDateTimeBehavior=CONVERT_TO_NULL'.</td>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please update the description, we have no default value for custom properties, right?
We can say :
Option to pass custom JDBC URL properties, user can pass custom property like 'jdbc.properties.useSSL'= 'false'.

@@ -202,6 +202,12 @@
return this;
}

/** Custom properties that will append to the JDBC connection URL. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/** Custom properties that will append to the JDBC connection URL. */
/** Custom properties that will overwrite the default JDBC connection URL. */

@@ -214,6 +215,12 @@ public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean scanNewlyAdde
return this;
}

/** Custom properties that will append to the JDBC connection URL. */
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/** Custom properties that will overwrite the  default JDBC connection URL. */

StartupOptions.initial());
StartupOptions.initial(),
false,
new Properties());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cover the change in testOptionalProperties() ?

@paul8263
Copy link
Contributor Author

Hi @leonardBang ,
I am confused why the CI got a series of errors like Generate Splits for table xxx.xxx error.
It seems that flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner::getNext() method encountered a NPE.

@leonardBang
Copy link
Contributor

Hi @leonardBang , I am confused why the CI got a series of errors like Generate Splits for table xxx.xxx error. It seems that flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner::getNext() method encountered a NPE.

It's wired, but we need rebase the master to resolve the conflicts

@paul8263
Copy link
Contributor Author

Hi @leonardBang , I am confused why the CI got a series of errors like Generate Splits for table xxx.xxx error. It seems that flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssigner::getNext() method encountered a NPE.

It's wired, but we need rebase the master to resolve the conflicts

Thanks. I resolved the conflicts and I am waiting to see if the weird thing still exists.

Copy link
Contributor

@leonardBang leonardBang left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants