
Need help on backpressure and auto pause / resume during high load for reactive consumer #190

Closed
pavanpkp33 opened this issue Feb 6, 2021 · 4 comments · Fixed by #199
Labels: type/bug (A general bug)
Milestone: 1.3.3

Comments

@pavanpkp33

pavanpkp33 commented Feb 6, 2021

Hi All,

I have a reactive Kafka consumer that consumes data from a topic and pushes it to an endpoint. However, these endpoints are flaky and error prone. I have circuit breakers that trip on failures, and I introduce an artificial delay with Mono.delay to slow down the pipeline. In theory, reactive backpressure should kick in and pause the Kafka consumer, right? I see that if I set my max.poll.interval.ms to a low number, the consumer is not paused and rebalancing results. How can I make sure that the Kafka consumer applies backpressure and pauses consumption until the downstream messages are done processing?
Here's my consumer code:

protected Disposable pollRecordsWithAutoCommit(KafkaReceiver<byte[], T> receiver) {
    return receiver
        .receiveAutoAck()
        .onErrorStop()
        .flatMap(
            consumerRecordFlux -> processFluxStream(consumerRecordFlux),
            jobConfiguration.getConsumerParallelPower())
        .onErrorContinue(
            // ALL uncaught exceptions will be published here.
            // TODO: post to DLQ with retry count or possibly other metadata
            // `o` is the Kafka record
            (throwable, o) ->
                log.error(
                    "Error while processing record. [error:{}, record:{}]",
                    throwable.toString(),
                    o))
        .subscribe();
}
processFluxStream contains Mono.delay logic to slow down processing in case of circuit-breaker errors.

How do I go about linking the reactive pipeline to pause and resume on these signals and also avoid rebalancing issues?
I read an earlier issue describing the same problem, but it does not have a solution.

Thanks.
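For reference, here is reactive-streams backpressure in miniature, using only the JDK's java.util.concurrent.Flow API (a sketch of the demand contract, not the Kafka pipeline itself): the publisher can only push as fast as the subscriber requests, which is the same contract that should pause a KafkaReceiver when downstream demand reaches zero.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

public class BackpressureSketch {
    public static void main(String[] args) throws InterruptedException {
        AtomicInteger received = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> pub = new SubmissionPublisher<>()) {
            pub.subscribe(new Flow.Subscriber<Integer>() {
                private Flow.Subscription sub;
                public void onSubscribe(Flow.Subscription s) {
                    sub = s;
                    s.request(1); // pull exactly one item at a time
                }
                public void onNext(Integer item) {
                    received.incrementAndGet();
                    sub.request(1); // demand replenished only after processing
                }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            for (int i = 0; i < 10; i++) {
                pub.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() completes the stream after all items are delivered
        done.await();
        System.out.println(received.get());
    }
}
```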

@reactorbot added the ❓need-triage (This issue needs triage, hasn't been looked at by a team member yet) label Feb 6, 2021
@simonbasle
Member

Which version of reactor-kafka are you using? Version 1.3.x was overhauled and might behave better in terms of backpressure pausing consumption. @garyrussell any insight?

@simonbasle added the for/stackoverflow (Questions are best asked on SO or Gitter) and for/user-attention (This issue needs user attention: feedback, rework, etc.) labels and removed the ❓need-triage label Mar 4, 2021
@garyrussell
Contributor

@simonbasle I have done some analysis (with 1.3.3-SNAPSHOT). Here is what I found...

        Disposable flux = receiver.receive()
            .doOnNext(rec -> {
                System.out.println(rec);
            })
            .subscribe();

If I set a breakpoint on the System.out, I see the main dispatcher thread hung (so polling is stopped). So...

        Disposable flux = receiver.receive()
            .publishOn(Schedulers.newSingle("mightHang"))
            .doOnNext(rec -> {
                System.out.println(rec);
            })
            .subscribe();

Now, the polling thread continues to poll. When the requests count down to 0 (due to the back pressure), we correctly pause the consumer.
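The effect of that publishOn hop can be modelled with plain JDK executors (a hypothetical sketch; "mightHang" mirrors the scheduler name above): each record is handed to a dedicated worker thread, so slow processing never blocks the thread that polls the consumer.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadSketch {
    public static void main(String[] args) throws Exception {
        // Analogue of .publishOn(Schedulers.newSingle("mightHang")):
        // downstream work runs on a dedicated single thread, leaving
        // the caller (the "polling" thread) free to keep polling.
        ExecutorService worker = Executors.newSingleThreadExecutor(
            r -> new Thread(r, "mightHang"));
        String pollThread = Thread.currentThread().getName();
        String processedOn = worker.submit(
            () -> Thread.currentThread().getName()).get();
        worker.shutdown();
        System.out.println(processedOn);
        System.out.println(processedOn.equals(pollThread));
    }
}
```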

However, due to this code....

        if (isActive.get()) {
            if (r > 1 || commitEvent.inProgress.get() > 0) {
                schedule();
            }
        }

...we don't schedule the PollTask because r == 0 and so we stop polling.

It looks to me like we should always reschedule, even when r is 0.

When I change it to unconditionally schedule, it works as expected (keep polling while paused, returning 0 records on each poll).
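A toy model of that unconditional rescheduling (stdlib only; the demand numbers are illustrative, not reactor-kafka's actual internals): the poll task is always rescheduled, so while demand (r) is 0 the consumer is "paused" and each poll returns no records, but polling itself never stops, which is what keeps max.poll.interval.ms from being exceeded.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PollLoopSketch {
    public static void main(String[] args) {
        Queue<Integer> topic = new ArrayDeque<>();
        for (int i = 0; i < 6; i++) topic.add(i);

        long r = 3;                  // outstanding downstream requests
        int delivered = 0, emptyPolls = 0;
        for (int poll = 0; poll < 10; poll++) { // always rescheduled
            boolean paused = r == 0;            // back-pressure pause
            if (paused) {
                emptyPolls++;                   // poll runs, returns 0 records
                if (poll == 6) r = 3;           // downstream catches up: resume
            } else if (!topic.isEmpty()) {
                topic.poll();
                delivered++;
                r--;
            }
        }
        System.out.println(delivered + " " + emptyPolls);
    }
}
```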

When I release the breakpoint, we resume the consumer as expected (I was actually testing with my PR in place).

However, there is one more oddity.

Initially, requested is set to 256 and counts down to 64 before being replenished to 256.
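That 256-down-to-64 pattern is consistent with a 75% replenishment threshold, which can be modelled arithmetically (a hypothetical sketch, not the actual requested-counter code): once three quarters of the prefetch have been consumed, the counter is topped back up.

```java
public class PrefetchSketch {
    public static void main(String[] args) {
        // Toy model: start with 256 outstanding requests; once 75% (192)
        // are consumed -- i.e. the counter reaches 64 -- request 192 more,
        // bringing it back to 256.
        int prefetch = 256;
        int threshold = prefetch - (prefetch * 3 / 4); // 64
        int requested = prefetch;
        int replenishments = 0;
        for (int consumed = 0; consumed < 1000; consumed++) {
            requested--;
            if (requested == threshold) {
                requested += prefetch * 3 / 4;  // top back up to 256
                replenishments++;
            }
        }
        System.out.println(requested + " " + replenishments);
    }
}
```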

After forcing the back pressure, it counts down to 0 and we pause the consumer.

When I released the break point and we resumed the consumer, requests went to 767, counted down to 744; then it went to 936, counting down to 744, with the latter sequence repeated over and over.

@pavanpkp33
Author

@simonbasle I'm using 1.3.0. @garyrussell any future releases planned to fix this? Thanks.

@garyrussell
Contributor

I will try to get a fix into 1.3.3, due in a couple of weeks.

@garyrussell added the type/bug (A general bug) label and removed the for/stackoverflow and for/user-attention labels Mar 4, 2021
@garyrussell garyrussell added this to the 1.3.x Backlog milestone Mar 4, 2021
garyrussell added a commit to garyrussell/reactor-kafka that referenced this issue Mar 4, 2021
- enhance test to verify proper resumption after auto pause
- continue to schedule polls while paused

Now also resolves reactor#190
@garyrussell garyrussell modified the milestones: 1.3.x Backlog, 1.3.3 Mar 5, 2021
garyrussell added a commit to garyrussell/reactor-kafka that referenced this issue Mar 10, 2021
- enhance test to verify proper resumption after auto pause
- continue to schedule polls while paused

Now also resolves reactor#190
garyrussell added a commit that referenced this issue Mar 11, 2021
* Fix Partition Resumes

`ConsumerEventLoop.PollEvent` should not resume partitions that it, itself,
has not paused.

Prior to d908b81
pause/resume due to backpressure was protected by a boolean but it still unconditionally
resumed all partitions.

- reintroduce the boolean and only resume if we paused
- keep track of user-paused partitions and exclude them from resuming

* Address PR Comments

- enhance test to verify proper resumption after auto pause
- continue to schedule polls while paused

Now also resolves #190

* Fix memory leak.

Fixes #198

Only one `PollTask` needs to be scheduled at a time.

Also upgrade docker image.

* Use AtomicBoolean for scheduled flag.

- `onRequest()` can be called on a different thread.

* Address PR Comments.

* Fix copyrights.

* Don't emit empty `ConsumerRecords`.