Skip to content

Conversation

garyrussell
Copy link
Contributor

Resolves #2581

If a rebalance occurred while pending out of order commits were present, the pending commits were correctly purged, however, the consumerPaused boolean remained true.

This prevented the consumer from being paused again while the next batch of records is being processed and subsequent polls returned more records.

Reset the consumerPaused boolean when all pending commits are purged.

Also, with a cooperative assignor, ensure that any newly assigned partitions are paused if pending commits are present.

Add tests to verify the correct behavior with a legacy and cooperative assignor.

Also tested with the reporter's reproducer.

cherry-pick to 2.9.x

Resolves spring-projects#2581

If a rebalance occurred while pending out of order commits were present,
the pending commits were correctly purged, however, the `consumerPaused`
boolean remained true.

This prevented the consumer from being paused again while the next batch
of records is being processed and subsequent polls returned more records.

Reset the `consumerPaused` boolean when all pending commits are purged.

Also, with a cooperative assignor, ensure that any newly assigned partitions
are paused if pending commits are present.

Add tests to verify the correct behavior with a legacy and cooperative
assignor.

Also tested with the reporter's reproducer.

**cherry-pick to 2.9.x**

@SuppressWarnings({ "unchecked", "rawtypes" })
@Test
void pruneRevokedPartitionsFromPendingOutOfOrderCommitsLegacyAssignor() throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gary, I don't see any Assignor settings in the test code.
How does it work, please?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's simulated.

With a legacy assignor, all current assignments are revoked and all (or a subset) are re-assigned. With a coop assignor, only a subset are revoked and (possibly) previously unassigned partitions might be assigned.

See the case 1: poll phases in both tests - that is where the simulation is. In the coop case, I test that consumer.pause() is called twice (once for the original assignments and once for the newly assigned partition).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's cool. Thank you!

Co-authored-by: Artem Bilan <abilan@vmware.com>
@artembilan
Copy link
Member

Cherry-picked as c8088ef after some conflict fixes.

@garyrussell garyrussell deleted the GH-2581 branch February 16, 2023 16:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Container Does Not Pause for Missing Acks After a Rebalance

2 participants