Skip to content

Commit

Permalink
Node type added. #399
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Mar 1, 2016
1 parent 7ff02e4 commit a3b6bda
Show file tree
Hide file tree
Showing 14 changed files with 65 additions and 16 deletions.
14 changes: 14 additions & 0 deletions src/main/java/org/redisson/RedisNodes.java
Expand Up @@ -30,6 +30,7 @@
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.RedisClientEntry;
import org.redisson.core.Node;
import org.redisson.core.NodeType;
import org.redisson.core.NodesGroup;

import io.netty.util.concurrent.Future;
Expand All @@ -44,6 +45,19 @@ public RedisNodes(ConnectionManager connectionManager) {
this.connectionManager = connectionManager;
}

@Override
public Collection<N> getNodes(NodeType type) {
Collection<N> clients = (Collection<N>) connectionManager.getClients();
List<N> result = new ArrayList<N>();
for (N node : clients) {
if (node.getType().equals(type)) {
result.add(node);
}
}
return result;
}


@Override
public Collection<N> getNodes() {
return (Collection<N>) connectionManager.getClients();
Expand Down
Expand Up @@ -21,7 +21,7 @@
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.DefaultConnectionListener;
import org.redisson.connection.FutureConnectionListener;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.core.NodeType;

public class ClusterConnectionListener extends DefaultConnectionListener {

Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -48,8 +49,6 @@ public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
private FreezeReason freezeReason;
final RedisClient client;

public enum NodeType {SLAVE, MASTER, SENTINEL}

private final NodeType nodeType;
private ConnectionManager connectionManager;

Expand Down
Expand Up @@ -17,7 +17,7 @@

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.core.NodeType;

import io.netty.util.concurrent.Promise;

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/connection/ConnectionManager.java
Expand Up @@ -28,6 +28,7 @@
import org.redisson.client.protocol.RedisCommand;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.core.NodeType;
import org.redisson.misc.InfinitySemaphoreLatch;

import io.netty.channel.EventLoopGroup;
Expand Down Expand Up @@ -87,7 +88,7 @@ public interface ConnectionManager {

RedisClient createClient(String host, int port, int timeout);

RedisClient createClient(String host, int port);
RedisClient createClient(NodeType type, String host, int port);

MasterSlaveEntry getEntry(InetSocketAddress addr);

Expand Down
Expand Up @@ -19,7 +19,7 @@
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.core.NodeType;

import io.netty.util.concurrent.Promise;

Expand Down
Expand Up @@ -44,6 +44,7 @@
import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.core.NodeType;
import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -259,14 +260,14 @@ protected MasterSlaveServersConfig create(BaseMasterSlaveServersConfig<?> cfg) {
}

@Override
public RedisClient createClient(String host, int port) {
public RedisClient createClient(NodeType type, String host, int port) {
RedisClient client = createClient(host, port, config.getConnectTimeout());
clients.add(new RedisClientEntry(client, this));
clients.add(new RedisClientEntry(client, this, type));
return client;
}

public void shutdownAsync(RedisClient client) {
clients.remove(new RedisClientEntry(client, this));
clients.remove(new RedisClientEntry(client, this, null));
client.shutdownAsync();
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -30,10 +30,10 @@
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.connection.pool.MasterConnectionPool;
import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -84,7 +84,7 @@ public List<Future<Void>> initSlaveBalancer(Collection<URI> disconnectedNodes) {
}

public Future<Void> setupMasterEntry(String host, int port) {
RedisClient client = connectionManager.createClient(host, port);
RedisClient client = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(client, config.getMasterConnectionMinimumIdleSize(), config.getMasterConnectionPoolSize(),
0, 0, connectionManager, NodeType.MASTER, config);
return writeConnectionHolder.add(masterEntry);
Expand All @@ -108,7 +108,7 @@ public Future<Void> addSlave(String host, int port) {
}

private Future<Void> addSlave(String host, int port, boolean freezed, NodeType mode) {
RedisClient client = connectionManager.createClient(host, port);
RedisClient client = connectionManager.createClient(NodeType.SLAVE, host, port);
ClientConnectionsEntry entry = new ClientConnectionsEntry(client,
this.config.getSlaveConnectionMinimumIdleSize(),
this.config.getSlaveConnectionPoolSize(),
Expand Down
10 changes: 9 additions & 1 deletion src/main/java/org/redisson/connection/RedisClientEntry.java
Expand Up @@ -20,6 +20,7 @@
import org.redisson.client.RedisConnection;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.core.ClusterNode;
import org.redisson.core.NodeType;

import java.net.InetSocketAddress;
import java.util.Map;
Expand All @@ -28,11 +29,18 @@ public class RedisClientEntry implements ClusterNode {

private final RedisClient client;
private final ConnectionManager manager;
private final NodeType type;

public RedisClientEntry(RedisClient client, ConnectionManager manager) {
public RedisClientEntry(RedisClient client, ConnectionManager manager, NodeType type) {
super();
this.client = client;
this.manager = manager;
this.type = type;
}

@Override
public NodeType getType() {
return type;
}

public RedisClient getClient() {
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/redisson/connection/SingleEntry.java
Expand Up @@ -24,9 +24,9 @@
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.connection.pool.SinglePubSubConnectionPool;
import org.redisson.core.NodeType;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
Expand All @@ -43,7 +43,7 @@ public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectio

@Override
public Future<Void> setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port);
RedisClient masterClient = connectionManager.createClient(NodeType.MASTER, host, port);
masterEntry = new ClientConnectionsEntry(masterClient,
config.getMasterConnectionMinimumIdleSize(),
config.getMasterConnectionPoolSize(),
Expand Down
Expand Up @@ -28,9 +28,9 @@
import org.redisson.client.protocol.RedisCommands;
import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry;
import org.redisson.core.NodeType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/redisson/core/Node.java
Expand Up @@ -25,6 +25,15 @@
*/
public interface Node {

/**
* Returns node type
*
* @see {@link NodeType}
*
* @return
*/
NodeType getType();

/**
* Get Redis node address
*
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/org/redisson/core/NodeType.java
@@ -0,0 +1,7 @@
package org.redisson.core;

public enum NodeType {

MASTER, SLAVE

}
10 changes: 10 additions & 0 deletions src/main/java/org/redisson/core/NodesGroup.java
Expand Up @@ -41,6 +41,16 @@ public interface NodesGroup<N extends Node> {
*/
void removeConnectionListener(int listenerId);

/**
* Get all nodes by type
*
* @see {@link NodeType}
*
* @param type
* @return
*/
Collection<N> getNodes(NodeType type);

/**
* All Redis nodes used by Redisson.
* This collection may change during master change, cluster topology update and etc.
Expand Down

0 comments on commit a3b6bda

Please sign in to comment.