Skip to content

Commit

Permalink
Success\Failed Future listener notification optimization. #338
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 18, 2015
1 parent b0d9803 commit e17a9ce
Show file tree
Hide file tree
Showing 9 changed files with 318 additions and 78 deletions.
52 changes: 52 additions & 0 deletions src/main/java/org/redisson/command/AsyncDetails.java
@@ -0,0 +1,52 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.command;

import org.redisson.client.RedisException;

import io.netty.channel.ChannelFuture;
import io.netty.util.Timeout;

public class AsyncDetails {

private volatile ChannelFuture writeFuture;

private volatile RedisException exception;

private volatile Timeout timeout;

public ChannelFuture getWriteFuture() {
return writeFuture;
}
public void setWriteFuture(ChannelFuture writeFuture) {
this.writeFuture = writeFuture;
}

public RedisException getException() {
return exception;
}
public void setException(RedisException exception) {
this.exception = exception;
}

public Timeout getTimeout() {
return timeout;
}
public void setTimeout(Timeout timeout) {
this.timeout = timeout;
}

}
92 changes: 45 additions & 47 deletions src/main/java/org/redisson/command/CommandAsyncService.java
Expand Up @@ -23,7 +23,6 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.RedisClientResult;
import org.redisson.SlotCallback;
Expand Down Expand Up @@ -238,19 +237,19 @@ public <T, R> Future<R> readAsync(String key, RedisCommand<T> command, Object ..
@Override
public <T, R> Future<R> evalReadAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
int slot = connectionManager.calcSlot(key);
return evalAsync(new NodeSource(slot), true, key, codec, evalCommandType, script, keys, params);
return evalAsync(new NodeSource(slot), true, codec, evalCommandType, script, keys, params);
}

@Override
public <T, R> Future<R> evalReadAsync(InetSocketAddress client, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
int slot = connectionManager.calcSlot(key);
return evalAsync(new NodeSource(slot, client), true, key, codec, evalCommandType, script, keys, params);
return evalAsync(new NodeSource(slot, client), true, codec, evalCommandType, script, keys, params);
}

@Override
public <T, R> Future<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
int slot = connectionManager.calcSlot(key);
return evalAsync(new NodeSource(slot), false, key, codec, evalCommandType, script, keys, params);
return evalAsync(new NodeSource(slot), false, codec, evalCommandType, script, keys, params);
}

@Override
Expand Down Expand Up @@ -290,7 +289,7 @@ public Promise<T> setFailure(Throwable cause) {
return mainPromise;
}

private <T, R> Future<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
private <T, R> Future<R> evalAsync(NodeSource nodeSource, boolean readOnlyMode, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params) {
Promise<R> mainPromise = connectionManager.newPromise();
List<Object> args = new ArrayList<Object>(2 + keys.size() + params.length);
args.add(script);
Expand All @@ -314,8 +313,8 @@ public <T, R> Future<R> writeAsync(String key, Codec codec, RedisCommand<T> comm
return mainPromise;
}

protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec, final RedisCommand<V> command,
final Object[] params, final Promise<R> mainPromise, final int attempt) {
protected <V, R> void async(final boolean readOnlyMode, final NodeSource source, final MultiDecoder<Object> messageDecoder, final Codec codec,
final RedisCommand<V> command, final Object[] params, final Promise<R> mainPromise, final int attempt) {
if (mainPromise.isCancelled()) {
return;
}
Expand All @@ -327,10 +326,7 @@ protected <V, R> void async(final boolean readOnlyMode, final NodeSource source,

final Promise<R> attemptPromise = connectionManager.newPromise();

final AtomicReference<ChannelFuture> writeFutureRef = new AtomicReference<ChannelFuture>();
final AtomicReference<RedisException> exceptionRef = new AtomicReference<RedisException>();
final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();

final AsyncDetails details = new AsyncDetails();

final Future<RedisConnection> connectionFuture;
if (readOnlyMode) {
Expand All @@ -350,7 +346,7 @@ public void run(Timeout t) throws Exception {
connectionManager.getShutdownLatch().release();
} else {
if (connectionFuture.isSuccess()) {
ChannelFuture writeFuture = writeFutureRef.get();
ChannelFuture writeFuture = details.getWriteFuture();
if (writeFuture != null && !writeFuture.cancel(false) && writeFuture.isSuccess()) {
return;
}
Expand All @@ -363,10 +359,10 @@ public void run(Timeout t) throws Exception {
}

if (attempt == connectionManager.getConfig().getRetryAttempts()) {
if (exceptionRef.get() == null) {
exceptionRef.set(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params)));
if (details.getException() == null) {
details.setException(new RedisTimeoutException("Command execution timeout for command: " + command + " with params: " + Arrays.toString(params)));
}
attemptPromise.tryFailure(exceptionRef.get());
attemptPromise.tryFailure(details.getException());
return;
}
if (!attemptPromise.cancel(false)) {
Expand All @@ -379,7 +375,7 @@ public void run(Timeout t) throws Exception {
};

Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
details.setTimeout(timeout);

connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override
Expand All @@ -389,7 +385,7 @@ public void operationComplete(Future<RedisConnection> connFuture) throws Excepti
}

if (!connFuture.isSuccess()) {
exceptionRef.set(convertException(connFuture));
details.setException(convertException(connFuture));
return;
}

Expand All @@ -402,63 +398,65 @@ public void operationComplete(Future<RedisConnection> connFuture) throws Excepti
list.add(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
Promise<Void> main = connectionManager.newPromise();
ChannelFuture future = connection.send(new CommandsData(main, list));
writeFutureRef.set(future);
details.setWriteFuture(future);
} else {
log.debug("aquired connection for command {} from slot {} using node {}", command, source, connection.getRedisClient().getAddr());
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
writeFutureRef.set(future);
details.setWriteFuture(future);
}

writeFutureRef.get().addListener(new ChannelFutureListener() {
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled()) {
return;
}

if (!future.isSuccess()) {
exceptionRef.set(new WriteRedisConnectionException(
details.setException(new WriteRedisConnectionException(
"Can't write command: " + command + ", params: " + params + " to channel: " + future.channel(), future.cause()));
} else {
timeoutRef.get().cancel();
int timeoutTime = connectionManager.getConfig().getTimeout();
if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName())
|| command.getName().equals(RedisCommands.BRPOP_VALUE.getName())) {
Integer blPopTimeout = Integer.valueOf(params[params.length - 1].toString());
if (blPopTimeout == 0) {
return;
}
timeoutTime += blPopTimeout*1000;
}
return;
}

final int timeoutAmount = timeoutTime;
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + command
+ " with params: " + Arrays.toString(params) + " channel: " + connection.getChannel()));
}
};

Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
details.getTimeout().cancel();

int timeoutTime = connectionManager.getConfig().getTimeout();
if (command.getName().equals(RedisCommands.BLPOP_VALUE.getName())
|| command.getName().equals(RedisCommands.BRPOP_VALUE.getName())) {
Integer popTimeout = Integer.valueOf(params[params.length - 1].toString());
if (popTimeout == 0) {
return;
}
timeoutTime += popTimeout*1000;
}

final int timeoutAmount = timeoutTime;
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(
new RedisTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + command
+ " with params: " + Arrays.toString(params) + " channel: " + connection.getChannel()));
}
};

Timeout timeout = connectionManager.newTimeout(timeoutTask, timeoutTime, TimeUnit.MILLISECONDS);
details.setTimeout(timeout);
}
});

if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, timeoutRef));
attemptPromise.addListener(connectionManager.createReleaseReadListener(source, connection, details));
} else {
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, timeoutRef));
attemptPromise.addListener(connectionManager.createReleaseWriteListener(source, connection, details));
}
}
});

attemptPromise.addListener(new FutureListener<R>() {
@Override
public void operationComplete(Future<R> future) throws Exception {
timeoutRef.get().cancel();
details.getTimeout().cancel();
if (future.isCancelled()) {
return;
}
Expand Down

0 comments on commit e17a9ce

Please sign in to comment.