fix: allow configure message.timeout.ms and max.in.flight for kafka sink #12574

Merged
merged 4 commits into from
Sep 28, 2023

Conversation

hzxa21
Collaborator

@hzxa21 hzxa21 commented Sep 27, 2023

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

  • Expose message.timeout.ms and max.in.flight.requests.per.connection for the Kafka sink.
  • Change the default max.in.flight.requests.per.connection to 5, the value recommended in the Confluent documentation. The previous default, inherited from librdkafka, is 1000000. That is unreasonably high: it allows an excessive number of in-flight produce requests, which can overload the Kafka broker and may cause a large number of duplicates on network blips.
  • Remove the unused Kafka sink options timeout and use_transaction.
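
As a rough illustration of the first two points, the sketch below shows one way sink options could be translated into librdkafka producer settings, applying the new default of 5 when the user does not set max.in.flight.requests.per.connection. It is not the code in this PR: the function `producer_config` and the constant `DEFAULT_MAX_IN_FLIGHT` are hypothetical, and the `properties.` prefix on the option names is an assumption modelled on the existing `properties.batch.size` option.

```rust
use std::collections::HashMap;

/// Default applied when the user does not set the option
/// (the value 5 comes from the PR description; the constant name is hypothetical).
const DEFAULT_MAX_IN_FLIGHT: u32 = 5;

/// Hypothetical helper: translate sink options into librdkafka producer settings.
/// The `properties.` prefix on the input keys is an assumption for illustration.
fn producer_config(
    options: &HashMap<String, String>,
) -> Result<HashMap<String, String>, String> {
    let mut config = HashMap::new();

    // Fall back to the default of 5 when the option is absent.
    let max_in_flight = match options.get("properties.max.in.flight.requests.per.connection") {
        Some(raw) => raw
            .parse::<u32>()
            .map_err(|e| format!("invalid max.in.flight.requests.per.connection: {e}"))?,
        None => DEFAULT_MAX_IN_FLIGHT,
    };
    config.insert(
        "max.in.flight.requests.per.connection".to_string(),
        max_in_flight.to_string(),
    );

    // message.timeout.ms is only forwarded when the user sets it,
    // so the client library keeps its own default otherwise.
    if let Some(raw) = options.get("properties.message.timeout.ms") {
        let ms = raw
            .parse::<u64>()
            .map_err(|e| format!("invalid message.timeout.ms: {e}"))?;
        config.insert("message.timeout.ms".to_string(), ms.to_string());
    }

    Ok(config)
}
```

Validating the values before they reach the client means a malformed option surfaces as an error at sink creation rather than at produce time.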

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features; see "Sqlsmith: Sql feature generation" #7934.)
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

  • Expose message.timeout.ms and max.in.flight.requests.per.connection for the Kafka sink.
  • Change the default max.in.flight.requests.per.connection to 5.

See here for the meaning of these kafka client configs.

@github-actions github-actions bot added the type/fix Bug fix label Sep 27, 2023
@xzhseh
Contributor

xzhseh commented Sep 27, 2023

Should we also change the default batch.size for librdkafka to conform with Confluent's recommendation?
The current default is 1MB, by the way.
In addition, since the number of in-flight requests sent by rdkafka changes significantly, will this affect the currently buffered FutureRecords? (These are essentially records that have been sent but not yet delivered; they may be buffered in the client or in flight.)


Contributor

@tabVersion tabVersion left a comment

LGTM!
Exposing batch.size would be a nice-to-have option.

@hzxa21
Collaborator Author

hzxa21 commented Sep 28, 2023

Should we also change the default batch.size for librdkafka to conform with Confluent's recommendation?
The current default is 1MB, by the way.

1MB looks preferable to me because we prefer high throughput over low latency. We used to set it to 20MB in production so 1MB is reasonable in our case.

In addition, since the number of in-flight requests sent by rdkafka changes significantly, will this affect the currently buffered FutureRecords? (These are essentially records that have been sent but not yet delivered; they may be buffered in the client or in flight.)

max.inflight controls the maximum number of in-flight produce requests per destination broker. Records are batched per topic partition according to batch.size, and a produce request further batches those record batches across topic partitions for each destination broker, so I don't think setting max.inflight to 5 will significantly affect produce throughput. On the other hand, a large max.inflight may hurt performance, because record batches produced to the same topic partition are applied and persisted in order on the Kafka broker. Having too many in-flight record batches on the broker side can overload the broker and cause more delays.
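
To put rough numbers on the per-broker limit described above, here is a minimal sketch of the upper bound on produce requests a single producer can have outstanding. The function name `max_outstanding_requests` is hypothetical, and the model assumes one connection per destination broker; it says nothing about how many requests a real workload actually keeps in flight.

```rust
/// Upper bound on outstanding produce requests for one producer,
/// assuming one connection per destination broker (an assumption of this sketch).
fn max_outstanding_requests(brokers: u64, max_in_flight_per_connection: u64) -> u64 {
    // saturating_mul avoids overflow for very large inputs.
    brokers.saturating_mul(max_in_flight_per_connection)
}
```

Under this model, a producer talking to 3 brokers is capped at 15 outstanding requests with the new default of 5, versus 3,000,000 with the previous librdkafka default of 1000000.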

@hzxa21
Collaborator Author

hzxa21 commented Sep 28, 2023

LGTM! Exposing batch.size would be a nice-to-have option.

batch.size is already exposed via properties.batch.size

@hzxa21 hzxa21 added this pull request to the merge queue Sep 28, 2023
@codecov

codecov bot commented Sep 28, 2023

Codecov Report

Merging #12574 (cd8a5a8) into main (454e72d) will increase coverage by 0.00%.
Report is 11 commits behind head on main.
The diff coverage is 65.00%.

@@           Coverage Diff           @@
##             main   #12574   +/-   ##
=======================================
  Coverage   69.41%   69.41%           
=======================================
  Files        1469     1469           
  Lines      240680   240684    +4     
=======================================
+ Hits       167064   167067    +3     
- Misses      73616    73617    +1     
Flag Coverage Δ
rust 69.41% <65.00%> (+<0.01%) ⬆️


Files Coverage Δ
src/connector/src/sink/kafka.rs 34.00% <65.00%> (-0.09%) ⬇️

... and 4 files with indirect coverage changes


Merged via the queue into main with commit 78b83df Sep 28, 2023
26 of 27 checks passed
@hzxa21 hzxa21 deleted the patrick/kafka-producer-config branch September 28, 2023 11:15