Skip to content

Commit

Permalink
Async reconnection to channel. #274
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Nov 4, 2015
1 parent 8410d6f commit 46138c7
Show file tree
Hide file tree
Showing 8 changed files with 129 additions and 52 deletions.
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/client/ReconnectListener.java
Expand Up @@ -15,8 +15,10 @@
*/ */
package org.redisson.client; package org.redisson.client;


import io.netty.util.concurrent.Promise;

public interface ReconnectListener { public interface ReconnectListener {


void onReconnect(RedisConnection redisConnection) throws RedisException; void onReconnect(RedisConnection redisConnection, Promise<RedisConnection> connectionFuture) throws RedisException;


} }
23 changes: 13 additions & 10 deletions src/main/java/org/redisson/client/handler/ConnectionWatchdog.java
Expand Up @@ -33,6 +33,9 @@
import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroup;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;


public class ConnectionWatchdog extends ChannelInboundHandlerAdapter { public class ConnectionWatchdog extends ChannelInboundHandlerAdapter {


Expand Down Expand Up @@ -112,21 +115,21 @@ public void run() {


private void reconnect(final RedisConnection connection, final Channel channel) { private void reconnect(final RedisConnection connection, final Channel channel) {
if (connection.getReconnectListener() != null) { if (connection.getReconnectListener() != null) {
bootstrap.group().execute(new Runnable() { // new connection used only for channel init
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel);
Promise<RedisConnection> connectionFuture = bootstrap.group().next().newPromise();
connection.getReconnectListener().onReconnect(rc, connectionFuture);
connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void run() { public void operationComplete(Future<RedisConnection> future) throws Exception {
// new connection used only for channel init if (future.isSuccess()) {
RedisConnection rc = new RedisConnection(connection.getRedisClient(), channel); connection.updateChannel(channel);
connection.getReconnectListener().onReconnect(rc); resubscribe(connection);
connection.updateChannel(channel); }

resubscribe(connection);
} }

}); });
} else { } else {
connection.updateChannel(channel); connection.updateChannel(channel);

resubscribe(connection); resubscribe(connection);
} }
} }
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/redisson/client/protocol/RedisCommands.java
Expand Up @@ -136,8 +136,8 @@ public interface RedisCommands {
RedisStrictCommand<Long> INCRBY = new RedisStrictCommand<Long>("INCRBY"); RedisStrictCommand<Long> INCRBY = new RedisStrictCommand<Long>("INCRBY");
RedisStrictCommand<Long> DECR = new RedisStrictCommand<Long>("DECR"); RedisStrictCommand<Long> DECR = new RedisStrictCommand<Long>("DECR");


RedisStrictCommand<String> AUTH = new RedisStrictCommand<String>("AUTH", new StringReplayDecoder()); RedisStrictCommand<Void> AUTH = new RedisStrictCommand<Void>("AUTH", new VoidReplayConvertor());
RedisStrictCommand<String> SELECT = new RedisStrictCommand<String>("SELECT", new StringReplayDecoder()); RedisStrictCommand<Void> SELECT = new RedisStrictCommand<Void>("SELECT", new VoidReplayConvertor());
RedisStrictCommand<Boolean> CLIENT_SETNAME = new RedisStrictCommand<Boolean>("CLIENT", "SETNAME", new BooleanReplayConvertor()); RedisStrictCommand<Boolean> CLIENT_SETNAME = new RedisStrictCommand<Boolean>("CLIENT", "SETNAME", new BooleanReplayConvertor());
RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder()); RedisStrictCommand<String> CLIENT_GETNAME = new RedisStrictCommand<String>("CLIENT", "GETNAME", new StringDataDecoder());
RedisStrictCommand<Void> FLUSHDB = new RedisStrictCommand<Void>("FLUSHDB", new VoidReplayConvertor()); RedisStrictCommand<Void> FLUSHDB = new RedisStrictCommand<Void>("FLUSHDB", new VoidReplayConvertor());
Expand Down
Expand Up @@ -16,11 +16,11 @@
package org.redisson.cluster; package org.redisson.cluster;


import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.ConnectionEntry.Mode; import org.redisson.connection.ConnectionEntry.Mode;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.FutureConnectionListener;


public class ClusterConnectionListener extends DefaultConnectionListener { public class ClusterConnectionListener extends DefaultConnectionListener {


Expand All @@ -31,10 +31,10 @@ public ClusterConnectionListener(boolean readFromSlaves) {
} }


@Override @Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) throws RedisException { public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException {
super.onConnect(config, conn, serverMode); super.onConnect(config, serverMode, connectionListener);
if (serverMode == Mode.SLAVE && readFromSlaves) { if (serverMode == Mode.SLAVE && readFromSlaves) {
conn.sync(RedisCommands.READONLY); connectionListener.addCommand(RedisCommands.READONLY);
} }
} }


Expand Down
51 changes: 24 additions & 27 deletions src/main/java/org/redisson/connection/ConnectionEntry.java
Expand Up @@ -24,12 +24,12 @@
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;


public class ConnectionEntry { public class ConnectionEntry {


Expand Down Expand Up @@ -93,6 +93,7 @@ public void releaseConnection(RedisConnection connection) {
} }


public Future<RedisConnection> connect(final MasterSlaveServersConfig config) { public Future<RedisConnection> connect(final MasterSlaveServersConfig config) {
final Promise<RedisConnection> connectionFuture = client.getBootstrap().group().next().newPromise();
Future<RedisConnection> future = client.connectAsync(); Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() { future.addListener(new FutureListener<RedisConnection>() {
@Override @Override
Expand All @@ -103,31 +104,29 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
RedisConnection conn = future.getNow(); RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn); log.debug("new connection created: {}", conn);


connectListener.onConnect(config, conn, serverMode); FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
conn.setReconnectListener(new ReconnectListener() { connectListener.onConnect(config, serverMode, listener);
@Override listener.executeCommands();
public void onReconnect(RedisConnection conn) { addReconnectListener(config, conn);
connectListener.onConnect(config, conn, serverMode);
}
});
} }

}); });
return future; return connectionFuture;
} }


private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) { private void addReconnectListener(final MasterSlaveServersConfig config, RedisConnection conn) {
if (config.getPassword() != null) { conn.setReconnectListener(new ReconnectListener() {
conn.sync(RedisCommands.AUTH, config.getPassword()); @Override
} public void onReconnect(RedisConnection conn, Promise<RedisConnection> connectionFuture) {
if (config.getDatabase() != 0) { FutureConnectionListener<RedisConnection> listener = new FutureConnectionListener<RedisConnection>(connectionFuture, conn);
conn.sync(RedisCommands.SELECT, config.getDatabase()); connectListener.onConnect(config, serverMode, listener);
} listener.executeCommands();
if (config.getClientName() != null) { }
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); });
}
} }


public Future<RedisPubSubConnection> connectPubSub(final MasterSlaveServersConfig config) { public Future<RedisPubSubConnection> connectPubSub(final MasterSlaveServersConfig config) {
final Promise<RedisPubSubConnection> connectionFuture = client.getBootstrap().group().next().newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync(); Future<RedisPubSubConnection> future = client.connectPubSubAsync();
future.addListener(new FutureListener<RedisPubSubConnection>() { future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override @Override
Expand All @@ -138,16 +137,14 @@ public void operationComplete(Future<RedisPubSubConnection> future) throws Excep
RedisPubSubConnection conn = future.getNow(); RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn); log.debug("new pubsub connection created: {}", conn);


connectListener.onConnect(config, conn, serverMode); FutureConnectionListener<RedisPubSubConnection> listener = new FutureConnectionListener<RedisPubSubConnection>(connectionFuture, conn);
conn.setReconnectListener(new ReconnectListener() { connectListener.onConnect(config, serverMode, listener);
@Override listener.executeCommands();
public void onReconnect(RedisConnection conn) {
connectListener.onConnect(config, conn, serverMode); addReconnectListener(config, conn);
}
});
} }
}); });
return future; return connectionFuture;
} }


@Override @Override
Expand Down
Expand Up @@ -16,12 +16,11 @@
package org.redisson.connection; package org.redisson.connection;


import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.connection.ConnectionEntry.Mode; import org.redisson.connection.ConnectionEntry.Mode;


public interface ConnectionListener { public interface ConnectionListener {


void onConnect(MasterSlaveServersConfig config, RedisConnection redisConnection, Mode serverMode) throws RedisException; void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener) throws RedisException;


} }
Expand Up @@ -16,24 +16,23 @@
package org.redisson.connection; package org.redisson.connection;


import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionEntry.Mode; import org.redisson.connection.ConnectionEntry.Mode;


public class DefaultConnectionListener implements ConnectionListener { public class DefaultConnectionListener implements ConnectionListener {


@Override @Override
public void onConnect(MasterSlaveServersConfig config, RedisConnection conn, Mode serverMode) public void onConnect(MasterSlaveServersConfig config, Mode serverMode, FutureConnectionListener connectionListener)
throws RedisException { throws RedisException {
if (config.getPassword() != null) { if (config.getPassword() != null) {
conn.sync(RedisCommands.AUTH, config.getPassword()); connectionListener.addCommand(RedisCommands.AUTH, config.getPassword());
} }
if (config.getDatabase() != 0) { if (config.getDatabase() != 0) {
conn.sync(RedisCommands.SELECT, config.getDatabase()); connectionListener.addCommand(RedisCommands.SELECT, config.getDatabase());
} }
if (config.getClientName() != null) { if (config.getClientName() != null) {
conn.sync(RedisCommands.CLIENT_SETNAME, config.getClientName()); connectionListener.addCommand(RedisCommands.CLIENT_SETNAME, config.getClientName());
} }
} }


Expand Down
@@ -0,0 +1,77 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.connection;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommand;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;

public class FutureConnectionListener<T extends RedisConnection> implements FutureListener<Object> {

private final AtomicInteger commandsCounter = new AtomicInteger();

private final Promise<T> connectionPromise;
private final T connection;
private final List<Runnable> commands = new ArrayList<Runnable>(4);

public FutureConnectionListener(Promise<T> connectionFuture, T connection) {
super();
this.connectionPromise = connectionFuture;
this.connection = connection;
}

public void addCommand(final RedisCommand<?> command, final Object ... params) {
commandsCounter.incrementAndGet();
commands.add(new Runnable() {
@Override
public void run() {
Future<Object> future = connection.async(command, params);
future.addListener(FutureConnectionListener.this);
}
});
}

public void executeCommands() {
if (commands.isEmpty()) {
connectionPromise.setSuccess(connection);
return;
}

for (Runnable command : commands) {
command.run();
}
commands.clear();
}

@Override
public void operationComplete(Future<Object> future) throws Exception {
if (!future.isSuccess()) {
connectionPromise.tryFailure(future.cause());
return;
}
if (commandsCounter.decrementAndGet() == 0) {
connectionPromise.trySuccess(connection);
}
}

}

0 comments on commit 46138c7

Please sign in to comment.