Skip to content

Commit

Permalink
Improvement - RedisConnectionClosedException removed. #1695 #1748
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed Dec 20, 2018
1 parent 97e582a commit 1a08db7
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 98 deletions.
7 changes: 1 addition & 6 deletions redisson/src/main/java/org/redisson/RedissonMultiLock.java
Expand Up @@ -28,7 +28,6 @@

import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisResponseTimeoutException;
import org.redisson.misc.RPromise;
import org.redisson.misc.RedissonPromise;
Expand Down Expand Up @@ -236,9 +235,6 @@ public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws Inte
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (RedisConnectionClosedException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
} catch (RedisResponseTimeoutException e) {
unlockInner(Arrays.asList(lock));
lockAcquired = false;
Expand Down Expand Up @@ -321,8 +317,7 @@ public void operationComplete(Future<Boolean> future) throws Exception {
lockAcquired = future.getNow();
}

if (future.cause() instanceof RedisConnectionClosedException
|| future.cause() instanceof RedisResponseTimeoutException) {
if (future.cause() instanceof RedisResponseTimeoutException) {
unlockInnerAsync(Arrays.asList(lock), threadId);
}

Expand Down

This file was deleted.

Expand Up @@ -21,7 +21,6 @@
import java.util.regex.Pattern;

import org.redisson.client.ChannelName;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.WriteRedisConnectionException;
import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
Expand Down Expand Up @@ -82,11 +81,6 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
command.getChannelPromise().tryFailure(
new WriteRedisConnectionException("Channel has been closed! Can't write command: "
+ LogHelper.toString(command.getCommand()) + " to channel: " + ctx.channel()));

if (command.getChannelPromise().isSuccess() && !command.getCommand().isBlockingCommand()) {
command.getCommand().tryFailure(new RedisConnectionClosedException("Command "
+ LogHelper.toString(command.getCommand()) + " succesfully sent, but channel " + ctx.channel() + " has been closed!"));
}
}

super.channelInactive(ctx);
Expand Down
Expand Up @@ -219,29 +219,29 @@ protected <R> RPromise<R> createPromise() {
@Override
public <T, R> RFuture<R> readAsync(RedisClient client, MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(entry, client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}

@Override
public <T, R> RFuture<R> readAsync(RedisClient client, String name, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
int slot = connectionManager.calcSlot(name);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}

public <T, R> RFuture<R> readAsync(RedisClient client, byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
int slot = connectionManager.calcSlot(key);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(slot, client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}

@Override
public <T, R> RFuture<R> readAsync(RedisClient client, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(client), codec, command, params, mainPromise, 0, false);
return mainPromise;
}

Expand Down Expand Up @@ -291,7 +291,7 @@ public void operationComplete(Future<Object> future) throws Exception {
for (MasterSlaveEntry entry : nodes) {
RPromise<R> promise = new RedissonPromise<R>();
promise.addListener(listener);
async(true, new NodeSource(entry), codec, command, params, promise, 0, true, null);
async(true, new NodeSource(entry), codec, command, params, promise, 0, true);
}
return mainPromise;
}
Expand Down Expand Up @@ -336,7 +336,7 @@ public void operationComplete(Future<R> future) throws Exception {
});

MasterSlaveEntry entry = nodes.remove(0);
async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false, null);
async(true, new NodeSource(entry), codec, command, params, attemptPromise, 0, false);
}

@Override
Expand Down Expand Up @@ -392,7 +392,7 @@ public void operationComplete(Future<T> future) throws Exception {
for (MasterSlaveEntry entry : nodes) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true, null);
async(readOnlyMode, new NodeSource(entry), codec, command, params, promise, 0, true);
}
return mainPromise;
}
Expand All @@ -419,28 +419,28 @@ private NodeSource getNodeSource(byte[] key) {
public <T, R> RFuture<R> readAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0, false, null);
async(true, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}

@Override
public <T, R> RFuture<R> readAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(true, source, codec, command, params, mainPromise, 0, false, null);
async(true, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}

public <T, R> RFuture<R> readAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null);
async(true, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
return mainPromise;
}

@Override
public <T, R> RFuture<R> writeAsync(MasterSlaveEntry entry, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false, null);
async(false, new NodeSource(entry), codec, command, params, mainPromise, 0, false);
return mainPromise;
}

Expand Down Expand Up @@ -510,7 +510,7 @@ public void operationComplete(Future<T> future) throws Exception {
for (MasterSlaveEntry entry : entries) {
RPromise<T> promise = new RedissonPromise<T>();
promise.addListener(listener);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true, null);
async(readOnlyMode, new NodeSource(entry), connectionManager.getCodec(), command, args.toArray(), promise, 0, true);
}
return mainPromise;
}
Expand Down Expand Up @@ -590,7 +590,7 @@ private <T, R> RFuture<R> evalAsync(final NodeSource nodeSource, boolean readOnl
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(false, nodeSource, codec, command, args.toArray(), promise, 0, false, null);
async(false, nodeSource, codec, command, args.toArray(), promise, 0, false);

promise.addListener(new FutureListener<R>() {
@Override
Expand All @@ -613,8 +613,7 @@ public void operationComplete(Future<String> future) throws Exception {
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(pps));
async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false,
null);
async(false, nodeSource, codec, command, args.toArray(), mainPromise, 0, false);
}
});
} else {
Expand All @@ -636,7 +635,7 @@ public void operationComplete(Future<String> future) throws Exception {
args.add(keys.size());
args.addAll(keys);
args.addAll(Arrays.asList(params));
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false, null);
async(readOnlyMode, nodeSource, codec, evalCommandType, args.toArray(), mainPromise, 0, false);
return mainPromise;
}

Expand All @@ -649,20 +648,20 @@ public <T, R> RFuture<R> writeAsync(String key, RedisCommand<T> command, Object.
public <T, R> RFuture<R> writeAsync(String key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0, false, null);
async(false, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}

public <T, R> RFuture<R> writeAsync(byte[] key, Codec codec, RedisCommand<T> command, Object... params) {
RPromise<R> mainPromise = createPromise();
NodeSource source = getNodeSource(key);
async(false, source, codec, command, params, mainPromise, 0, false, null);
async(false, source, codec, command, params, mainPromise, 0, false);
return mainPromise;
}

public <V, R> void async(final boolean readOnlyMode, final NodeSource source, final Codec codec,
final RedisCommand<V> command, final Object[] params, final RPromise<R> mainPromise, final int attempt,
final boolean ignoreRedirect, final RFuture<RedisConnection> connFuture) {
final boolean ignoreRedirect) {
if (mainPromise.isCancelled()) {
free(params);
return;
Expand Down Expand Up @@ -764,7 +763,7 @@ public void run(Timeout t) throws Exception {
count, details.getCommand(), LogHelper.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect, connFuture);
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
AsyncDetails.release(details);
}

Expand Down Expand Up @@ -798,7 +797,7 @@ public void operationComplete(Future<RedisConnection> connFuture) throws Excepti
details.getWriteFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
checkWriteFuture(details, connection);
checkWriteFuture(details, ignoreRedirect, connection);
}
});

Expand Down Expand Up @@ -831,7 +830,7 @@ protected void free(final Object[] params) {
}
}

private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final RedisConnection connection) {
private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final boolean ignoreRedirect, final RedisConnection connection) {
ChannelFuture future = details.getWriteFuture();
if (future.isCancelled() || details.getAttemptPromise().isDone()) {
return;
Expand Down Expand Up @@ -884,9 +883,26 @@ private <V, R> void checkWriteFuture(final AsyncDetails<V, R> details, final Red
TimerTask timeoutTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (details.getAttempt() < connectionManager.getConfig().getRetryAttempts()) {
if (!details.getAttemptPromise().cancel(false)) {
return;
}

int count = details.getAttempt() + 1;
if (log.isDebugEnabled()) {
log.debug("attempt {} for command {} and params {}",
count, details.getCommand(), LogHelper.toString(details.getParams()));
}
details.removeMainPromiseListener();
async(details.isReadOnlyMode(), details.getSource(), details.getCodec(), details.getCommand(), details.getParams(), details.getMainPromise(), count, ignoreRedirect);
AsyncDetails.release(details);
return;
}

details.getAttemptPromise().tryFailure(
new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured for command: " + details.getCommand()
+ " with params: " + LogHelper.toString(details.getParams()) + " channel: " + connection.getChannel()));
new RedisResponseTimeoutException("Redis server response timeout (" + timeoutAmount + " ms) occured"
+ " after " + connectionManager.getConfig().getRetryAttempts() + " retry attempts. Command: " + details.getCommand()
+ ", params: " + LogHelper.toString(details.getParams()) + ", channel: " + connection.getChannel()));
}
};

Expand All @@ -905,17 +921,9 @@ public void operationComplete(Future<Boolean> future) throws Exception {
final Timeout scheduledFuture;
if (popTimeout != 0) {
// handling cases when connection has been lost
final Channel orignalChannel = connection.getChannel();
scheduledFuture = connectionManager.newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
// re-connection hasn't been made
// and connection is still active
// if (orignalChannel == connection.getChannel()
// && connection.isActive()) {
// return;
// }

if (details.getAttemptPromise().trySuccess(null)) {
connection.forceFastReconnectAsync();
}
Expand Down Expand Up @@ -1004,22 +1012,22 @@ protected <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDet
}

async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.MOVED), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}

if (future.cause() instanceof RedisAskException && !ignoreRedirect) {
RedisAskException ex = (RedisAskException) future.cause();
async(details.isReadOnlyMode(), new NodeSource(ex.getSlot(), ex.getUrl(), Redirect.ASK), details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}

if (future.cause() instanceof RedisLoadingException) {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);
AsyncDetails.release(details);
return;
}
Expand All @@ -1029,7 +1037,7 @@ protected <R, V> void checkAttemptFuture(final NodeSource source, final AsyncDet
@Override
public void run(Timeout timeout) throws Exception {
async(details.isReadOnlyMode(), source, details.getCodec(),
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect, details.getConnectionFuture());
details.getCommand(), details.getParams(), details.getMainPromise(), details.getAttempt(), ignoreRedirect);

}
}, 1, TimeUnit.SECONDS);
Expand Down

0 comments on commit 1a08db7

Please sign in to comment.