-
Notifications
You must be signed in to change notification settings - Fork 5.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
When high qps, async operation code stack may too deep #5977
Conversation
32823b8
to
eae16ba
Compare
Thanks for suggested changes. Can you share the example of stack you are trying to avoid? |
it's simple POC about reduce call stack in AsyncSemaphore private void tryRun() {
while (true) {
if (counter.decrementAndGet() >= 0) {
CompletableFuture<Lock> future = listeners.poll();
if (future == null) {
counter.incrementAndGet();
return;
}
if (!future.isDone()) {
// release only once by lock object
AtomicBoolean once = new AtomicBoolean();
AtomicBoolean tryNext = new AtomicBoolean();
long currentThreadId = Thread.currentThread().getId();
if (future.complete(() -> { <==== return release runnable
if (once.compareAndSet(false, true)) {
// if in the same thread and tryNext is false means already release in the same thread.
release(Thread.currentThread().getId() != currentThreadId || tryNext.compareAndSet(false, true));
}
})) {
if (tryNext.compareAndSet(false, true)) {
continue; // continue to tryRun if not release yet.
} else {
return;
}
}
}
}
if (counter.incrementAndGet() <= 0) {
return;
}
}
I didn't test and optimism it yet. |
}); | ||
// prevent deep stack, switch thread per 100 requests. | ||
if (count.incrementAndGet() % 100 == 0) { | ||
f.thenAcceptAsync(r -> connectTo(result, command)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you need to define serviceManager.getGroup()
as an executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
serviceManager.getGroup().submit(() -> {
releaseConnection();
});
in connectTo method now can be replaced with releaseConnection()
method call only;
Signed-off-by: Chen, YinChin | Angus <yinchin.chen@rakuten.com>
eae16ba
to
d03a80f
Compare
I change to always switch thread during release as your suggestion. how to you think ? |
or switch in ? RedisExecutor.sendCommand
if (connectionManager.getServiceManager().getConfig().getMasterConnectionPoolSize() < 10
&& !command.isBlockingCommand()) {
release(connection); <=== here
} or public void release() {
counter.incrementAndGet();
tryRun(); <=== here
} |
Thanks for your assistance. I have fixed it in 3c1c90f |
No description provided.