Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(sink): support new syntax FORMAT ... ENCODE ... similar to source #12556

Merged
merged 12 commits into from
Oct 4, 2023

Conversation

xiangjinwu
Copy link
Contributor

@xiangjinwu xiangjinwu commented Sep 27, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

#11451

Old syntax:

create sink snk from mv with (
  connector = 'kafka',
  properties.bootstrap.server = 'localhost:9092',
  topic = 'test_topic',
  type = 'append-only');

New syntax (similar to source):

create sink snk from mv with (
  connector = 'kafka',
  properties.bootstrap.server = 'localhost:9092',
  topic = 'test_topic')
format plain encode json;

create sink snk from mv with (
  connector = 'kafka',
  properties.bootstrap.server = 'localhost:9092',
  topic = 'test_topic')
format upsert encode protobuf (
  schema.location = 'location',
  message = 'main_message');
  • Updated sqlparser to parse the new syntax.
  • Introduce SinkFormatDesc in rust and proto to pass around this info.
  • Extend SinkDesc/SinkCatalog/SinkParam in rust and SinkDesc/Sink/SinkParam in proto with an extra optional field format_desc.
  • Replace SinkType and SINK_TYPE_OPTION with SinkFormatDesc in kafka/kinesis/pulsar using SinkFormatterImpl.

Next step

In SinkFormatterImpl::new, convert param.format_desc to ProtobufProperties/AvroProperties and reuse ProtobufParserConfig::new/AvroParserConfig::new to get prost_reflect::MessageDescriptor/apache_avro::Schema needed by encoders #12425

Compatibility

  • The new syntax is only enabled when connector is kafka/kinesis/pulsar. We will test other connectors and migrate them one by one.
  • The sqlparser and handler still accepts both syntax. It will derive SinkFormatDesc for the old syntax.
  • When decoding proto SinkDesc/Sink/SinkParam, SinkFormatDesc will also be derived from old syntax.

Limitations

Some refactor / reuse is possible but not done here to limit the scope:

  • In sqlparser, SourceSchemaV2 is same as SinkSchema (easy to rename).
  • Common fields in SinkDesc/SinkCatalog/SinkParam may be extracted to a struct.
  • SourceStruct may be absorbed into SpecificParserConfig, then consolidated with SinkFormatDesc.
  • SinkType is similar to SinkFormat but cannot be replaced yet:
SinkType properties.get(SINK_TYPE_OPTION) SinkFormat
AppendOnly SINK_TYPE_APPEND_ONLY AppendOnly
ForceAppendOnly SINK_TYPE_APPEND_ONLY AppendOnly
Upsert SINK_TYPE_UPSERT Upsert
Upsert SINK_TYPE_DEBEZIUM Debezium

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

For kafka/kinesis/pulsar sink, the preferred syntax is now FORMAT [PLAIN | UPSERT | DEBEZIUM] ENCODE JSON rather than type = ['append-only' | 'upsert' | 'debezium']. This reads similar to source and enables support of different encodings later. Other connectors (clickhouse, jdbc, etc) still use the old type = syntax as of now.

@codecov
Copy link

codecov bot commented Sep 27, 2023

Codecov Report

Merging #12556 (213c760) into main (ed576b2) will decrease coverage by 0.05%.
The diff coverage is 30.81%.

@@            Coverage Diff             @@
##             main   #12556      +/-   ##
==========================================
- Coverage   69.32%   69.27%   -0.05%     
==========================================
  Files        1469     1469              
  Lines      241065   241256     +191     
==========================================
+ Hits       167111   167129      +18     
- Misses      73954    74127     +173     
Flag Coverage Δ
rust 69.27% <30.81%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
src/connector/src/sink/catalog/desc.rs 100.00% <100.00%> (ø)
src/frontend/planner_test/src/lib.rs 87.36% <100.00%> (+0.04%) ⬆️
src/frontend/src/optimizer/mod.rs 92.21% <100.00%> (+0.03%) ⬆️
src/meta/src/manager/sink_coordination/manager.rs 89.38% <100.00%> (+0.07%) ⬆️
src/stream/src/executor/sink.rs 96.22% <100.00%> (+0.02%) ⬆️
src/connector/src/source/external.rs 34.96% <0.00%> (-0.13%) ⬇️
src/sqlparser/src/ast/statement.rs 54.83% <96.55%> (+1.80%) ⬆️
src/connector/src/sink/mod.rs 58.82% <70.00%> (+0.88%) ⬆️
src/frontend/src/utils/with_options.rs 74.26% <0.00%> (-1.68%) ⬇️
...rc/frontend/src/optimizer/plan_node/stream_sink.rs 80.21% <79.16%> (-0.02%) ⬇️
... and 7 more

... and 6 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Copy link
Contributor

@st1page st1page left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Maybe some parser test is needed?
And should we notice user when they are using legacy syntax? #12373

Copy link
Contributor

@tabVersion tabVersion left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xiangjinwu
Copy link
Contributor Author

And should we notice user when they are using legacy syntax? #12373

Good point. But the new syntax is not actually effective yet. So maybe we can add them when it actually works.

but we still need a connector x format x encode table for compatibility check https://github.com/singularity-data/risingwave/blob/fff6d47d897b859e1128029a3fe19b5d41a029f8/src/frontend/src/handler/create_source.rs#L853

As of today the old syntax compatibility check happens in compute node (build_sink linked above) rather than in frontend. This is different from source for 2 reasons (as far as I can tell):

  • No SourceExecutor is created immediately on create source (not create table).
  • Protobuf/avro need to fetch column names and data types immediately in frontend

That being said, we could move/add the check to frontend as a refactor later if appropriate.

@xiangjinwu xiangjinwu force-pushed the feat-sqlparser-sink-format-encode branch from 2910cba to e6942fc Compare September 29, 2023 06:39
@xiangjinwu xiangjinwu added the user-facing-changes Contains changes that are visible to users label Sep 29, 2023
@xiangjinwu
Copy link
Contributor Author

Major changes since the reviewed 3c0cfc9:

  • The new syntax is now limited to only kafka/kinesis/pulsar, which is using SinkFormatterImpl. Other connecters will be tested and migrated later one by one. For example, clickhouse sink does not take an encode today, and elasticsearch sink does not even require type = .
  • Kafka/kinesis/pulsar is now fully functioning with the new syntax, and does not rely on legacy SinkType or SINK_TYPE_OPTION. This made the PR more complete, and user-facing, but also doubled the lines.

@xiangjinwu xiangjinwu added this pull request to the merge queue Oct 4, 2023
Merged via the queue into main with commit a296a06 Oct 4, 2023
35 of 36 checks passed
@xiangjinwu xiangjinwu deleted the feat-sqlparser-sink-format-encode branch October 4, 2023 04:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/feature user-facing-changes Contains changes that are visible to users
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants