Possible pool resource release bug #124

Closed
andreisilviudragnea opened this issue Mar 14, 2021 · 11 comments
Labels
status/invalid We don't feel this issue is valid

Comments

andreisilviudragnea commented Mar 14, 2021

There are times when the resources are never released back to the pool. I do not know the cause yet, but I was able to reproduce the problem in a separate project here.

Expected Behavior

The resources should be released to the pool.

Actual Behavior

The resources never get released to the pool.

Steps to Reproduce

package com.example;

import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;

public class ReactorPoolBugTest {

    private static final int COUNT = 10_000;

    private final InstrumentedPool<String> stringReactivePool = PoolBuilder
            .from(Mono.just("value").delayElement(Duration.ofMillis(2)))
            .maxPendingAcquireUnbounded()
            .sizeBetween(0, 3)
            .buildPool();

    @Test
    public void reactorPoolBug() throws InterruptedException {
        ExecutorService loggerThreads = Executors.newFixedThreadPool(
                1,
                r -> {
                    Thread t = Executors.defaultThreadFactory().newThread(r);
                    t.setDaemon(true);
                    return t;
                }
        );

        loggerThreads.submit(new PoolMetricsLogger(stringReactivePool.metrics()));

        ExecutorService executorService = Executors.newFixedThreadPool(
                16,
                r -> {
                    Thread t = Executors.defaultThreadFactory().newThread(r);
                    t.setDaemon(true);
                    return t;
                }
        );

        CountDownLatch cdl = new CountDownLatch(COUNT);
        for (int i = 0; i < COUNT; i++) {
            executorService.submit(new FlatMapErrorTask(cdl));
        }

        cdl.await();
    }

    private static final class PoolMetricsLogger implements Runnable {

        private final InstrumentedPool.PoolMetrics poolMetrics;

        private PoolMetricsLogger(InstrumentedPool.PoolMetrics poolMetrics) {
            this.poolMetrics = poolMetrics;
        }

        public void run() {
            while (true) {
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                System.err.printf(
                        "[POOL Metrics] Acquired = %d Pending = %d Idle = %d%n",
                        poolMetrics.acquiredSize(),
                        poolMetrics.pendingAcquireSize(),
                        poolMetrics.idleSize()
                );
            }
        }
    }

    private final class FlatMapErrorTask implements Runnable {

        private final CountDownLatch cdl;

        public FlatMapErrorTask(CountDownLatch cdl) {
            this.cdl = cdl;
        }

        public void run() {
            Flux<Void> flux = Flux
                    .range(0, 10)
                    .flatMap(i -> stringReactivePool
                            .withPoolable(value -> Mono
                                    .just(value)
                                    .delayElement(Duration.ofMillis(10))
                                    .then()
                            )
                            .switchIfEmpty(Mono.error(new RuntimeException("Empty")))
                    )
                    .doOnComplete(() -> cdl.countDown())
                    .doOnError(error -> cdl.countDown());

            try {
                flux.blockLast();
            } catch (Exception e) {
                System.err.println(e);
            }

            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

Possible Solution

The test should always finish, but most of the time it never does, because all the pool resources remain acquired. For an unknown reason, the test fails far less often if you remove the Thread.sleep(50) at the end of FlatMapErrorTask.run().
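
To make the symptom concrete, here is a minimal plain-Java sketch of why leaked resources make later acquires wait rather than fail. It does not use reactor-pool: a Semaphore with three permits stands in for sizeBetween(0, 3), and the names LeakModel and runTasks are hypothetical, introduced only for this illustration.

```java
import java.util.concurrent.Semaphore;

final class LeakModel {

    // Runs `tasks` sequential borrowers against a pool of `poolSize` permits.
    // When `leak` is true, borrowers never give their permit back.
    // Returns how many borrowers actually obtained a resource.
    static int runTasks(int poolSize, int tasks, boolean leak) {
        Semaphore pool = new Semaphore(poolSize);
        int served = 0;
        for (int i = 0; i < tasks; i++) {
            // A real pending acquire would wait here indefinitely;
            // tryAcquire() reports the starvation instead of blocking.
            if (!pool.tryAcquire()) {
                continue;
            }
            served++;
            if (!leak) {
                pool.release();
            }
        }
        return served;
    }

    public static void main(String[] args) {
        System.out.println("with release: " + runTasks(3, 10, false));
        System.out.println("with leak:    " + runTasks(3, 10, true));
    }
}
```

In this model, once three borrowers have leaked their permits, no later borrower is served, which matches the "Acquired = 3" steady state that the metrics logger would show in a hung run.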

Your Environment

  • Reactor version(s) used: reactor-core:3.4.4, reactor-pool:0.2.3
  • Other relevant libraries versions (eg. netty, ...):
  • JVM version (java -version):
openjdk version "1.8.0_282"
OpenJDK Runtime Environment (Zulu 8.52.0.23-CA-linux64) (build 1.8.0_282-b08)
OpenJDK 64-Bit Server VM (Zulu 8.52.0.23-CA-linux64) (build 25.282-b08, mixed mode)
  • OS and version (eg uname -a): Linux andrei-desktop 5.8.0-44-generic #50-Ubuntu SMP Tue Feb 9 06:29:41 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux
@reactorbot reactorbot added the ❓need-triage This issue needs triage, hasn't been looked at by a team member yet label Mar 14, 2021
@simonbasle simonbasle added status/need-investigation This needs more in-depth investigation type/bug A general bug and removed ❓need-triage This issue needs triage, hasn't been looked at by a team member yet labels Mar 15, 2021
@simonbasle
Member

Just in case this is a bug in the withPoolable construct, have you been able to reproduce it with the acquire API?

@andreisilviudragnea
Author

@simonbasle I tried reproducing the bug only on the pool side here and only on the Flux.usingWhen() operator side here, but I failed.

I am not very familiar with the inner workings of either reactor-core or reactor-pool, so I may not be using the .doOnCancel() operator correctly in the first isolated example above.

@andreisilviudragnea
Author

@simonbasle I have opened a MonoUsingWhen / FluxUsingWhen issue: reactor/reactor-core#2661

@andreisilviudragnea
Author

@simonbasle After investigating the reactor-pool implementation further, I have a question: should drainLoop() also be called when AbstractPool.cancelAcquire() is called?

@andreisilviudragnea
Author

andreisilviudragnea commented Mar 23, 2021

It looks like activating background eviction makes the test fail less often, but the test for the bug still fails sometimes.

Edit: background eviction does not seem to help.

@andreisilviudragnea
Author

andreisilviudragnea commented Mar 23, 2021

I managed to isolate the reactor-pool bug here. Not even background eviction is helpful.

@agorbachenko

Hi there! According to @mp911de and @chemicL this issue is the same as r2dbc/r2dbc-pool#198. That issue is another reproducer of this critical bug. Maybe this helps to spot the bug and fix it.

@chemicL
Member

chemicL commented Feb 27, 2024

@andreisilviudragnea thanks for the report. I'm sorry to see it didn't get much attention. With some other reports around r2dbc, jOOQ, and reactor-core I stumbled upon the same problem and discovered your report. So I'll use this issue to try to make some progress on resolving this.

Here's a highly reproducible example that I created to capture what I observed in the r2dbc-pool issue mentioned by @agorbachenko (best to run it in a loop / repeat mode in the IDE): https://github.com/reactor/reactor-pool/compare/fix-124-deliver-cancel-race?expand=1.

As a result of the race between cancel and deliver, the connection is delivered to an already cancelled Subscriber. In the happy case, an onNextDropped hook picks it up, which is what happens in the test; it prints the following log:

Feb 27 2024 11:39:40.743 [parallel-4] DEBUG r.c.p.Operators [Loggers.java:254] - onNextDropped: PooledRef{poolable=PoolableTest{id=1, used=0/5}, lifeTime=0ms, idleTime=0ms, acquireCount=0} 

I'll chat with the team about possible solutions.
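
For readers who want the interleaving spelled out, below is a minimal plain-Java sketch of the losing side of that race. It is only a model under stated assumptions, not reactor-pool or reactor-core code: TinyPool, Borrower, and outstandingAfterRace are hypothetical names, and the "drop hook" is a plain Consumer standing in for the role an onNextDropped hook plays.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class DropRaceModel {

    // Hypothetical stand-in for a pool: it only counts outstanding resources.
    static final class TinyPool {
        final AtomicInteger acquired = new AtomicInteger();

        String lend() {
            acquired.incrementAndGet();
            return "conn";
        }

        void giveBack(String resource) {
            acquired.decrementAndGet();
        }
    }

    // Hypothetical stand-in for a subscriber that can be cancelled at any time.
    static final class Borrower {
        private final AtomicBoolean cancelled = new AtomicBoolean();
        private final Consumer<String> onNext;
        private final Consumer<String> onDropped;

        Borrower(Consumer<String> onNext, Consumer<String> onDropped) {
            this.onNext = onNext;
            this.onDropped = onDropped;
        }

        void cancel() {
            cancelled.set(true);
        }

        void deliver(String resource) {
            if (cancelled.get()) {
                onDropped.accept(resource); // nobody downstream will ever see it
            } else {
                onNext.accept(resource);
            }
        }
    }

    // Replays the interleaving in which the cancel lands after the pool has
    // committed to deliver, and returns the resources still marked as acquired.
    static int outstandingAfterRace(boolean releaseInDropHook) {
        TinyPool pool = new TinyPool();
        Consumer<String> dropHook = releaseInDropHook ? pool::giveBack : r -> { };
        // The normal path uses the resource and returns it right away.
        Borrower borrower = new Borrower(pool::giveBack, dropHook);

        String resource = pool.lend(); // step 1: pool commits to deliver
        borrower.cancel();             // step 2: cancel wins the race
        borrower.deliver(resource);    // step 3: delivery hits a cancelled borrower
        return pool.acquired.get();
    }

    public static void main(String[] args) {
        System.out.println("no-op drop hook:     " + outstandingAfterRace(false));
        System.out.println("releasing drop hook: " + outstandingAfterRace(true));
    }
}
```

In this model the resource stays acquired unless the drop hook returns it, which is the situation the log line above captures from the consumer's point of view.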

@chemicL
Member

chemicL commented Mar 7, 2024

@andreisilviudragnea I have some news and comments. I do believe it is time to close this particular issue. Here are the details:

  1. The behavior in the example from my previous comment is expected, given the asynchronous nature of cancellation. The resolution is for downstream consumers of the API to handle dropped items, either with a Hook or with a proper combination of operators. The reason is that, within reactor-pool's API, once the connection has been emitted downstream, it is the consumer's responsibility to return it to the pool. In my contrived example, the consumer is the LambdaMonoSubscriber, which receives the connection. The constructs used by reactor-pool are not involved; this happens on the user's side.
  2. The example that you provided, boiled down to the one from your comment of March 23rd, 2021, requires a discard hook. Unfortunately, the way certain operators work requires handling cancellation at different levels. Sometimes items are dropped, as in my example. Sometimes items are discarded, which is the case for filter, for instance. In this particular scenario it is the outer flatMap, the one that acquires the connection, that gets cancelled (due to the emitted error) and needs to handle dropped items to which it has not yet applied the mapper. Below is the modified example with my comments and changes:
  • <1> avoids the notion of waiting "forever": it coordinates all the asynchronous resource releases explicitly instead of relying on time. In my case the test takes over 19 minutes to finish.
  • <2> covers all the drops, discards, and cancellations.
  • <3> rests on an assumption that is probably not true, namely that cleanup can be asynchronous, so it allows some time before checking the state of the pool resources.
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;
import reactor.core.publisher.Mono;
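import reactor.core.publisher.Sinks;
import reactor.pool.InstrumentedPool;
import reactor.pool.PoolBuilder;
import reactor.pool.PooledRef;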

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class ReactorPoolIsolatedBugTest {

	private static final int COUNT = 10_000;

	// <1> Added a sink to ensure all releases are consumed in time.
	private Sinks.Many<Mono<Void>> asyncReleaseSink =
			Sinks.many().unicast().onBackpressureBuffer();

	private final InstrumentedPool<String> stringReactivePool = PoolBuilder
			.from(Mono.just("value").delayElement(Duration.ofMillis(2)))
			.maxPendingAcquireUnbounded()
			.sizeBetween(0, 3)
			.buildPool();

	@Test
	public void reactorPoolBug() throws InterruptedException {
		// <2> This actually never gets triggered. Will explain soon.
		Hooks.onNextDropped((Consumer<Object>) dropped -> {
			if (dropped instanceof PooledRef) {
				System.out.println("GOT DROPPED REF");
				Sinks.EmitResult result =
						asyncReleaseSink.tryEmitNext(((PooledRef<?>) dropped).release());
				if (result.isFailure()) {
					System.out.println("Failed to release dropped con");
				}
			}
		});

		ExecutorService executorService = Executors.newFixedThreadPool(
				16,
				r -> {
					Thread t = Executors.defaultThreadFactory().newThread(r);
					t.setDaemon(true);
					return t;
				}
		);

		// <1> Make sure all cancellations return connection to the pool before
		// validation happens.
		ExecutorService asyncReleasePool = Executors.newSingleThreadExecutor();
		CountDownLatch asyncReleaseDone = new CountDownLatch(1);
		asyncReleasePool.submit(
				() -> asyncReleaseSink.asFlux()
				                      .concatMap(Function.identity())
				                      .subscribe()
		);

		CountDownLatch cdl = new CountDownLatch(COUNT);
		for (int i = 0; i < COUNT; i++) {
			executorService.submit(new FlatMapErrorTask(cdl));
		}

		cdl.await();

		System.out.println("All tasks finished. Waiting for connection release.");

		// <1> Follow up all async releases with a latch release to validate the state.
		Sinks.EmitResult result =
				asyncReleaseSink.tryEmitNext(Mono.fromRunnable(asyncReleaseDone::countDown));
		assertThat(result).matches(Sinks.EmitResult::isSuccess);

		asyncReleaseDone.await();

		// <3> In case there was any asynchronous eviction in place
		await().alias("acquiredSize").atMost(10, TimeUnit.SECONDS)
		          .untilAsserted(() -> assertThat(stringReactivePool.metrics().acquiredSize()).isEqualTo(0));
		await().alias("idleSize").atMost(10, TimeUnit.SECONDS)
		          .untilAsserted(() -> assertThat(stringReactivePool.metrics().idleSize()).isEqualTo(3));
		await().alias("allocatedSize").atMost(10, TimeUnit.SECONDS)
		          .untilAsserted(() -> assertThat(stringReactivePool.metrics().allocatedSize()).isEqualTo(3));
	}

	private final class FlatMapErrorTask implements Runnable {

		private final CountDownLatch cdl;

		public FlatMapErrorTask(CountDownLatch cdl) {
			this.cdl = cdl;
		}

		public void run() {
			Flux<Void> flux = Flux
					.range(0, 10)
					.flatMap(i -> stringReactivePool
							.acquire()
							.delayElement(Duration.ofMillis(100))
							// <2> It can happen that the flatMap over the range is
							// cancelled. However, the lambda in the flatMap that follows
							// is not actually exercised. The value is discarded
							// after cancellation, without a chance to be released. So
							// we use the discard hook:
							.doOnDiscard(PooledRef.class, s -> {
								System.out.println("Discarded after acquire, pushing release to sink");
								Sinks.EmitResult result =
										asyncReleaseSink.tryEmitNext(s.release());
								if (result.isFailure()) {
									System.out.println("Failed to emit async release");
									System.exit(1);
								}
							})
							.flatMap(pooledRef -> Mono
									.just(pooledRef.poolable())
									.delayElement(Duration.ofMillis(10))
									// <2> Might never be triggered.
									.then(pooledRef.release())
									// <2> Might never be triggered.
									.onErrorResume(error -> pooledRef.release())
									// <2> Might never be triggered.
									.doOnCancel(() -> {
										System.out.println("Canceled inner, pushing " +
												"release to sink");
										Sinks.EmitResult result =
												asyncReleaseSink.tryEmitNext(pooledRef.release());
										if (result.isFailure()) {
											System.out.println("Failed to emit async release");
											System.exit(1);
										}
									})
							)
							.switchIfEmpty(Mono.error(new RuntimeException("Empty")))
					)
					.doOnComplete(cdl::countDown)
					.doOnError(error -> cdl.countDown());

			try {
				flux.blockLast();
			} catch (Exception e) {
				System.err.println(e);
			}

			try {
				Thread.sleep(50);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
}
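
The coordination idea behind <1> can be shown in isolation. The sketch below is a plain-Java analogy under stated assumptions, not the Sinks-based code above: a single-threaded executor stands in for the unicast sink consumed with concatMap, and ReleaseBarrierModel and outstandingAfterBarrier are hypothetical names. Because the worker runs tasks in submission order, a sentinel queued last can only run after every earlier release has completed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class ReleaseBarrierModel {

    // Queues `releases` simulated releases on one worker thread, followed by
    // a sentinel task. Returns the number of resources still outstanding once
    // the sentinel has run.
    static int outstandingAfterBarrier(int releases) {
        AtomicInteger outstanding = new AtomicInteger(releases);
        CountDownLatch allReleased = new CountDownLatch(1);
        ExecutorService releaseWorker = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < releases; i++) {
                // Each task models one asynchronous release.
                releaseWorker.execute(outstanding::decrementAndGet);
            }
            // Sentinel: runs only after every release queued before it.
            releaseWorker.execute(allReleased::countDown);

            if (!allReleased.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("releases did not finish in time");
            }
            return outstanding.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            releaseWorker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("outstanding after barrier: " + outstandingAfterBarrier(100));
    }
}
```

The design choice is the same as in the test: the check on pool state waits for a signal ordered after the releases, so it does not depend on a sleep being long enough.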

@andreisilviudragnea With these explanations, my understanding is that the pool has no bug. However, the usage of various operators requires care when dealing with the connections to properly handle cancellations, discards, drops, and termination in a graceful manner at respective levels of chaining.
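
One practical consequence of handling cleanup at several levels is that more than one path (completion, error, cancellation) may try to return the same resource. The sketch below shows one way to keep that safe in plain Java. It assumes a hypothetical Lease type invented for this illustration; it is not reactor-pool's PooledRef and makes no claim about how PooledRef.release() behaves when called repeatedly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class OnceOnlyReleaseModel {

    // Hypothetical lease whose release may be called from several cleanup paths.
    static final class Lease {
        private final AtomicInteger poolOutstanding;
        private final AtomicBoolean released = new AtomicBoolean();

        Lease(AtomicInteger poolOutstanding) {
            this.poolOutstanding = poolOutstanding;
            poolOutstanding.incrementAndGet();
        }

        // Returns true only for the call that actually performed the release.
        boolean release() {
            if (released.compareAndSet(false, true)) {
                poolOutstanding.decrementAndGet();
                return true;
            }
            return false;
        }
    }

    // Acquires one lease, then runs whichever cleanup paths are enabled.
    // Returns the number of resources still outstanding afterwards.
    static int outstandingAfterCleanup(boolean onComplete, boolean onError, boolean onCancel) {
        AtomicInteger outstanding = new AtomicInteger();
        Lease lease = new Lease(outstanding);
        if (onComplete) {
            lease.release();
        }
        if (onError) {
            lease.release();
        }
        if (onCancel) {
            lease.release();
        }
        return outstanding.get();
    }

    public static void main(String[] args) {
        System.out.println("complete + cancel: " + outstandingAfterCleanup(true, false, true));
        System.out.println("no cleanup path:   " + outstandingAfterCleanup(false, false, false));
    }
}
```

With a guard like this, overlapping cleanup paths cannot return a resource twice, while a missing path still shows up as a leak.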

I look forward to your verification of the above evaluation. Thanks in advance!

@chemicL chemicL added for/user-attention This issue needs user attention (feedback, rework, etc...) and removed type/bug A general bug status/need-investigation This needs more in-depth investigation labels Mar 7, 2024
@chemicL
Member

chemicL commented Mar 13, 2024

@andreisilviudragnea friendly ping :)

@chemicL
Member

chemicL commented Mar 19, 2024

Ok, I'll close the issue. Please feel free to re-open if you disagree with my assessment.

@chemicL chemicL closed this as not planned Mar 19, 2024
@chemicL chemicL added status/invalid We don't feel this issue is valid and removed for/user-attention This issue needs user attention (feedback, rework, etc...) labels Mar 19, 2024