Sinks.many().multicast() memory leak #3028

Closed
majorovms opened this issue Apr 21, 2022 · 1 comment
Labels
type/bug A general bug

Comments

@majorovms

It seems that a Sinks.many().multicast() sink retains subscribers even after their subscriptions are cancelled, if the subscription.dispose() calls execute concurrently.

We hit the memory leak described in #3001, so we upgraded to 3.4.17, which improved things considerably. But there is still a memory leak in another emitter, SinkManySerialized. Here is a test which reproduces the issue:

    @ParameterizedTest
    @ValueSource(ints = {1, 2, 3, 4, 5})
    public void testConcurrentUnsubscribe(int threadCount) {
        int subsCount = 10;
        Many<Object> emitter = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);

        // Subscribe subsCount times to the same sink.
        List<Disposable> subscriptions = IntStream.range(0, subsCount)
                .mapToObj(i -> emitter.asFlux().subscribe())
                .toList();

        // Dispose every subscription on a pool of threadCount threads.
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            List<? extends Future<?>> disposeFutures = subscriptions.stream()
                    .map(subscription -> executor.submit(subscription::dispose))
                    .toList();
            // Wait for all disposals to finish.
            List<?> disposals = disposeFutures.stream().map(this::getQuietly).toList();

            Assertions.assertEquals(subsCount, disposals.size());
            // Fails intermittently when threadCount > 1: some subscribers are retained.
            Assertions.assertEquals(0, emitter.currentSubscriberCount());
        } finally {
            // Release the pool threads even when an assertion fails.
            executor.shutdown();
        }
    }

    private <T> T getQuietly(Future<T> it) {
        try {
            return it.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

The test passes with a single-threaded executor and is likely to fail with 2-5 threads. The expectation is that no subscribers remain even when subscription.dispose() executes concurrently.

Workaround

As a temporary workaround, we run all the subscription.dispose() calls on a single-threaded executor.
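For anyone wanting to try the same workaround, here is a minimal sketch of the idea using only the JDK. The class name SerialDisposerSketch and its methods are hypothetical (they are not part of Reactor); the dispose action is taken as a plain Runnable so the sketch compiles without Reactor on the classpath.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical helper: funnels all dispose actions through one thread. */
final class SerialDisposerSketch implements AutoCloseable {

    // A single worker thread means dispose actions never run concurrently.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    /** Schedules a dispose action; the returned Future completes once it has run. */
    Future<?> dispose(Runnable disposeAction) {
        return executor.submit(disposeAction);
    }

    @Override
    public void close() {
        executor.shutdown();
    }

    /** Demo helper: runs actionCount actions and reports how many distinct threads ran them. */
    static int distinctThreadsUsed(int actionCount) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        try (SerialDisposerSketch disposer = new SerialDisposerSketch()) {
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < actionCount; i++) {
                futures.add(disposer.dispose(() -> threads.add(Thread.currentThread())));
            }
            for (Future<?> future : futures) {
                future.get(); // wait for each action to finish
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return threads.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + distinctThreadsUsed(20));
    }
}
```

With a Reactor Disposable, the call would presumably look like disposer.dispose(subscription::dispose). The trade-off is that every cancellation is serialized behind one thread, so this only makes sense as a stopgap until a fixed version is available.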

Environment

  • Reactor version: 3.4.17
  • Spring-webflux, netty
  • JVM corretto 17
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Apr 21, 2022
@simonbasle simonbasle added type/bug A general bug and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Apr 21, 2022
@simonbasle simonbasle added this to the 3.4.18 milestone Apr 21, 2022
@simonbasle
Member

Wow, this bug has been around since April 2017 😱 In case of contention during removal of inners (i.e. subscriber cancellation and, to a lesser extent, subscriber completion), when the compareAndSet(oldSubscriberArray, reducedSubscriberArray) fails, the EmitterProcessor's implementation doesn't loop back...

simonbasle added a commit that referenced this issue Apr 21, 2022
The EmitterProcessor#remove method retains subscribers if removals
happen in parallel, because a CAS failure doesn't trigger a new loop
iteration.

This applies to direct instantiations of EmitterProcessor as well as
Sinks.many().multicast().onBackpressureBuffer sinks.

This commit fixes the method to loop back when the CAS fails.

Fixes #3028.
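
To illustrate the pattern described above, here is a rough, JDK-only sketch of a copy-on-write subscriber array whose remove method retries when the CAS fails. This is not Reactor's actual code: SubscriberRegistrySketch and all of its members are hypothetical names, and it uses an AtomicReference purely for illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical copy-on-write registry; an illustration, not Reactor's implementation. */
final class SubscriberRegistrySketch {

    private final AtomicReference<Object[]> subscribers = new AtomicReference<>(new Object[0]);

    void add(Object subscriber) {
        for (;;) {
            Object[] current = subscribers.get();
            Object[] grown = Arrays.copyOf(current, current.length + 1);
            grown[current.length] = subscriber;
            if (subscribers.compareAndSet(current, grown)) {
                return;
            }
            // CAS lost a race with another thread: retry against the fresh array.
        }
    }

    void remove(Object subscriber) {
        for (;;) {
            Object[] current = subscribers.get();
            int index = -1;
            for (int i = 0; i < current.length; i++) {
                if (current[i] == subscriber) {
                    index = i;
                    break;
                }
            }
            if (index < 0) {
                return; // not present (already removed)
            }
            Object[] reduced = new Object[current.length - 1];
            System.arraycopy(current, 0, reduced, 0, index);
            System.arraycopy(current, index + 1, reduced, index, current.length - index - 1);
            if (subscribers.compareAndSet(current, reduced)) {
                return;
            }
            // CAS failed because another thread changed the array first.
            // Returning here instead of looping would leave the subscriber
            // in the array, which is the kind of leak described in this issue.
        }
    }

    int count() {
        return subscribers.get().length;
    }

    /** Demo helper: removes all subscribers concurrently and returns how many remain. */
    static int remainingAfterConcurrentRemoval(int threadCount, int subscriberCount) {
        SubscriberRegistrySketch registry = new SubscriberRegistrySketch();
        Object[] subs = new Object[subscriberCount];
        for (int i = 0; i < subscriberCount; i++) {
            subs[i] = new Object();
            registry.add(subs[i]);
        }
        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
        try {
            for (Object sub : subs) {
                executor.execute(() -> registry.remove(sub));
            }
        } finally {
            executor.shutdown();
        }
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("removals did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return registry.count();
    }

    public static void main(String[] args) {
        System.out.println("remaining: " + remainingAfterConcurrentRemoval(4, 10));
    }
}
```

Because each failed compareAndSet re-reads the array and tries again, every removal eventually lands regardless of how many threads contend, so the registry should end up empty.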