-
Notifications
You must be signed in to change notification settings - Fork 536
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(jdbc-sink): add schema support for postgres sink #10576
Conversation
Codecov Report
@@ Coverage Diff @@
## main #10576 +/- ##
=======================================
Coverage 70.14% 70.14%
=======================================
Files 1276 1276
Lines 219818 219818
=======================================
Hits 154200 154200
Misses 65618 65618
Flags with carried forward coverage won't be shown. Click here to find out more. see 2 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rest LGTM.
java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSink.java
Outdated
Show resolved
Hide resolved
...nnector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/JDBCSinkConfig.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 3692 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
1668 | 1 | 2023 | 0 |
Click to see the invalid file list
- java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/SchemaTableName.java
...r-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/SchemaTableName.java
Show resolved
Hide resolved
e5bf443
to
1f1dc0d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
license-eye has totally checked 3695 files.
Valid | Invalid | Ignored | Fixed |
---|---|---|---|
1669 | 2 | 2024 | 0 |
Click to see the invalid file list
- java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresSchemaTableName.java
- java/connector-node/risingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/SchemaTableName.java
...isingwave-sink-jdbc/src/main/java/com/risingwave/connector/jdbc/PostgresSchemaTableName.java
Outdated
Show resolved
Hide resolved
return tableName; | ||
} | ||
|
||
public String getNormalizedTableName() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this method should be inside the JdbcDialect
, and has the signature of getNormalizedTableName(SchemaTableName schemaTableName)
, since how to concatenate the schema name and table name is a database-specific behavior. This class should be simply a POJO that contains that raw input from user.
package com.risingwave.connector.jdbc; | ||
|
||
public class PostgresSchemaTableName extends SchemaTableName { | ||
public PostgresSchemaTableName(String schemaName, String tableName) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have a createSchemaTable
in JdbcDialect
. We can set the default schema name there. This class is unnecessary.
String columns = | ||
fieldNames.stream().map(this::quoteIdentifier).collect(Collectors.joining(", ")); | ||
String placeholders = fieldNames.stream().map(f -> "?").collect(Collectors.joining(", ")); | ||
return "INSERT INTO " | ||
+ quoteIdentifier(tableName) | ||
+ quoteIdentifier(tableName.getNormalizedTableName()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be "INSERT INTO " + getNormalizedTableName() ...
. The current code cannot handle databases that quote the schema name and table name separatedly (i.e. `schema_name`.`table_name`
).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for the PR!
Co-authored-by: lmatz <lmatz823@gmail.com>
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
schema.name
field to the WITH clause to allow user to specify schema for pg sinkFix #10418
Checklist
./risedev check
(or alias,./risedev c
)Documentation
Click here for Documentation
Types of user-facing changes
Please keep the types that apply to your changes, and remove the others.
Release note
Add a optional
schema.name
field to the WITH clause to allow user to specify schema for Postgres sink