feat(connector): support hudi sink #8824

Draft
wants to merge 26 commits into
base: main

@wenym1 wenym1 commented Mar 28, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

This PR adds a minimal Hudi sink. It supports only Hudi MOR (merge-on-read) tables, runs with a parallelism of one, and writes to a single file group whose file ID is risingwave-file-id. For this file group, the initial write is treated as an insert and produces the initial base Parquet file; all subsequent writes are treated as updates and are written to the log file.
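
The insert-then-update behavior described above could be modeled roughly as below. This is only an illustrative sketch and not the code in this PR: the class FileGroupWriteSketch, the WriteKind enum, and the hasBaseFile flag are hypothetical names, and only the file ID risingwave-file-id is taken from the description.

```java
// Hypothetical sketch: every name here is illustrative and not taken from the PR,
// except the file ID, which is quoted from the PR description.
final class FileGroupWriteSketch {
    /** The kind of write chosen for one batch. */
    enum WriteKind { INSERT_BASE_FILE, UPDATE_LOG_FILE }

    /** The single file group the sink writes to. */
    static final String FILE_ID = "risingwave-file-id";

    /** Assumed flag: whether the file group already has a base Parquet file. */
    private boolean hasBaseFile;

    FileGroupWriteSketch(boolean hasBaseFile) {
        this.hasBaseFile = hasBaseFile;
    }

    /**
     * The first write is an insert that creates the base file;
     * every later write is an update that goes to the log file.
     */
    WriteKind nextWrite() {
        if (!hasBaseFile) {
            hasBaseFile = true;
            return WriteKind.INSERT_BASE_FILE;
        }
        return WriteKind.UPDATE_LOG_FILE;
    }
}
```

A sink restarted against an existing table would presumably start with hasBaseFile set to true, so that it never rewrites the base file; how the PR actually detects this is not shown here.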

An example SQL statement to create the Hudi sink:

create sink hudi_sink from s1
with (
    connector = 'hudi',
    type = 'upsert',
    primary_key = 'seq_id',
    base.path = 's3a://my-bucket',
    table.name = 'hudi_tbl1',
    s3.endpoint = 'https://s3.ap-southeast-1.amazonaws.com/',
    s3.access.key = 'xxxxxxxxx',
    s3.secret.key = 'xxxxxxxx'
);
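
On the connector side, the options in the WITH clause arrive as string properties that need to be checked before a writer is created. A minimal sketch of such a check follows. It assumes that base.path, table.name, and primary_key are required; the class SinkOptionsSketch and its methods are hypothetical and are not the HudiSinkConfig class from this PR.

```java
import java.util.Map;

// Hypothetical sketch of option validation; not the PR's HudiSinkConfig.
final class SinkOptionsSketch {
    final String basePath;
    final String tableName;
    final String primaryKey;

    private SinkOptionsSketch(String basePath, String tableName, String primaryKey) {
        this.basePath = basePath;
        this.tableName = tableName;
        this.primaryKey = primaryKey;
    }

    /** Builds the options from the WITH-clause properties, failing fast on gaps. */
    static SinkOptionsSketch fromProperties(Map<String, String> props) {
        return new SinkOptionsSketch(
                require(props, "base.path"),
                require(props, "table.name"),
                require(props, "primary_key"));
    }

    /** Returns the value for key, or throws if it is missing or empty. */
    private static String require(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("missing required option: " + key);
        }
        return value;
    }
}
```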

Checklist For Contributors

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • I have demonstrated that backward compatibility is not broken by breaking changes and created issues to track deprecated features to be removed in the future. (Please refer to the issue)
  • All checks passed in ./risedev check (or alias, ./risedev c)

Checklist For Reviewers

  • I have requested macro/micro-benchmarks as this PR can affect performance substantially, and the results are shown.

Documentation

  • My PR DOES NOT contain user-facing changes.

Types of user-facing changes

Please keep the types that apply to your changes, and remove the others.

  • Installation and deployment
  • Connector (sources & sinks)
  • SQL commands, functions, and operators
  • RisingWave cluster configuration changes
  • Other (please specify in the release note below)

Release note

@github-actions github-actions bot left a comment

license-eye has totally checked 3147 files.

Valid   Invalid   Ignored   Fixed
1460    8         1679      0

Invalid files:
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HoodieRisingWaveMergeOnReadTable.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HoodieRisingWaveWriter.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HudiSink.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HudiSinkFactory.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/RisingWaveCreateHandleFactory.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/RisingWaveInsertCommitActionExecutor.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/RisingWaveUpsertPreppedDeltaCommitActionExecutor.java
  • java/connector-node/risingwave-sink-hudi/src/test/java/com/risingwave/connector/HudiSinkFactoryTest.java

@github-actions github-actions bot left a comment

license-eye has totally checked 3246 files.

Valid   Invalid   Ignored   Fixed
1497    9         1740      0

Invalid files:
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HoodieRisingWaveMergeOnReadTable.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HoodieRisingWaveWriter.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HudiSink.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HudiSinkConfig.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/HudiSinkFactory.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/RisingWaveCreateHandleFactory.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/RisingWaveInsertCommitActionExecutor.java
  • java/connector-node/risingwave-sink-hudi/src/main/java/com/risingwave/connector/RisingWaveUpsertPreppedDeltaCommitActionExecutor.java
  • java/connector-node/risingwave-sink-hudi/src/test/java/com/risingwave/connector/HudiSinkFactoryTest.java

@wenym1 wenym1 marked this pull request as ready for review May 9, 2023 01:10

codecov bot commented May 9, 2023

Codecov Report

Merging #8824 (f28ed87) into main (33b583e) will decrease coverage by 0.01%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##             main    #8824      +/-   ##
==========================================
- Coverage   70.98%   70.97%   -0.01%     
==========================================
  Files        1241     1241              
  Lines      207549   207549              
==========================================
- Hits       147329   147314      -15     
- Misses      60220    60235      +15     
Flag Coverage Δ
rust 70.97% <100.00%> (-0.01%) ⬇️

Flags with carried forward coverage won't be shown.

Impacted Files Coverage Δ
src/connector/src/sink/remote.rs 58.12% <ø> (ø)
...rc/frontend/src/optimizer/plan_node/stream_sink.rs 70.70% <100.00%> (ø)

... and 7 files with indirect coverage changes

@liurenjie1024 liurenjie1024 left a comment

A simple implementation; generally LGTM, but maybe we should add some test cases?

* limitations under the License.
*/

package com.risingwave.connector;

Suggested change
- package com.risingwave.connector;
+ package com.risingwave.connector.hudi;

if (delete != null) {
map.put(key, HudiSinkRowOp.deleteOp(key));
} else {
map.remove(key);

In what case can this be null?
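
For context on the question above, here is one plausible reading of the branch, written as a standalone sketch rather than the PR's code. It assumes each buffered entry holds an optional row to delete and an optional row to insert, and that the delete side is null when the key was first inserted within the current batch. RowMapSketch and RowOp are hypothetical names, and rows are simplified to strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a per-batch row map; not the PR's implementation.
final class RowMapSketch {
    /** Buffered operation for one key. */
    static final class RowOp {
        final String delete; // row that existed before this batch, or null
        final String insert; // new row value, or null
        RowOp(String delete, String insert) {
            this.delete = delete;
            this.insert = insert;
        }
    }

    private final Map<String, RowOp> map = new HashMap<>();

    void insert(String key, String row) {
        RowOp prev = map.get(key);
        if (prev == null) {
            map.put(key, new RowOp(null, row));
        } else if (prev.insert == null) {
            // Delete followed by insert in the same batch: keep both sides.
            map.put(key, new RowOp(prev.delete, row));
        } else {
            throw new IllegalStateException("duplicate insert for key " + key);
        }
    }

    void delete(String key, String row) {
        RowOp prev = map.get(key);
        if (prev == null) {
            map.put(key, new RowOp(row, null));
        } else if (prev.insert == null) {
            throw new IllegalStateException("double delete for key " + key);
        } else if (prev.delete != null) {
            // The key existed before this batch: drop the pending insert
            // and keep the delete of the original row.
            map.put(key, new RowOp(prev.delete, null));
        } else {
            // Inserted and deleted within the same batch: nothing to write.
            map.remove(key);
        }
    }

    Map<String, RowOp> ops() {
        return map;
    }
}
```

Under these assumptions the null case is exactly "insert then delete inside one batch", which is why the entry can simply be removed.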

KeyGenUtils.getRecordKey(rec, this.recordKeyField, false), "");
switch (row.getOp()) {
case INSERT:
sinkRowMap.insert(

In such a simple case, I don't think we need this row map. Hudi's log file can be used to append change logs, so we can append records as they are written rather than writing all of them in clear.
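
To make the suggestion concrete, an append-only buffer could look roughly like the sketch below. It is purely illustrative and does not call any Hudi API: ChangeLogSketch, Entry, and replay are hypothetical names, and the in-memory list only stands in for the log file.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the append-only alternative; it does not use Hudi's API.
final class ChangeLogSketch {
    enum Op { INSERT, DELETE }

    /** One change record, kept in arrival order. */
    static final class Entry {
        final Op op;
        final String key;
        final String row;
        Entry(Op op, String key, String row) {
            this.op = op;
            this.key = key;
            this.row = row;
        }
    }

    private final List<Entry> log = new ArrayList<>();

    /** Appends a change immediately, with no per-key coalescing. */
    void append(Op op, String key, String row) {
        log.add(new Entry(op, key, row));
    }

    List<Entry> entries() {
        return Collections.unmodifiableList(log);
    }

    /** Replays the log in order, as a reader merging the changes would. */
    Map<String, String> replay() {
        Map<String, String> state = new LinkedHashMap<>();
        for (Entry e : log) {
            if (e.op == Op.INSERT) {
                state.put(e.key, e.row);
            } else {
                state.remove(e.key);
            }
        }
        return state;
    }
}
```

The trade-off in this sketch is that the writer does less work per record, while the reader has to apply entries in order to recover the latest value for each key.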

Comment on lines +78 to 81
Some(s) if s == "iceberg" || s == "hudi" => {
// iceberg with multiple parallelism will fail easily with concurrent commit
// on metadata
// TODO: reset iceberg sink to have multiple parallelism

[Re: lines 78 to 82]

Please also update the contents of the info! message.

@wenym1 wenym1 marked this pull request as draft May 16, 2024 06:08