Skip to content

Commit

Permalink
Fixed - connection leak during high load with few connections #5971
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed Jun 26, 2024
1 parent ca4dae1 commit 3c1c90f
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class ConnectionsHolder<T extends RedisConnection> {
public ConnectionsHolder(RedisClient client, int poolMaxSize,
Function<RedisClient, CompletionStage<T>> connectionCallback,
ServiceManager serviceManager, boolean changeUsage) {
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize, serviceManager.getGroup());
this.client = client;
this.connectionCallback = connectionCallback;
this.serviceManager = serviceManager;
Expand Down Expand Up @@ -215,9 +215,7 @@ public CompletableFuture<T> acquireConnection(RedisCommand<?> command) {

private void connectTo(CompletableFuture<T> promise, RedisCommand<?> command) {
if (promise.isDone()) {
serviceManager.getGroup().submit(() -> {
releaseConnection();
});
releaseConnection();
return;
}

Expand Down Expand Up @@ -249,5 +247,8 @@ public void releaseConnection(ClientConnectionsEntry entry, T connection) {
releaseConnection();
}

public ServiceManager getServiceManager() {
return serviceManager;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public class TrackedConnectionsHolder extends ConnectionsHolder<RedisConnection>
private final AtomicInteger usage = new AtomicInteger();

public TrackedConnectionsHolder(ConnectionsHolder<RedisConnection> holder) {
super(null, 0, null, null, false);
super(null, 0, null, holder.getServiceManager(), false);
this.holder = holder;
}

Expand Down
42 changes: 38 additions & 4 deletions redisson/src/main/java/org/redisson/misc/AsyncSemaphore.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;

/**
Expand All @@ -27,13 +28,22 @@
*/
public final class AsyncSemaphore {

private final ExecutorService executorService;
private final AtomicInteger tasksLatch = new AtomicInteger(1);
private final AtomicInteger stackSize = new AtomicInteger();

private final AtomicInteger counter;
private final Queue<CompletableFuture<Void>> listeners = new ConcurrentLinkedQueue<>();

public AsyncSemaphore(int permits) {
this(permits, null);
}

public AsyncSemaphore(int permits, ExecutorService executorService) {
counter = new AtomicInteger(permits);
this.executorService = executorService;
}

public int queueSize() {
return listeners.size();
}
Expand All @@ -45,10 +55,26 @@ public void removeListeners() {
public CompletableFuture<Void> acquire() {
CompletableFuture<Void> future = new CompletableFuture<>();
listeners.add(future);
tryRun();
tryForkAndRun();
return future;
}

private void tryForkAndRun() {
if (executorService != null) {
int val = tasksLatch.get();
if (stackSize.get() > 100 * val
&& tasksLatch.compareAndSet(val, val+1)) {
executorService.submit(() -> {
tasksLatch.decrementAndGet();
tryRun();
});
return;
}
}

tryRun();
}

private void tryRun() {
while (true) {
if (counter.decrementAndGet() >= 0) {
Expand All @@ -58,7 +84,15 @@ private void tryRun() {
return;
}

if (future.complete(null)) {
boolean complete;
if (executorService != null) {
stackSize.incrementAndGet();
complete = future.complete(null);
stackSize.decrementAndGet();
} else {
complete = future.complete(null);
}
if (complete) {
return;
}
}
Expand All @@ -75,7 +109,7 @@ public int getCounter() {

public void release() {
counter.incrementAndGet();
tryRun();
tryForkAndRun();
}

@Override
Expand Down

6 comments on commit 3c1c90f

@semistone
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel 100 is too big in real application.
how about reduce to 5 or 10 or even smaller ?

@mrniko
Copy link
Member

@mrniko mrniko commented on 3c1c90f Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for pointing out. I managed to reproduce the issue with your test and Flux.range(1, 8000), so I reduced it to 25. Less values, even 2 can't help to resolve issues with Flux.range(1, 10000)

@semistone
Copy link

@semistone semistone commented on 3c1c90f Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking is any way to catch that StackOverflow Error and print something in error log?
because once it happen, it's so difficult to notice.
but because it's JVM error, maybe nothing we could do.

@mrniko
Copy link
Member

@mrniko mrniko commented on 3c1c90f Jun 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't catch it. Do you?

@semistone
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when I test by

    @Test
    public void test() throws InterruptedException {
        ExecutorService executorService = Executors.newSingleThreadExecutor();

        CompletableFuture.runAsync(() -> testStack("test", "test2"), executorService)
                .whenComplete((v, e) -> {
                    e.printStackTrace();
                });


    }

    public String testStack(String xx, String yy) {
        String x = xx;
        String y = yy;
        thread=Thread.currentThread();
        count.incrementAndGet();
        return testStack(x, y);
    }

I found CompletableFuture could catch it , but I didn't check how it work

@semistone
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more we add those local variable in stack memory, less recursive it could run

Please sign in to comment.