Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Partition Resumes #199

Merged
merged 7 commits into from
Mar 11, 2021
Merged

Fix Partition Resumes #199

merged 7 commits into from
Mar 11, 2021

Conversation

garyrussell
Copy link
Contributor

@garyrussell garyrussell commented Mar 3, 2021

ConsumerEventLoop.PollEvent should not resume partitions that it, itself,
has not paused.

Prior to d908b81
pause/resume due to backpressure was protected by a boolean but it still unconditionally
resumed all partitions.

  • reintroduce the boolean and only resume if we paused
  • keep track of user-paused partitions and exclude them from resuming

Also fixes #190
Also fixes #198

@garyrussell garyrussell added the type/bug A general bug label Mar 3, 2021
@garyrussell
Copy link
Contributor Author

cc/ @artembilan - I couldn't tag you for a review - maybe you aren't in this org?

@artembilan
Copy link
Contributor

True. Here are people of this org: https://github.com/orgs/reactor/people.
But that still does not prevent us from reviewing.
Although looks like the merge is still only their prerogative 😄

Copy link
Member

@simonbasle simonbasle left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need a few clarifications

if (pausedByUs.getAndSet(false)) {
Set<TopicPartition> toResume = new HashSet<>(consumer.assignment());
toResume.removeAll(this.pausedByUser);
consumer.resume(toResume);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is it relevant to keep the pausedByUser collection full after this point?
ie. all the branches where pausedByUs == false will clear the collection and refill it, but that clear is thus done in a lazy fashion

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; yes, it can be cleared here.

@@ -1190,6 +1190,29 @@ public void abortTransaction() throws Exception {
checkConsumedMessages(count, count * 3);
}

@Test
public void userPause() throws InterruptedException {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does the test really cover externally paused topic partitions? ie. would there be a way to have 2 partitions paused and verify that the PollEvent distinguishes between the two more explicitly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes; more test coverage is needed; I originally wrote this test just to demonstrate that the paused partition was incorrectly resumed.

The notion of user Vs. our pauses only came to me while fixing the bug, but I didn't enhance the test to cover that use case.

@@ -230,12 +236,22 @@ public void run() {
long r = requested;
if (r > 0) {
if (!awaitingTransaction.get()) {
consumer.resume(consumer.assignment());
if (pausedByUs.getAndSet(false)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can two run() execute concurrently? ie could it be that while we atomically switch pausedByUs to false here, the line 245 below gets executed? or is it all serialized by the awaitingTransaction and requested somehow ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't believe so, the scheduler is a Schedulers.newSingle().

`ConsumerEventLoop.PollEvent` should not resume partitions that it, itself,
has not paused.

Prior to reactor@d908b81
pause/resume due to backpressure was protected by a boolean but it still unconditionally
resumed all partitions.

- reintroduce the boolean and only resume if we paused
- keep track of user-paused partitions and exclude them from resuming
- enhance test to verify proper resumption after auto pause
- continue to schedule polls while paused

Now also resolves reactor#190
Fixes reactor#198

Only one `PollTask` needs to be scheduled at a time.

Also upgrade docker image.
- `onRequest()` can be called on a different thread.
@@ -116,6 +118,9 @@
}

void onRequest(long toAdd) {
if (log.isDebugEnabled()) {
log.debug("onRequest.toAdd:" + toAdd);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably need a space after colon to make it more readable in the logs

consumer.pause(consumer.assignment());
log.debug("Paused 2");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the info does it bring for end-users in the logs?
I mean this one and the previous Paused 1 .
Doesn't look like very useful...

public abstract class AbstractKafkaTest {

public static final int DEFAULT_TEST_TIMEOUT = 60_000;

private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.1"))
private static final KafkaContainer KAFKA = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.1.0"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use a latest image here?
I mean is it possible to have it without version at all, which means latest?

@@ -0,0 +1,29 @@
# Copyright (c) 2016-2018 Pivotal Software Inc, All Rights Reserved.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's kinda weird to have a Copyright in the non-Java files, but OK if it is what we have so far in this project.
Anyway: I see you have modified Copyright to VMware in other place. So, this one might be affected, too.

On the other hand the Log4J 1.x is out of life for a while already: https://logging.apache.org/log4j/1.2/ 😄

Different issue though...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops - I changed the wrong one - intended to just change the new one. Need to figure out if they all need changing.

Yes; I am going to open another issue about log4j.

if (r > 1 || commitEvent.inProgress.get() > 0) {
schedule();
}
schedule();
}

Operators.produced(REQUESTED, ConsumerEventLoop.this, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since poll returns empty ConsumerRecords for paused partitions, sink's queue quickly gets overflowed with garbage references, what could lead to the OOM, since empty ConsumerRecords is effectively added on each schedule call. Also, such logic breaks backpressure logic as empty ConsumerRecords is treated as satisfying demand from the downstream when it is clearly isn't.

In order to fix this it is enough to replace

Operators.produced(REQUESTED, ConsumerEventLoop.this, 1);
sink.emitNext(records, ConsumerEventLoop.this);

With

if (!records.isEmpty()) {
    Operators.produced(REQUESTED, ConsumerEventLoop.this, 1);
    log.debug("Emmiting {} records", records.count());
    sink.emitNext(records, ConsumerEventLoop.this);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had that before, but restored it because onRequest() was never called (it's called downstream from the sink) so we never resumed the consumer.

That might have been on an earlier iteration of this patch; I'll look again tomorrow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After further review, I had to pull that change because KafkaReceiverTests.userPause() now fails, because now there is no thread to call onRequest() to resume the consumer. Need to give this some more thought.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was mistaken; that code looks fine; there is something else going on that I need to track down tomorrow.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug A general bug
Projects
None yet
5 participants