Skip to content

Commit

Permalink
Redis nodes ping added. #215, #138, #196
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 11, 2015
1 parent a44a3b6 commit 3ee81d3
Show file tree
Hide file tree
Showing 23 changed files with 558 additions and 49 deletions.
23 changes: 23 additions & 0 deletions src/main/java/org/redisson/BaseConfig.java
Expand Up @@ -18,6 +18,12 @@


class BaseConfig<T extends BaseConfig<T>> { class BaseConfig<T extends BaseConfig<T>> {


/**
* Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation
*
*/
private int pingTimeout = 1000;

/** /**
* Redis operation execution timeout. * Redis operation execution timeout.
* Then amount is reached exception will be thrown in case of <b>sync</b> operation usage * Then amount is reached exception will be thrown in case of <b>sync</b> operation usage
Expand Down Expand Up @@ -60,6 +66,7 @@ class BaseConfig<T extends BaseConfig<T>> {
setDatabase(config.getDatabase()); setDatabase(config.getDatabase());
setTimeout(config.getTimeout()); setTimeout(config.getTimeout());
setClientName(config.getClientName()); setClientName(config.getClientName());
setPingTimeout(config.getPingTimeout());
} }


/** /**
Expand Down Expand Up @@ -162,4 +169,20 @@ public T setClientName(String clientName) {
this.clientName = clientName; this.clientName = clientName;
return (T) this; return (T) this;
} }

/**
* Ping timeout used in <code>Node.ping</code> and <code>Node.pingAll<code> operation
*
* @return
*/
public int getPingTimeout() {
return pingTimeout;
}
public T setPingTimeout(int pingTimeout) {
this.pingTimeout = pingTimeout;
return (T) this;
}



} }
4 changes: 4 additions & 0 deletions src/main/java/org/redisson/Config.java
Expand Up @@ -158,6 +158,10 @@ void setMasterSlaveServersConfig(MasterSlaveServersConfig masterSlaveConnectionC
this.masterSlaveServersConfig = masterSlaveConnectionConfig; this.masterSlaveServersConfig = masterSlaveConnectionConfig;
} }


public boolean isClusterConfig() {
return clusterServersConfig != null;
}

public int getThreads() { public int getThreads() {
return threads; return threads;
} }
Expand Down
99 changes: 99 additions & 0 deletions src/main/java/org/redisson/RedisNodes.java
@@ -0,0 +1,99 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

public class RedisNodes<N extends Node> implements NodesGroup<N> {

private final ConnectionManager connectionManager;

public RedisNodes(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public Collection<N> getNodes() {
return (Collection<N>) connectionManager.getClients();
}

@Override
public boolean pingAll() {
List<RedisClientEntry> clients = new ArrayList<RedisClientEntry>(connectionManager.getClients());
final Map<RedisConnection, Future<String>> result = new HashMap<RedisConnection, Future<String>>(clients.size());
final CountDownLatch latch = new CountDownLatch(clients.size());
for (RedisClientEntry entry : clients) {
Future<RedisConnection> f = entry.getClient().connectAsync();
f.addListener(new FutureListener<RedisConnection>() {
@Override
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (future.isSuccess()) {
RedisConnection c = future.getNow();
Future<String> r = future.getNow().async(RedisCommands.PING);
result.put(c, r);
}
latch.countDown();
}
});
}

long time = System.currentTimeMillis();
try {
latch.await(connectionManager.getConfig().getPingTimeout(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

if (System.currentTimeMillis() - time >= connectionManager.getConfig().getPingTimeout()) {
for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) {
entry.getKey().closeAsync();
}
return false;
}

boolean res = true;
for (Entry<RedisConnection, Future<String>> entry : result.entrySet()) {
Future<String> f = entry.getValue();
long timeout = Math.max(connectionManager.getConfig().getPingTimeout() - (System.currentTimeMillis() - time), 0);
f.awaitUninterruptibly(timeout, TimeUnit.MILLISECONDS);
if (!"PONG".equals(f.getNow())) {
res = false;
}
entry.getKey().closeAsync();
}

// true and no futures missed during client connection
return res && result.size() == clients.size();
}

}
28 changes: 28 additions & 0 deletions src/main/java/org/redisson/Redisson.java
Expand Up @@ -28,6 +28,9 @@
import org.redisson.connection.MasterSlaveConnectionManager; import org.redisson.connection.MasterSlaveConnectionManager;
import org.redisson.connection.SentinelConnectionManager; import org.redisson.connection.SentinelConnectionManager;
import org.redisson.connection.SingleConnectionManager; import org.redisson.connection.SingleConnectionManager;
import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
import org.redisson.core.RAtomicLong; import org.redisson.core.RAtomicLong;
import org.redisson.core.RBatch; import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue; import org.redisson.core.RBlockingQueue;
Expand Down Expand Up @@ -412,6 +415,8 @@ public Long onFinish() {
* @param keys - object names * @param keys - object names
* @return * @return
*/ */
// use RKeys.delete
@Deprecated
@Override @Override
public long delete(String ... keys) { public long delete(String ... keys) {
return commandExecutor.get(deleteAsync(keys)); return commandExecutor.get(deleteAsync(keys));
Expand All @@ -423,6 +428,8 @@ public long delete(String ... keys) {
* @param keys - object names * @param keys - object names
* @return * @return
*/ */
// use RKeys.deleteAsync
@Deprecated
@Override @Override
public Future<Long> deleteAsync(String ... keys) { public Future<Long> deleteAsync(String ... keys) {
return commandExecutor.writeAllAsync(RedisCommands.DEL, new SlotCallback<Long, Long>() { return commandExecutor.writeAllAsync(RedisCommands.DEL, new SlotCallback<Long, Long>() {
Expand All @@ -439,6 +446,27 @@ public Long onFinish() {
}, (Object[])keys); }, (Object[])keys);
} }


/**
* Get Redis nodes group for server operations
*
* @return
*/
public NodesGroup<Node> getNodesGroup() {
return new RedisNodes<Node>(connectionManager);
}

/**
* Get Redis cluster nodes group for server operations
*
* @return
*/
public NodesGroup<ClusterNode> getClusterNodesGroup() {
if (!config.isClusterConfig()) {
throw new IllegalStateException("Redisson not in cluster mode!");
}
return new RedisNodes<ClusterNode>(connectionManager);
}

/** /**
* Delete all the keys of the currently selected database * Delete all the keys of the currently selected database
*/ */
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/org/redisson/RedissonClient.java
Expand Up @@ -18,6 +18,9 @@
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;


import org.redisson.core.ClusterNode;
import org.redisson.core.Node;
import org.redisson.core.NodesGroup;
import org.redisson.core.RAtomicLong; import org.redisson.core.RAtomicLong;
import org.redisson.core.RBatch; import org.redisson.core.RBatch;
import org.redisson.core.RBlockingQueue; import org.redisson.core.RBlockingQueue;
Expand Down Expand Up @@ -281,6 +284,20 @@ public interface RedissonClient {
@Deprecated @Deprecated
Future<Long> deleteAsync(String ... keys); Future<Long> deleteAsync(String ... keys);


/**
* Get Redis nodes group for server operations
*
* @return
*/
NodesGroup<Node> getNodesGroup();

/**
* Get Redis cluster nodes group for server operations
*
* @return
*/
NodesGroup<ClusterNode> getClusterNodesGroup();

/** /**
* Delete all the keys of the currently selected database * Delete all the keys of the currently selected database
*/ */
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -26,6 +26,7 @@
import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption; import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup; import io.netty.channel.EventLoopGroup;
Expand All @@ -35,7 +36,9 @@
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GlobalEventExecutor; import io.netty.util.concurrent.GlobalEventExecutor;
import io.netty.util.concurrent.Promise;


public class RedisClient { public class RedisClient {


Expand Down Expand Up @@ -89,6 +92,23 @@ public RedisConnection connect() {
} }
} }


public Future<RedisConnection> connectAsync() {
final Promise<RedisConnection> f = bootstrap.group().next().newPromise();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
RedisConnection c = new RedisConnection(RedisClient.this, future.channel());
f.setSuccess(c);
} else {
f.setFailure(future.cause());
}
}
});
return f;
}

public RedisPubSubConnection connectPubSub() { public RedisPubSubConnection connectPubSub() {
try { try {
ChannelFuture future = bootstrap.connect(); ChannelFuture future = bootstrap.connect();
Expand Down
4 changes: 4 additions & 0 deletions src/main/java/org/redisson/client/RedisConnection.java
Expand Up @@ -89,6 +89,10 @@ public <T, R> R sync(Codec encoder, RedisCommand<T> command, Object ... params)
return await(r); return await(r);
} }


public <T, R> Future<R> async(RedisCommand<T> command, Object ... params) {
return async(null, command, params);
}

public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) { public <T, R> Future<R> async(Codec encoder, RedisCommand<T> command, Object ... params) {
Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise(); Promise<R> promise = redisClient.getBootstrap().group().next().<R>newPromise();
send(new CommandData<T, R>(promise, encoder, command, params)); send(new CommandData<T, R>(promise, encoder, command, params));
Expand Down
Expand Up @@ -37,6 +37,7 @@
import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder; import org.redisson.client.protocol.decoder.ObjectSetReplayDecoder;
import org.redisson.client.protocol.decoder.StringDataDecoder; import org.redisson.client.protocol.decoder.StringDataDecoder;
import org.redisson.client.protocol.decoder.StringListReplayDecoder; import org.redisson.client.protocol.decoder.StringListReplayDecoder;
import org.redisson.client.protocol.decoder.StringMapDataDecoder;
import org.redisson.client.protocol.decoder.StringMapReplayDecoder; import org.redisson.client.protocol.decoder.StringMapReplayDecoder;
import org.redisson.client.protocol.decoder.StringReplayDecoder; import org.redisson.client.protocol.decoder.StringReplayDecoder;
import org.redisson.client.protocol.pubsub.PubSubStatusDecoder; import org.redisson.client.protocol.pubsub.PubSubStatusDecoder;
Expand Down Expand Up @@ -152,7 +153,7 @@ public interface RedisCommands {
RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder()); RedisCommand<Object> PUNSUBSCRIBE = new RedisCommand<Object>("PUNSUBSCRIBE", new PubSubStatusDecoder());


RedisStrictCommand<String> CLUSTER_NODES = new RedisStrictCommand<String>("CLUSTER", "NODES", new StringDataDecoder()); RedisStrictCommand<String> CLUSTER_NODES = new RedisStrictCommand<String>("CLUSTER", "NODES", new StringDataDecoder());
RedisStrictCommand<String> CLUSTER_INFO = new RedisStrictCommand<String>("CLUSTER", "INFO", new StringDataDecoder()); RedisStrictCommand<Map<String, String>> CLUSTER_INFO = new RedisStrictCommand<Map<String, String>>("CLUSTER", "INFO", new StringMapDataDecoder());


RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder()); RedisStrictCommand<List<String>> SENTINEL_GET_MASTER_ADDR_BY_NAME = new RedisStrictCommand<List<String>>("SENTINEL", "GET-MASTER-ADDR-BY-NAME", new StringListReplayDecoder());
RedisStrictCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisStrictCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", new StringMapReplayDecoder()); RedisStrictCommand<List<Map<String, String>>> SENTINEL_SLAVES = new RedisStrictCommand<List<Map<String, String>>>("SENTINEL", "SLAVES", new StringMapReplayDecoder());
Expand Down
@@ -0,0 +1,40 @@
/**
* Copyright 2014 Nikita Koksharov, Nickolay Borbit
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.redisson.client.protocol.decoder;

import java.util.HashMap;
import java.util.Map;

import org.redisson.client.handler.State;
import org.redisson.client.protocol.Decoder;

import io.netty.buffer.ByteBuf;
import io.netty.util.CharsetUtil;

public class StringMapDataDecoder implements Decoder<Map<String, String>> {

@Override
public Map<String, String> decode(ByteBuf buf, State state) {
String value = buf.toString(CharsetUtil.UTF_8);
Map<String, String> result = new HashMap<String, String>();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
result.put(parts[0], parts[1]);
}
return result;
}

}
6 changes: 4 additions & 2 deletions src/main/java/org/redisson/connection/BaseLoadBalancer.java
Expand Up @@ -39,11 +39,13 @@ abstract class BaseLoadBalancer implements LoadBalancer {


private MasterSlaveServersConfig config; private MasterSlaveServersConfig config;


private ConnectionManager connectionManager;
private final ReclosableLatch clientsEmpty = new ReclosableLatch(); private final ReclosableLatch clientsEmpty = new ReclosableLatch();
final Queue<SubscribesConnectionEntry> clients = new ConcurrentLinkedQueue<SubscribesConnectionEntry>(); final Queue<SubscribesConnectionEntry> clients = new ConcurrentLinkedQueue<SubscribesConnectionEntry>();


public void init(MasterSlaveServersConfig config) { public void init(MasterSlaveServersConfig config, ConnectionManager connectionManager) {
this.config = config; this.config = config;
this.connectionManager = connectionManager;
} }


public synchronized void add(SubscribesConnectionEntry entry) { public synchronized void add(SubscribesConnectionEntry entry) {
Expand Down Expand Up @@ -225,7 +227,7 @@ public void shutdown() {


public void shutdownAsync() { public void shutdownAsync() {
for (SubscribesConnectionEntry entry : clients) { for (SubscribesConnectionEntry entry : clients) {
entry.getClient().shutdownAsync(); connectionManager.shutdownAsync(entry.getClient());
} }
} }


Expand Down

0 comments on commit 3ee81d3

Please sign in to comment.