Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use SubscriptionState to track member assignment and how that's affected by pause / resume logic #429

Merged
merged 15 commits into from
Jul 18, 2019

Conversation

JaapRood
Copy link
Collaborator

@JaapRood JaapRood commented Jul 15, 2019

Closes #427.

@JaapRood
Copy link
Collaborator Author

Starting from what I imagine the enhanced API of SubscriptionState to look like, it seems definitely like a net gain to me in terms of clarity. While logic living between two files is always a bit of a trade off, I think it now becomes a lot more explicit what other logic governs the member assignment and how it should be used.

From trying to fit the consistent API of SubscriptionState, accepting Array<TopicPartitions>, it became apparent that the code that calls it should keep track of it's newly received assignment in a slightly different format. All consumer tests passing should prove that this should be fine.

So far, while the SubscriptionState assignment is being set as part of consumerGroup.sync, it's not used anywhere yet. I'll look at that next, which should give good insight whether these drafted changes actually have the desired effect.

@JaapRood JaapRood marked this pull request as ready for review July 15, 2019 13:48
@JaapRood
Copy link
Collaborator Author

I'm happy to consider this ready for merging with the progress made.

subscriptionState.active is now used to determine which partitions to fetch for, taking into consideration all pausing and resuming logic. The paused method now renders which of the assigned partitions are paused, whether an entire topic or just the individual partitions were paused, making it ready for public consumption (see #421).

Now that subscriptionState is aware of the assigned partitions, we could also add the tracking of explicitly resumed partitions, removing an entire type of Error that can be thrown. Basically you can now do this:

consumer.pause([{ topic: 'topic1' }]) // this pauses all current and future partitions for this topic

// This resumes consumption of these specific 2 partitions, while the rest remains paused. 
// Threw error before
consumer.resume([{ topic: 'topic1', partitions: [1,2]}]) 

I had a look whether there were other things of ConsumerGroup that would be better housed in SubscriptionState, but everything else is too closely related to the internals of the consumer group to qualify.

@JaapRood
Copy link
Collaborator Author

Looks like tests are failing because of an actual problem for once 😂. On it!

@JaapRood
Copy link
Collaborator Author

Turned out that the runner was accessing memberAssignment as well. Instead of exposing that once again, I added consumerGroup.assigned, which proxies subscriptionState.assigned, just like it does with paused. A bit of format conversion was needed, to not break the public API of the GROUP_JOIN event.

@tulios tulios self-requested a review July 17, 2019 08:47
Copy link
Owner

@tulios tulios left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great stuff, I made some small cosmetic changes.

@tulios
Copy link
Owner

tulios commented Jul 17, 2019

@JaapRood can you update the typescript types with the new methods and any signature changes? I can do it if you can't.

@JaapRood
Copy link
Collaborator Author

@tulios if you're in the position to do so swiftly, go right ahead. I haven't touched TS in a while and not past some experiments, so would have to take some more time than I have available for it today! Happy to include it in future PR's, though :)

@tulios
Copy link
Owner

tulios commented Jul 18, 2019

I have updated the types; I will merge after a green build.

@tulios tulios merged commit 690f68a into tulios:master Jul 18, 2019
@JaapRood JaapRood deleted the feature/subscription-state branch July 18, 2019 08:36
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Move member assignment state into SubscriptionState and enable it to apply pause logic to concrete assignments
3 participants