Skip to content

Commit

Permalink
CommandBatchExecutorService timeout handling improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 20, 2015
1 parent 4066886 commit 853a0c2
Showing 1 changed file with 51 additions and 19 deletions.
70 changes: 51 additions & 19 deletions src/main/java/org/redisson/CommandBatchExecutorService.java
Expand Up @@ -32,9 +32,11 @@
import org.redisson.client.RedisTimeoutException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.codec.Codec;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.CommandsData;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.NodeSource;
Expand Down Expand Up @@ -207,7 +209,10 @@ public void execute(final Entry entry, final NodeSource source, final Promise<Vo
}

final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

final AtomicReference<ChannelFuture> writeFutureRef = new AtomicReference<ChannelFuture>();
final AtomicReference<RedisException> exceptionRef = new AtomicReference<RedisException>();
final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();

final Future<RedisConnection> connectionFuture;
if (entry.isReadOnlyMode()) {
Expand All @@ -223,6 +228,17 @@ public void run(Timeout timeout) throws Exception {
connectionManager.getShutdownLatch().release();
}

if ((writeFutureRef.get() == null || !writeFutureRef.get().isDone())
&& connectionFuture.isSuccess()) {
Timeout newTimeout = connectionManager.newTimeout(this, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
timeoutRef.set(newTimeout);
return;
}

if (writeFutureRef.get() != null && writeFutureRef.get().isSuccess()) {
return;
}

if (attemptPromise.isDone()) {
return;
}
Expand All @@ -233,7 +249,7 @@ public void run(Timeout timeout) throws Exception {
}

if (attempt == connectionManager.getConfig().getRetryAttempts()) {
attemptPromise.tryFailure(ex.get());
attemptPromise.tryFailure(exceptionRef.get());
return;
}
if (!attemptPromise.cancel(false)) {
Expand All @@ -245,8 +261,7 @@ public void run(Timeout timeout) throws Exception {
}
};

ex.set(new RedisTimeoutException("Batch command execution timeout"));
final AtomicReference<Timeout> timeoutRef = new AtomicReference<Timeout>();
exceptionRef.set(new RedisTimeoutException("Batch command execution timeout"));
Timeout timeout = connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);

Expand All @@ -256,35 +271,52 @@ public void operationComplete(Future<RedisConnection> connFuture) throws Excepti
if (attemptPromise.isDone() || connFuture.isCancelled() || mainPromise.isCancelled()) {
return;
}

if (!connFuture.isSuccess()) {
ex.set(convertException(connFuture));
if (timeoutRef.get().cancel()) {
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
exceptionRef.set(convertException(connFuture));
return;
}

RedisConnection connection = connFuture.getNow();

List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());

if (source.getRedirect() == Redirect.ASK) {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size() + 1);
Promise<Void> promise = connectionManager.newPromise();
list.add(new CommandData<Void, Void>(promise, StringCodec.INSTANCE, RedisCommands.ASKING, new Object[] {}));
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
writeFutureRef.set(future);
} else {
List<CommandData<?, ?>> list = new ArrayList<CommandData<?, ?>>(entry.getCommands().size());
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));
writeFutureRef.set(future);
}
ChannelFuture writeFuture = connection.send(new CommandsData(attemptPromise, list));


writeFuture.addListener(new ChannelFutureListener() {
writeFutureRef.get().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (attemptPromise.isDone() || future.isCancelled() || mainPromise.isCancelled()) {
if (attemptPromise.isDone() || mainPromise.isCancelled()) {
return;
}

if (!future.isSuccess()) {
ex.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause()));
if (timeoutRef.get().cancel()) {
connectionManager.newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
exceptionRef.set(new WriteRedisConnectionException("Can't write commands batch to channel: " + future.channel(), future.cause()));
} else {
timeoutRef.get().cancel();
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
attemptPromise.tryFailure(exceptionRef.get());
}
};
Timeout timeout = connectionManager.newTimeout(timeoutTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
timeoutRef.set(timeout);
}
}
});
Expand Down

0 comments on commit 853a0c2

Please sign in to comment.