Consumer buffer of flowable leaks memory #7617

Closed
sr1996-glitch opened this issue Nov 9, 2023 · 3 comments

sr1996-glitch commented Nov 9, 2023

I have noticed some strange behavior while using RxJava with the code snippet below.

Version: 2.1.12

private void createTopology() {
    try {
        Flowable<event> flowable = publishSubjectController.toFlowable(BackpressureStrategy.BUFFER);
        ConnectableFlowable<event> connectableFlowable = flowable.publish();

        connectableFlowable
                .filter(PersistenceManager::isMsgtoBePersisted)
                .groupBy((w) -> w.getTopic())
                .forEach(go -> {
                    Flowable<List<event>> filter = go
                            .buffer(10, TimeUnit.SECONDS, 128)
                            .filter(gol -> !gol.isEmpty());

                    filter.onBackpressureBuffer(128,
                                () -> LOG.warn("Persist queue buffer overflow. Dropping oldest packets. Key: " + go.getKey()),
                                BackpressureOverflowStrategy.DROP_OLDEST)
                            .observeOn(Schedulers.io())
                            .subscribe(lst -> persistToDatabase(go.getKey(), lst));
                });
        connectableFlowable.connect();

    } catch (Exception ex) {
        LOG.error("Exception", ex);
    }
}

Multiple threads can invoke publishSubjectController.onNext(event) based on real-time data from the network.

  1. The consumer buffer keeps growing over 2-3 hours, eventually occupying 99% of the heap and crashing the application.

[Screenshot, 2023-11-09: heap analysis showing the consumer buffer]

  2. There is a difference of 1 million between producerIndex and consumerIndex, and roughly that many elements are present in the consumer buffer. The difference keeps growing, indicating that consumption has stopped for some reason under certain circumstances. Nothing seems to be written to the database.

  3. The behaviour is highly sporadic; the same code works more often than not, and the memory leak doesn't happen every time.

Could you please help me find an explanation for this behaviour?

akarnokd (Member) commented Nov 9, 2023

  1. Is your publishSubjectController serialized? Concurrent onNext access can lead to hangs or leaks.
  2. groupBy may hold open groups longer than anticipated, especially if the source has some form of rolling-grouping.
  3. Calling subscribe inside forEach is a code smell because it breaks the chain. It can also cause leaks, since those inner subscriptions are no longer part of the same sequence and won't be cleaned up automatically (see the sketch after this list).
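
For illustration, a minimal sketch (not a confirmed fix) of point 3: keeping the per-group work inside one chain via flatMap instead of subscribing inside forEach. It reuses the names from the snippet above (event, publishSubjectController, PersistenceManager, persistToDatabase, LOG), copies the capacities from the issue, and assumes the usual RxJava 2 imports (io.reactivex.Flowable, io.reactivex.flowables.ConnectableFlowable, io.reactivex.disposables.Disposable, etc.).

private void createTopology() {
    ConnectableFlowable<event> connectableFlowable = publishSubjectController
            .toFlowable(BackpressureStrategy.BUFFER)
            .publish();

    // Per-group buffering/persisting is merged back into the main chain via flatMap,
    // so disposing this single subscription tears the whole pipeline down.
    Disposable pipeline = connectableFlowable
            .filter(PersistenceManager::isMsgtoBePersisted)
            .groupBy(w -> w.getTopic())
            .flatMap(go -> go
                    .buffer(10, TimeUnit.SECONDS, 128)
                    .filter(batch -> !batch.isEmpty())
                    .onBackpressureBuffer(128,
                            () -> LOG.warn("Persist queue buffer overflow. Key: " + go.getKey()),
                            BackpressureOverflowStrategy.DROP_OLDEST)
                    .observeOn(Schedulers.io())
                    .doOnNext(batch -> persistToDatabase(go.getKey(), batch)))
            .subscribe();

    connectableFlowable.connect();
}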

sr1996-glitch (Author) commented Nov 9, 2023

@akarnokd Thank you for the quick reply.

  1. It's not serialized, mainly because of the high-throughput requirement (and, like I mentioned, this code worked well until very recently; it may have broken now due to increased load). I was able to simulate a hang caused by the non-serialized PublishSubject (under a much higher load and thread count), but that does not produce the consumer buffer bloat.

  2. There is no rolling-grouping. The incoming data can belong to one of four groups/topics (with fairly similar probability). But even then, are grouped elements cached in the consumer buffer?

  3. Point taken. I'll move the buffering + filtering into a flatMap.

akarnokd (Member) commented Nov 9, 2023

If you call onNext concurrently, you must serialize the subject.
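
For illustration, a minimal sketch of serializing the subject, assuming publishSubjectController is a PublishSubject<event> as in the snippet above; incomingEvent is just a placeholder name.

// toSerialized() wraps the subject so onNext can safely be called from multiple threads.
Subject<event> serializedController = publishSubjectController.toSerialized();

// Network threads emit through the serialized wrapper (incomingEvent is hypothetical):
serializedController.onNext(incomingEvent);

// Build the rest of the topology from the same serialized view:
Flowable<event> flowable = serializedController.toFlowable(BackpressureStrategy.BUFFER);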

toFlowable(BackpressureStrategy.BUFFER) has an unbounded buffer, so if downstream operators can't drain it fast enough, it can lead to buffer bloat on its own. You'll have to rethink the dataflow to use Flowables and some form of bounded data generation.
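
As one possible illustration of bounding the bridge (a sketch only; the 10_000 capacity and DROP_OLDEST policy are assumptions, not a recommendation tuned to this workload), the unbounded BUFFER strategy could be replaced with an explicit bounded buffer on the serialized subject from the sketch above:

// Sketch: replace the unbounded BUFFER bridge with an explicit, bounded buffer.
Flowable<event> bounded = serializedController
        .toFlowable(BackpressureStrategy.MISSING)           // let the operator below handle backpressure
        .onBackpressureBuffer(10_000,
                () -> LOG.warn("Source buffer full, dropping oldest events"),
                BackpressureOverflowStrategy.DROP_OLDEST);   // bounded: drops instead of growing forever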

akarnokd closed this as completed on Feb 1, 2024