OverflowException in Flux.interval when kafka broker is unavailable for some time #29

Closed
mlex opened this issue Apr 16, 2018 · 1 comment
Labels: type/bug (A general bug)

mlex commented Apr 16, 2018

We noticed the following exception in our test environments:

ERROR reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception 
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: Could not emit tick 272 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
Caused by: reactor.core.Exceptions$OverflowException: Could not emit tick 272 due to lack of requests (interval doesn't support small downstream requests that replenish slower than the ticks)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:215)
        at reactor.core.publisher.FluxInterval$IntervalRunnable.run(FluxInterval.java:121)
        at reactor.core.scheduler.PeriodicWorkerTask.call(PeriodicWorkerTask.java:59)
        at reactor.core.scheduler.PeriodicWorkerTask.run(PeriodicWorkerTask.java:73)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

The reason seems to be the Flux.interval() in DefaultKafkaReceiver, see https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L311.

When the connection to the Kafka broker is disrupted, consumer.poll() can hang indefinitely. This blocks the eventScheduler, so it can no longer process CommitEvents. Once enough CommitEvents have queued up, the interval cannot emit further ticks and throws the exception above.

To reproduce, stop the Kafka broker while the application is running. After some time, the exception above is thrown because the queue of the PublishOnSubscriber (used here: https://github.com/reactor/reactor-kafka/blob/master/src/main/java/reactor/kafka/receiver/internals/DefaultKafkaReceiver.java#L317) is full. How long this takes depends on the commitInterval: with the default interval of 5 seconds, it can take up to 256*5=1280 seconds.
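
To make the 256*5 estimate concrete, here is a small plain-JDK sketch of the arithmetic. It is only a model: it assumes a 256-slot queue that nothing drains (standing in for the blocked eventScheduler) and one tick per commit interval. The class and method names are hypothetical, and no Reactor or Kafka code is involved.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model of the situation described in this report; not Reactor code.
class CommitQueueOverflowSketch {

    /**
     * Simulates ticks arriving every intervalSeconds into a bounded queue
     * that is never drained, and returns the simulated time (in seconds)
     * at which the queue has no free slot left.
     */
    static long secondsUntilQueueFull(int capacity, long intervalSeconds) {
        ArrayBlockingQueue<Long> queue = new ArrayBlockingQueue<>(capacity);
        long elapsed = 0;
        long tick = 0;
        while (queue.remainingCapacity() > 0) {
            elapsed += intervalSeconds; // wait one interval for the next tick
            queue.offer(tick++);        // the blocked consumer never polls
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Assumed values from the report: 256 slots, 5 second commit interval.
        System.out.println(secondsUntilQueueFull(256, 5)); // prints 1280
    }
}
```

In this model the queue is full after 1280 simulated seconds, and the next tick has nowhere to go, which is the point where the real interval would fail with the OverflowException.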

@smaldini smaldini added this to the 1.0.1.RELEASE milestone Jun 15, 2018
@smaldini smaldini added type/bug A general bug and removed type/bug A general bug labels Jun 15, 2018
@OlegDokuka OlegDokuka self-assigned this Jul 7, 2018
@OlegDokuka OlegDokuka added the WIP label Jul 7, 2018
OlegDokuka added a commit that referenced this issue Jul 8, 2018
…ilable for some time

- added `onBackpressureLatest` in order to skip emitted intervals during a hang
OlegDokuka added a commit that referenced this issue Jul 9, 2018
#29 fix OverflowException in Flux.interval when kafka broker is unavailable for some time
@OlegDokuka
Contributor

fixed by #40
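
For readers unfamiliar with the `onBackpressureLatest` strategy mentioned in the commit message above, the following plain-JDK sketch models the idea: while the consumer is stuck, each new tick overwrites the previous one instead of queueing behind it, so nothing can overflow. This is an illustrative model only, not Reactor's implementation and not the code from the fix; `LatestTickSketch` and its methods are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-slot "keep only the latest value" holder; not Reactor code.
class LatestTickSketch {
    private final AtomicReference<Long> latest = new AtomicReference<>();
    private final AtomicLong dropped = new AtomicLong();

    /** Producer side: store the new tick, replacing any tick not yet consumed. */
    void onTick(long tick) {
        if (latest.getAndSet(tick) != null) {
            dropped.incrementAndGet(); // an unconsumed tick was overwritten
        }
    }

    /** Consumer side: take the most recent tick, or null if none is pending. */
    Long poll() {
        return latest.getAndSet(null);
    }

    /** Number of ticks that were skipped because the consumer was too slow. */
    long dropped() {
        return dropped.get();
    }
}
```

With this shape, a consumer that was blocked for 1000 ticks sees only the most recent tick once it resumes, and the other 999 are skipped. For periodic commit triggers that seems acceptable, since a single pending commit event is enough to trigger the next commit.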
