feat(source): use fragment id only as Kafka consumer group id #16111

Merged
merged 3 commits into from
Apr 19, 2024


xxchan
Member

@xxchan xxchan commented Apr 3, 2024

I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.

What's changed and what's your intention?

We use assign to manually assign topics and partitions to Kafka consumers, instead of subscribe. In this case, consumer group related features are not used at all.

  • This change is safe: There will be no conflicts between different actors in the same group.
    • (most importantly) When they read data, the committed offsets in the group are not used at all. (Unless using group offset on startup. For existing sources, stored offsets will be used)
      • Note that even if we create multiple Kafka consumers assigned to the same partitions, each of them consumes its own full copy of the data without conflict. They work independently.
    • They only update the committed offsets of their assigned partitions. So they will not override other actors' committed offset.
  • This change increases utility: The committed offset is only for monitoring progress/lag. For this usage, it also makes more sense to use a group id at the job (fragment) level, instead of the actor level.
  • This change greatly reduces the number of consumer groups, and thus can reduce users' confusion.
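
To illustrate the points above, here is a minimal sketch in plain Rust (no Kafka client involved). Everything in it is hypothetical and only models the idea: the `rw-consumer-...` naming, the helper functions, and the `CommittedOffsets` map are assumptions for illustration, not the actual RisingWave code. It shows that with a per-fragment group id, actors committing only for their own assigned partitions never touch each other's entries, while the number of groups drops to one per fragment.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical per-actor naming: one consumer group per actor.
fn group_id_per_actor(fragment_id: u32, actor_id: u32) -> String {
    format!("rw-consumer-{fragment_id}-{actor_id}")
}

/// Hypothetical per-fragment naming: one consumer group per fragment.
fn group_id_per_fragment(fragment_id: u32) -> String {
    format!("rw-consumer-{fragment_id}")
}

/// Toy stand-in for broker-side committed offsets, keyed by (group, partition).
#[derive(Default)]
struct CommittedOffsets {
    offsets: BTreeMap<(String, i32), i64>,
}

impl CommittedOffsets {
    fn commit(&mut self, group: &str, partition: i32, offset: i64) {
        self.offsets.insert((group.to_string(), partition), offset);
    }

    fn get(&self, group: &str, partition: i32) -> Option<i64> {
        self.offsets.get(&(group.to_string(), partition)).copied()
    }

    /// Number of distinct group ids that have committed anything.
    fn group_count(&self) -> usize {
        self.offsets
            .keys()
            .map(|(group, _)| group.as_str())
            .collect::<BTreeSet<_>>()
            .len()
    }
}

/// Each actor commits only for the partitions assigned to it.
/// The committed value (100 + partition) is arbitrary, chosen for the demo.
fn commit_all(
    assignment: &[(u32, Vec<i32>)],
    fragment_id: u32,
    per_fragment: bool,
) -> CommittedOffsets {
    let mut store = CommittedOffsets::default();
    for (actor_id, partitions) in assignment {
        let group = if per_fragment {
            group_id_per_fragment(fragment_id)
        } else {
            group_id_per_actor(fragment_id, *actor_id)
        };
        for partition in partitions {
            store.commit(&group, *partition, 100 + i64::from(*partition));
        }
    }
    store
}
```

For example, with two actors in fragment 7 assigned partitions `[0, 1]` and `[2, 3]`, `commit_all(.., false)` leaves two groups behind, while `commit_all(.., true)` leaves a single group holding one entry per partition, which is the shape you would want for lag monitoring.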

For a more detailed explanation, see this doc: https://www.notion.so/risingwave-labs/Notes-on-Kafka-Consumer-Consumer-Group-6bafdbab58b34ad7917fe47645f9a862

Checklist

  • I have written necessary rustdoc comments
  • I have added necessary unit tests and integration tests
  • I have added test labels as necessary. See details.
  • I have added fuzzing tests or opened an issue to track them. (Optional, recommended for new SQL features Sqlsmith: Sql feature generation #7934).
  • My PR contains breaking changes. (If it deprecates some features, please create a tracking issue to remove them in the future).
  • All checks passed in ./risedev check (or alias, ./risedev c)
  • My PR changes performance-critical code. (Please run macro/micro-benchmarks and show the results.)
  • My PR contains critical fixes that are necessary to be merged into the latest release. (Please check out the details)

Documentation

  • My PR needs documentation updates. (Please use the Release note section below to summarize the impact on users)

Release note

If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.

@@ -162,7 +162,6 @@ pub struct SourceEnumeratorInfo {
pub struct SourceContext {
    pub actor_id: u32,
    pub source_id: TableId,
    // There should be a 1-1 mapping between `source_id` & `fragment_id`
Member Author

I think this is wrong. One source_id can correspond to multiple MVs' SourceExecutors, and thus to multiple fragment_ids.

@xxchan xxchan requested review from tabVersion, fuyufjh and shanicky and removed request for fuyufjh April 3, 2024 03:13
@xxchan xxchan changed the title from "feat(source): use fragment id only as consumer group id" to "feat(source): use fragment id only as Kafka consumer group id" Apr 3, 2024
Member

@fuyufjh fuyufjh left a comment

LGTM as long as it can work

@tabVersion
Contributor

tabVersion commented Apr 5, 2024

can we do a test about the behavior? I think we no longer need to migrate to low level API as long as it can work.

For example, have three consumers on 4 partitions in a topic. Then shut down one of the consumers to see if the other two work as expected (split assignment not changed).

& what's the behavior in shared source?

@xxchan
Member Author

xxchan commented Apr 5, 2024

> can we do a test about the behavior?
> For example, have three consumers on 4 partitions in a topic. Then shut down one of the consumers to see if the other two work as expected (split assignment not changed).

Testing for sure. (I'm quite frustrated that we don't have any existing serious tests about partitioned topics & split changes :( )

But I don't understand the scenario you proposed here. By "consumer" do you mean RisingWave or just Kafka consumers? Do they have the same group id?

I've manually tested like this (outside risingwave): 5 partition topic t, c1 c2 both assigned to t's partition 1, to verify that they both receive one copy of data. Can you elaborate what properties you want to ensure? Feel free to raise any concerns, but please describe the scenario clearly 🙏
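
The property checked in that manual test can be modeled with a small sketch in plain Rust. This is a toy simulation under stated assumptions, not a Kafka client: `Partition` and `AssignedConsumer` are hypothetical types standing in for a partition log and for a consumer that was given its partition via manual assignment. Each consumer keeps its own read position, so two consumers on the same partition each see every message once.

```rust
/// Toy append-only partition log (stand-in for one Kafka partition).
struct Partition {
    log: Vec<String>,
}

/// Toy consumer with a manually assigned partition.
/// Its read position is local state, not shared with other consumers.
struct AssignedConsumer {
    position: usize,
}

impl AssignedConsumer {
    fn starting_at(position: usize) -> Self {
        Self { position }
    }

    /// Returns the next message, if any, and advances only this consumer's position.
    fn poll(&mut self, partition: &Partition) -> Option<String> {
        let msg = partition.log.get(self.position)?.clone();
        self.position += 1;
        Some(msg)
    }

    /// Reads everything that remains from this consumer's current position.
    fn drain(&mut self, partition: &Partition) -> Vec<String> {
        std::iter::from_fn(|| self.poll(partition)).collect()
    }
}
```

In this model, if `c1` polls one message and `c2` then drains the partition, `c2` still receives the full log from its own start position, and `c1` continues from where it left off. Neither consumer's progress depends on the other's.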

> & what's the behavior in shared source?

All behavior is the same as before. This PR does not change behavior in RisingWave. The behavior is determined by how partitions are assigned to actors (by the source manager). The group ID has nothing to do with the actual assignment.

@xxchan
Member Author

xxchan commented Apr 19, 2024

test updated @tabVersion

@xxchan xxchan added the user-facing-changes Contains changes that are visible to users label Apr 19, 2024
@xxchan xxchan enabled auto-merge April 19, 2024 08:35
@tabVersion
Contributor

now we have split change tests in CI. Split migration could also change the consumer group id in the previous implementation, so can we also cover this?

cc @shanicky Can you help design the tests?

@xxchan
Member Author

xxchan commented Apr 19, 2024

Related issues #15591 #15994

@xxchan xxchan added this pull request to the merge queue Apr 19, 2024
Merged via the queue into main with commit efa1fda Apr 19, 2024
32 of 33 checks passed
@xxchan xxchan deleted the xxchan/group branch April 19, 2024 09:29
Labels
type/feature user-facing-changes Contains changes that are visible to users
3 participants