Skip to content

Commit

Permalink
Send channelFuture support improvement. Reconnection detection speedup
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 24, 2015
1 parent bf64e3a commit 2019283
Show file tree
Hide file tree
Showing 15 changed files with 152 additions and 32 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/redisson/BaseConfig.java
Expand Up @@ -25,7 +25,7 @@ class BaseConfig<T extends BaseConfig<T>> {
*/
private int timeout = 60000;

private int retryAttempts = 5;
private int retryAttempts = 20;

private int retryInterval = 1000;

Expand Down
22 changes: 18 additions & 4 deletions src/main/java/org/redisson/CommandBatchExecutorService.java
Expand Up @@ -24,6 +24,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
Expand All @@ -35,6 +36,8 @@
import org.redisson.client.protocol.decoder.MultiDecoder;
import org.redisson.connection.ConnectionManager;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future;
Expand Down Expand Up @@ -170,7 +173,7 @@ public void execute(final Entry entry, final int slot, final Promise<Void> mainP
final Promise<Void> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

TimerTask timerTask = new TimerTask() {
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
Expand Down Expand Up @@ -199,10 +202,21 @@ public void run(Timeout timeout) throws Exception {
for (CommandEntry c : entry.getCommands()) {
list.add(c.getCommand());
}
connection.send(new CommandsData(attemptPromise, list));
ChannelFuture future = connection.send(new CommandsData(attemptPromise, list));

ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);

future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});

if (entry.isReadOnlyMode()) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
Expand All @@ -211,7 +225,7 @@ public void run(Timeout timeout) throws Exception {
}
} catch (RedisConnectionException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
attemptPromise.addListener(new FutureListener<Void>() {
@Override
Expand Down
23 changes: 19 additions & 4 deletions src/main/java/org/redisson/CommandExecutorService.java
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.atomic.AtomicReference;

import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionClosedException;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.client.RedisMovedException;
Expand All @@ -38,6 +39,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.DefaultPromise;
Expand Down Expand Up @@ -335,7 +338,7 @@ protected <V, R> void async(final boolean readOnlyMode, final int slot, final Mu
final Promise<R> attemptPromise = connectionManager.newPromise();
final AtomicReference<RedisException> ex = new AtomicReference<RedisException>();

TimerTask timerTask = new TimerTask() {
final TimerTask retryTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (attemptPromise.isDone()) {
Expand All @@ -360,10 +363,21 @@ public void run(Timeout timeout) throws Exception {
connection = connectionManager.connectionWriteOp(slot);
}
log.debug("getting connection for command {} via slot {} using {}", command, slot, connection.getRedisClient().getAddr());
connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));
ChannelFuture future = connection.send(new CommandData<V, R>(attemptPromise, messageDecoder, codec, command, params));

ex.set(new RedisTimeoutException());
Timeout timeout = connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);
final Timeout timeout = connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getTimeout(), TimeUnit.MILLISECONDS);

future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
timeout.cancel();
ex.set(new RedisConnectionClosedException("channel: " + future.channel() + " closed"));
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
}
});

if (readOnlyMode) {
attemptPromise.addListener(connectionManager.createReleaseReadListener(slot, connection, timeout));
Expand All @@ -372,7 +386,7 @@ public void run(Timeout timeout) throws Exception {
}
} catch (RedisConnectionException e) {
ex.set(e);
connectionManager.getTimer().newTimeout(timerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
}
attemptPromise.addListener(new FutureListener<R>() {
@Override
Expand All @@ -384,6 +398,7 @@ public void operationComplete(Future<R> future) throws Exception {

if (future.cause() instanceof RedisMovedException) {
RedisMovedException ex = (RedisMovedException)future.cause();
connectionManager.getTimer().newTimeout(retryTimerTask, connectionManager.getConfig().getRetryInterval(), TimeUnit.MILLISECONDS);
async(readOnlyMode, ex.getSlot(), messageDecoder, codec, command, params, mainPromise, attempt);
return;
}
Expand Down
1 change: 0 additions & 1 deletion src/main/java/org/redisson/SingleServerConfig.java
Expand Up @@ -16,7 +16,6 @@
package org.redisson;

import java.net.URI;
import java.net.URISyntaxException;

import org.redisson.misc.URIBuilder;

Expand Down
15 changes: 15 additions & 0 deletions src/main/java/org/redisson/SlotCallback.java
@@ -1,3 +1,18 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

public interface SlotCallback<T, R> {
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -107,5 +107,10 @@ public ChannelGroupFuture shutdownAsync() {
return channels.close();
}

@Override
public String toString() {
return "RedisClient [addr=" + addr + "]";
}

}

7 changes: 6 additions & 1 deletion src/main/java/org/redisson/client/RedisConnection.java
Expand Up @@ -91,7 +91,7 @@ public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params)

public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
channel.writeAndFlush(new CommandData<T, R>(promise, encoder, command, params));
send(new CommandData<T, R>(promise, encoder, command, params));
return promise;
}

Expand All @@ -113,4 +113,9 @@ public ChannelFuture closeAsync() {
return channel.close();
}

@Override
public String toString() {
return getClass().getSimpleName() + " [redisClient=" + redisClient + ", channel=" + channel + "]";
}

}
@@ -0,0 +1,30 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client;

public class RedisConnectionClosedException extends RedisException {

private static final long serialVersionUID = -4756928186967834601L;

public RedisConnectionClosedException(String msg) {
super(msg);
}

public RedisConnectionClosedException(String msg, Throwable e) {
super(msg, e);
}

}
14 changes: 8 additions & 6 deletions src/main/java/org/redisson/client/handler/CommandsQueue.java
Expand Up @@ -20,6 +20,7 @@

import org.redisson.client.protocol.CommandData;
import org.redisson.client.protocol.QueueCommand;
import org.redisson.client.protocol.QueueCommandHolder;

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -39,7 +40,7 @@ public enum QueueCommands {NEXT_COMMAND}

public static final AttributeKey<QueueCommand> REPLAY = AttributeKey.valueOf("promise");

private final Queue<QueueCommand> queue = PlatformDependent.newMpscQueue();
private final Queue<QueueCommandHolder> queue = PlatformDependent.newMpscQueue();

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
Expand All @@ -55,10 +56,10 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof QueueCommand) {
QueueCommand data = (QueueCommand) msg;
if (data.getSended().get()) {
if (queue.peek() != null && queue.peek().getCommand() == data) {
super.write(ctx, msg, promise);
} else {
queue.add(data);
queue.add(new QueueCommandHolder(data, promise));
sendData(ctx);
}
} else {
Expand All @@ -67,8 +68,9 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
}

private void sendData(ChannelHandlerContext ctx) throws Exception {
QueueCommand data = queue.peek();
if (data != null && data.getSended().compareAndSet(false, true)) {
QueueCommandHolder command = queue.peek();
if (command != null && command.getSended().compareAndSet(false, true)) {
QueueCommand data = command.getCommand();
List<CommandData<Object, Object>> pubSubOps = data.getPubSubOperations();
if (!pubSubOps.isEmpty()) {
for (CommandData<Object, Object> cd : pubSubOps) {
Expand All @@ -79,7 +81,7 @@ private void sendData(ChannelHandlerContext ctx) throws Exception {
} else {
ctx.channel().attr(REPLAY).set(data);
}
ctx.channel().writeAndFlush(data);
ctx.channel().writeAndFlush(data, command.getChannelPromise());
}
}

Expand Down
8 changes: 1 addition & 7 deletions src/main/java/org/redisson/client/protocol/CommandData.java
Expand Up @@ -30,7 +30,6 @@ public class CommandData<T, R> implements QueueCommand {
final RedisCommand<T> command;
final Object[] params;
final Codec codec;
final AtomicBoolean sended = new AtomicBoolean();
final MultiDecoder<Object> messageDecoder;

public CommandData(Promise<R> promise, Codec codec, RedisCommand<T> command, Object[] params) {
Expand Down Expand Up @@ -61,19 +60,14 @@ public Promise<R> getPromise() {
return promise;
}

public AtomicBoolean getSended() {
return sended;
}

public Codec getCodec() {
return codec;
}

@Override
public String toString() {
return "CommandData [promise=" + promise + ", command=" + command + ", params="
+ Arrays.toString(params) + ", codec=" + codec + ", sended=" + sended + ", messageDecoder="
+ messageDecoder + "]";
+ Arrays.toString(params) + ", codec=" + codec + "]";
}

@Override
Expand Down
5 changes: 0 additions & 5 deletions src/main/java/org/redisson/client/protocol/CommandsData.java
Expand Up @@ -26,7 +26,6 @@ public class CommandsData implements QueueCommand {

private final List<CommandData<?, ?>> commands;
private final Promise<Void> promise;
private final AtomicBoolean sended = new AtomicBoolean();

public CommandsData(Promise<Void> promise, List<CommandData<?, ?>> commands) {
super();
Expand All @@ -42,10 +41,6 @@ public Promise<Void> getPromise() {
return commands;
}

public AtomicBoolean getSended() {
return sended;
}

@Override
public List<CommandData<Object, Object>> getPubSubOperations() {
List<CommandData<Object, Object>> result = new ArrayList<CommandData<Object, Object>>();
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/org/redisson/client/protocol/QueueCommand.java
Expand Up @@ -16,12 +16,9 @@
package org.redisson.client.protocol;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

public interface QueueCommand {

List<CommandData<Object, Object>> getPubSubOperations();

AtomicBoolean getSended();

}
46 changes: 46 additions & 0 deletions src/main/java/org/redisson/client/protocol/QueueCommandHolder.java
@@ -0,0 +1,46 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol;

import java.util.concurrent.atomic.AtomicBoolean;

import io.netty.channel.ChannelPromise;

public class QueueCommandHolder {

final AtomicBoolean sended = new AtomicBoolean();
final ChannelPromise channelPromise;
final QueueCommand command;

public QueueCommandHolder(QueueCommand command, ChannelPromise channelPromise) {
super();
this.command = command;
this.channelPromise = channelPromise;
}

public QueueCommand getCommand() {
return command;
}

public ChannelPromise getChannelPromise() {
return channelPromise;
}

public AtomicBoolean getSended() {
return sended;
}

}
Expand Up @@ -189,6 +189,7 @@ public RedisConnection nextConnection() {
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
log.debug("new connection created: {}", conn);

return conn;
} catch (RedisConnectionException e) {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -134,6 +134,8 @@ public RedisConnection connectionWriteOp() {
if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName());
}
log.debug("new connection created: {}", conn);

return conn;
} catch (RedisConnectionException e) {
masterEntry.getConnectionsSemaphore().release();
Expand Down

0 comments on commit 2019283

Please sign in to comment.