Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support multiple tables sink for Flink CDC #86

Merged
merged 97 commits into from
Oct 12, 2022
Merged

Conversation

dmetasoul01
Copy link
Contributor

@dmetasoul01 dmetasoul01 commented Sep 25, 2022

Support read from multiple tables with different schemas into corresponding lakesoul tables.

Close #84
Close #87

@dmetasoul01 dmetasoul01 added enhancement New feature or request flink flink support into lakesoul labels Sep 25, 2022
@dmetasoul01 dmetasoul01 self-assigned this Sep 25, 2022
return self();
}

public T withRollingPolicy(CheckpointRollingPolicy<RowData, String> rollingPolicy) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CanIgnoreReturnValueSuggester: Methods that always 'return this' should be annotated with @CanIgnoreReturnValue


Suggested change
public T withRollingPolicy(CheckpointRollingPolicy<RowData, String> rollingPolicy) {
@CanIgnoreReturnValue

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

return RowType.of(true, new IntType(true));
}

public T withBucketCheckInterval(final long interval) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CanIgnoreReturnValueSuggester: Methods that always 'return this' should be annotated with @CanIgnoreReturnValue


Suggested change
public T withBucketCheckInterval(final long interval) {
@CanIgnoreReturnValue

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

return self();
}

public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CanIgnoreReturnValueSuggester: Methods that always 'return this' should be annotated with @CanIgnoreReturnValue


Suggested change
public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
@CanIgnoreReturnValue

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

TableSchemaIdentity tableId,
String bucketId,
TableSchemaWriterCreator creator) throws IOException {
LakeSoulWriterBucket bucket = activeBuckets.get(Tuple2.of(creator.identity.tableId, bucketId));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

CollectionIncompatibleType: Argument 'Tuple2.of(creator.identity.tableId, bucketId)' should not be passed to this method; its type Tuple2<TableId, String> is not compatible with its collection's type argument Tuple2<TableSchemaIdentity, String>


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]


import org.apache.flink.api.common.functions.Partitioner;

public class DataPartitioner<Long> implements Partitioner<Long> {
public class HashPartitioner<Long> implements Partitioner<Long> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

JavaLangClash: Long clashes with java.lang.Long


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

return id.hashCode();
}

public int compareTo(TableId that) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MissingImplementsComparable: Classes implementing valid compareTo function should implement Comparable interface


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

}

public int compareTo(TableId that) {
if (this == that) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReferenceEquality: Comparison using reference equality instead of value equality


Suggested change
if (this == that) {
if (this.equals(that)) {

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

FileSystem fileSystem = p.getFileSystem();
return p.makeQualified(fileSystem);
}

public static String getDatabaseName(String fullDatabaseName) {
String[] splited = fullDatabaseName.split("\\.");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringSplitter: String.split(String) has surprising behavior


Suggested change
String[] splited = fullDatabaseName.split("\\.");
List<String> splited = Splitter.on('.').splitToList(fullDatabaseName);

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]


import org.apache.flink.api.common.functions.Partitioner;

public class DataPartitioner<Long> implements Partitioner<Long> {
public class HashPartitioner<Long> implements Partitioner<Long> {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypeNameShadowing: Found type parameters shadowing other declared types:


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

@@ -60,25 +55,13 @@ public DynamicTableSink createDynamicTableSink(Context context) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnusedVariable: The field 'TABLE_NAME' is never read.


ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

* DataStream sink fileSystem and upload metadata
*/
private DataStreamSink<?> createStreamingSink(DataStream<RowData> dataStream,
Context sinkContext) throws IOException {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

UnusedVariable: The parameter 'sinkContext' is never read.


Suggested change
Context sinkContext) throws IOException {
return createStreamingSink(dataStream);

ℹ️ Learn about @sonatype-lift commands

You can reply with the following commands. For example, reply with @sonatype-lift ignoreall to leave out all findings.

Command Usage
@sonatype-lift ignore Leave out the above finding from this PR
@sonatype-lift ignoreall Leave out all the existing findings from this PR
@sonatype-lift exclude <file|issue|path|tool> Exclude specified file|issue|path|tool from Lift findings by updating your config.toml file

Note: When talking to LiftBot, you need to refresh the page to see its response.
Click here to add LiftBot to another repo.


Was this a good recommendation?
[ 🙁 Not relevant ] - [ 😕 Won't fix ] - [ 😑 Not critical, will fix ] - [ 🙂 Critical, will fix ] - [ 😊 Critical, fixing now ]

dmetasoul-opensource and others added 27 commits October 8, 2022 11:44
@moresun moresun merged commit f73089a into main Oct 12, 2022
@dmetasoul01 dmetasoul01 deleted the flink_cdc_multitable branch October 12, 2022 04:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request flink flink support into lakesoul
Projects
No open projects
Development

Successfully merging this pull request may close these issues.

[SPARK] Use non-session catalog before spark 3.3 Support multiple table sync in Flink CDC Sink
6 participants