Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow Consumer pausing and resuming consumption of individual partitions #417

Merged
merged 12 commits into from
Jul 15, 2019

Conversation

JaapRood
Copy link
Collaborator

@JaapRood JaapRood commented Jul 5, 2019

To address #396.

@JaapRood
Copy link
Collaborator Author

JaapRood commented Jul 5, 2019

I did a first draft of the required to see what the implications would be for implemention, public API, etc. In doing so I've identified some friction with the current public API of consumer.pause and consumer.resume.

Both already accept an array partitions to be specified with the topic name. The issue arises is what happens when omitting this partitions array, or including an empty one. The rest of the changes so far suggests that if the partitions would be empty, nothing would be paused, but so far the behaviour has been the opposite: the entire topic is paused.

The question here is whether we have to break compatibility to make this change or not, or whether we can find some reasonable way to maintain the current behaviour.

I'd propose the following:

  • when typeof partitions === undefined pause all current and future (can change through rebalance events) partitions
  • when Array.isArray(partitions) pause only those partitions, no partitions when empty
  • otherwise throw an error

I might give that approach a go and see if I can get the existing tests to pass, that should be a good indicator whether it's at all something we could consider.

@JaapRood
Copy link
Collaborator Author

JaapRood commented Jul 5, 2019

The pause.spec.js tests are passing without any changes to the tests, so that must mean we're doing something right :)

However, the implementation also brought an issue to light with the tracking of only the paused partitions. When having paused "all" partitions through calling consumer.pause([{ topic }]), you then can't resume any partitions specifically. That's because the SubscriptionState does not have access to the partitions that have actually been assigned to the consumer group.

This could be solved by not only keeping a collection of paused topics, but also resumed. An alternative would be for the SubscriptionState to own the state describing what topics and partitions are assigned (currently in consumerGroup.topics and consumerGroup.memberAssignment), which given it's name suggests that might be the right thing to do anyway. That approach would also clean up consumerGroup.fetch which now has to join the paused with the assigned manually (although it's the only place where that needs to happen, so perhaps that's fine).

I'll have a look at that next, as well as what other pre-existing tests are failing. From there, we can start to add tests covering the new behaviour.

Edit: it makes more sense to get new behaviour tests in first, so we can actually prove the refactoring is necessary by showing failing tests!

@tulios
Copy link
Owner

tulios commented Jul 8, 2019

Nice, and I agree with your proposal. I can try to get an early review today.

@JaapRood
Copy link
Collaborator Author

JaapRood commented Jul 8, 2019

The version I just pushed fixes and tests the basic implementation of pausing and resuming single partitions.

It does not yet cover the case where we pause by only topic, to then resume a specific partition, which is currently unsupported. I'm not sure whether it's worth the refactoring required at this point to make that work, as I don't think it's proven this is an actual use case. I reckon you either choose topics as your unit or partitions in this case.

With the meat of the changes here, it's ready for review while I get some other tests passing, like the subscription state unit tests.

@JaapRood JaapRood marked this pull request as ready for review July 8, 2019 10:33
@JaapRood
Copy link
Collaborator Author

JaapRood commented Jul 8, 2019

The PR should be in pretty good shape now, with fixed unit tests, a happy linter and documentation for the enhancement. It seems like the current failure of the tests is unrelated to the change and probably attributable to flakiness!

@JaapRood
Copy link
Collaborator Author

JaapRood commented Jul 8, 2019

Just fyi, #421 will impact this, as it exposes the internals of SubscriptionState that have changed in this version. I'd recommend making these changes first, to then resolve how to expose it through the public API.

} else if (Array.isArray(partitions) && !pausedForTopic.all) {
partitions.forEach(partition => pausedForTopic.partitions.delete(partition))
} else if (Array.isArray(partitions) && pausedForTopic.all) {
// TODO: consider whether we should actively track active topics, rather than paused ones, as to avoid this,
Copy link
Collaborator Author

@JaapRood JaapRood Jul 8, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll probably want to either do something about this, or document it elsewhere. I don't think it's a proven use case yet, so documenting it and leaving the throw for now seems like the way to go.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like we should probably make a note of this in an issue or something and bundle it into some future API breaking change instead, because the current behavior is pretty confusing (but totally the right call, given backwards compatibility).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the API difference between calling with or withouts partitions specified? This comment was about the error we have to throw when we've previously paused all topics by not specifying partitions, to then try to resume a single partition. My idea so far has been that people use either full topic pausing, or per partition, but if this thrown Error is too confusing, we can go with bringing the current member assignment into SubscriptionState, which would allow mixing the two.

This could be solved by not only keeping a collection of paused topics, but also resumed. An alternative would be for the SubscriptionState to own the state describing what topics and partitions are assigned (currently in consumerGroup.topics and consumerGroup.memberAssignment), which given it's name suggests that might be the right thing to do anyway. That approach would also clean up consumerGroup.fetch which now has to join the paused with the assigned manually (although it's the only place where that needs to happen, so perhaps that's fine).

Maybe that's just the way to go anyway, as it would probably clean things up a bit, too. I'll have a look at how much is involved by having a small go at it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was talking about the general problem of calling the public API with or without partitions. I'm not so sure that the assumption that people will either pause entire topics or individual partitions is valid, but I'm perfectly fine with us making the decision to go with it until we discover otherwise.

If you want to look into moving assigned partitions into SubscriptionState, you're very welcome to. I agree that it's likely the way to go, but I don't think it necessarily has to be done within the scope of this PR.

@Nevon Nevon merged commit 2b3e159 into tulios:master Jul 15, 2019
@JaapRood JaapRood deleted the feature/pause-partitions branch July 15, 2019 09:03
@tulios
Copy link
Owner

tulios commented Jul 16, 2019

Just a note to myself, update the typescript types before releasing this.

@JaapRood
Copy link
Collaborator Author

@tulios how can I help with that in the future? Must say I haven't really used Typescript apart from some small experiments, but would like to offer complete PR's in the future!

@tulios
Copy link
Owner

tulios commented Jul 18, 2019

@JaapRood we are also getting used with the typescript flow, so don't worry. I made this mistake several times by now. 😓

I am planning to add a PR template with a checklist of things we should be aware

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants