Skip to content

Commit

Permalink
commandTimeout param added to RedisClient object
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 5, 2016
1 parent 7f0cc3e commit 5f16318
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 27 deletions.
7 changes: 3 additions & 4 deletions src/main/java/org/redisson/RedisNodes.java
Expand Up @@ -22,7 +22,6 @@
import java.util.Map; import java.util.Map;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;


import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
Expand Down Expand Up @@ -80,7 +79,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
connectionFuture.addListener(new FutureListener<RedisConnection>() { connectionFuture.addListener(new FutureListener<RedisConnection>() {
@Override @Override
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
Future<String> r = c.async(RedisCommands.PING); Future<String> r = c.async(connectionManager.getConfig().getPingTimeout(), RedisCommands.PING);
result.put(c, r); result.put(c, r);
latch.countDown(); latch.countDown();
} }
Expand All @@ -94,7 +93,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {


long time = System.currentTimeMillis(); long time = System.currentTimeMillis();
try { try {
latch.await(connectionManager.getConfig().getConnectTimeout(), TimeUnit.MILLISECONDS); latch.await();
} catch (InterruptedException e) { } catch (InterruptedException e) {
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
} }
Expand All @@ -110,7 +109,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
boolean res = true; boolean res = true;
for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) { for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) {
Future<String> f = entry.getValue(); Future<String> f = entry.getValue();
f.awaitUninterruptibly(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS); f.awaitUninterruptibly();
if (!"PONG".equals(f.getNow())) { if (!"PONG".equals(f.getNow())) {
res = false; res = false;
} }
Expand Down
27 changes: 19 additions & 8 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -46,13 +46,19 @@
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.URIBuilder; import org.redisson.misc.URIBuilder;


/**
* Low-level Redis client
*
* @author Nikita Koksharov
*
*/
public class RedisClient { public class RedisClient {


private final Bootstrap bootstrap; private final Bootstrap bootstrap;
private final InetSocketAddress addr; private final InetSocketAddress addr;
private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);


private final long timeout; private final long commandTimeout;
private boolean hasOwnGroup; private boolean hasOwnGroup;


public RedisClient(String address) { public RedisClient(String address) {
Expand All @@ -69,15 +75,19 @@ public RedisClient(EventLoopGroup group, URI address) {
} }


public RedisClient(String host, int port) { public RedisClient(String host, int port) {
this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 60 * 1000); this(new NioEventLoopGroup(), NioSocketChannel.class, host, port, 3000);
hasOwnGroup = true; hasOwnGroup = true;
} }


public RedisClient(EventLoopGroup group, String host, int port) { public RedisClient(EventLoopGroup group, String host, int port) {
this(group, NioSocketChannel.class, host, port, 60 * 1000); this(group, NioSocketChannel.class, host, port, 3000);
} }


public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int timeout) { public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int connectTimeout) {
this(group, socketChannelClass, host, port, connectTimeout, 3000);
}

public RedisClient(EventLoopGroup group, Class<? extends SocketChannel> socketChannelClass, String host, int port, int connectTimeout, int commandTimeout) {
addr = new InetSocketAddress(host, port); addr = new InetSocketAddress(host, port);
bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr); bootstrap = new Bootstrap().channel(socketChannelClass).group(group).remoteAddress(addr);
bootstrap.handler(new ChannelInitializer<Channel>() { bootstrap.handler(new ChannelInitializer<Channel>() {
Expand All @@ -91,16 +101,17 @@ protected void initChannel(Channel ch) throws Exception {
} }
}); });


bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, timeout); bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout);
this.timeout = timeout; this.commandTimeout = commandTimeout;
} }



public InetSocketAddress getAddr() { public InetSocketAddress getAddr() {
return addr; return addr;
} }


long getTimeout() { public long getCommandTimeout() {
return timeout; return commandTimeout;
} }


public Bootstrap getBootstrap() { public Bootstrap getBootstrap() {
Expand Down
28 changes: 17 additions & 11 deletions src/main/java/org/redisson/client/RedisConnection.java
Expand Up @@ -122,8 +122,7 @@ public void operationComplete(Future<R> future) throws Exception {
}); });


try { try {
// TODO change connectTimeout to timeout if (!l.await(redisClient.getCommandTimeout(), TimeUnit.MILLISECONDS)) {
if (!l.await(redisClient.getTimeout(), TimeUnit.MILLISECONDS)) {
Promise<R> promise = (Promise<R>)future; Promise<R> promise = (Promise<R>)future;
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.setFailure(ex); promise.setFailure(ex);
Expand All @@ -143,8 +142,7 @@ public void operationComplete(Future<R> future) throws Exception {
} }


public <T> T sync(RedisStrictCommand<T> command, Object ... params) { public <T> T sync(RedisStrictCommand<T> command, Object ... params) {
Future<T> r = async(null, command, params); return sync(null, command, params);
return await(r);
} }


public <T, R> ChannelFuture send(CommandData<T, R> data) { public <T, R> ChannelFuture send(CommandData<T, R> data) {
Expand All @@ -156,29 +154,37 @@ public ChannelFuture send(CommandsData data) {
} }


public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params) {
Future<R> r = async(encoder, command, params); Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise();
return await(r); send(new CommandData<T, R>(promise, encoder, command, params));
return await(promise);
} }


public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) {
return async(null, command, params); return async(null, command, params);
} }

public <T, R> Future<R> async(long timeout, RedisCommand<T> command, Object ... params) {
return async(null, command, params);
}


public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise(); return async(-1, encoder, command, params);
send(new CommandData<T, R>(promise, encoder, command, params));
return promise;
} }


public <T, R> Future<R> asyncWithTimeout(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(long timeout, Codec encoder, RedisCommand<T> command, Object ... params) {
final Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<R> promise = ImmediateEventExecutor.INSTANCE.newPromise();
if (timeout == -1) {
timeout = redisClient.getCommandTimeout();
}

final ScheduledFuture<?> scheduledFuture = redisClient.getBootstrap().group().next().schedule(new Runnable() { final ScheduledFuture<?> scheduledFuture = redisClient.getBootstrap().group().next().schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr()); RedisTimeoutException ex = new RedisTimeoutException("Command execution timeout for " + redisClient.getAddr());
promise.tryFailure(ex); promise.tryFailure(ex);
} }
}, redisClient.getTimeout(), TimeUnit.MILLISECONDS); }, timeout, TimeUnit.MILLISECONDS);

promise.addListener(new FutureListener<R>() { promise.addListener(new FutureListener<R>() {
@Override @Override
public void operationComplete(Future<R> future) throws Exception { public void operationComplete(Future<R> future) throws Exception {
Expand Down
Expand Up @@ -200,7 +200,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
} }


final RedisConnection connection = future.getNow(); final RedisConnection connection = future.getNow();
Future<Map<String, String>> clusterFuture = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_INFO); Future<Map<String, String>> clusterFuture = connection.async(RedisCommands.CLUSTER_INFO);
clusterFuture.addListener(new FutureListener<Map<String, String>>() { clusterFuture.addListener(new FutureListener<Map<String, String>>() {


@Override @Override
Expand Down Expand Up @@ -322,7 +322,7 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
} }


private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator) { private void updateClusterState(final ClusterServersConfig cfg, final RedisConnection connection, final Iterator<URI> iterator) {
Future<List<ClusterNodeInfo>> future = connection.asyncWithTimeout(null, RedisCommands.CLUSTER_NODES); Future<List<ClusterNodeInfo>> future = connection.async(RedisCommands.CLUSTER_NODES);
future.addListener(new FutureListener<List<ClusterNodeInfo>>() { future.addListener(new FutureListener<List<ClusterNodeInfo>>() {
@Override @Override
public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception { public void operationComplete(Future<List<ClusterNodeInfo>> future) throws Exception {
Expand Down
Expand Up @@ -367,7 +367,7 @@ public void operationComplete(Future<Void> future)
}; };


if (entry.getConfig().getPassword() != null) { if (entry.getConfig().getPassword() != null) {
Future<Void> temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword()); Future<Void> temp = c.async(RedisCommands.AUTH, config.getPassword());


FutureListener<Void> listener = new FutureListener<Void> () { FutureListener<Void> listener = new FutureListener<Void> () {
@Override public void operationComplete (Future < Void > future)throws Exception { @Override public void operationComplete (Future < Void > future)throws Exception {
Expand All @@ -386,7 +386,7 @@ public void operationComplete(Future<Void> future)
} }


private void ping(RedisConnection c, final FutureListener<String> pingListener) { private void ping(RedisConnection c, final FutureListener<String> pingListener) {
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING); Future<String> f = c.async(RedisCommands.PING);
f.addListener(pingListener); f.addListener(pingListener);
} }


Expand Down

0 comments on commit 5f16318

Please sign in to comment.