Skip to content

Commit

Permalink
CLUSTER_INFO checking
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 5, 2015
1 parent 1ac08cf commit c8b7d6f
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 35 deletions.
12 changes: 7 additions & 5 deletions src/main/java/org/redisson/connection/BaseLoadBalancer.java
Expand Up @@ -28,7 +28,6 @@
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;
import org.redisson.misc.ReclosableLatch; import org.redisson.misc.ReclosableLatch;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -38,15 +37,12 @@ abstract class BaseLoadBalancer implements LoadBalancer {


private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());


private Codec codec;

private MasterSlaveServersConfig config; private MasterSlaveServersConfig config;


private final ReclosableLatch clientsEmpty = new ReclosableLatch(); private final ReclosableLatch clientsEmpty = new ReclosableLatch();
final Queue<SubscribesConnectionEntry> clients = new ConcurrentLinkedQueue<SubscribesConnectionEntry>(); final Queue<SubscribesConnectionEntry> clients = new ConcurrentLinkedQueue<SubscribesConnectionEntry>();


public void init(Codec codec, MasterSlaveServersConfig config) { public void init(MasterSlaveServersConfig config) {
this.codec = codec;
this.config = config; this.config = config;
} }


Expand Down Expand Up @@ -227,4 +223,10 @@ public void shutdown() {
} }
} }


public void shutdownAsync() {
for (SubscribesConnectionEntry entry : clients) {
entry.getClient().shutdownAsync();
}
}

} }
Expand Up @@ -62,7 +62,7 @@ public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
RedisConnection connection = client.connect(); RedisConnection connection = client.connect();
String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES); String nodesValue = connection.sync(RedisCommands.CLUSTER_NODES);


Map<Integer, ClusterPartition> partitions = extractPartitions(nodesValue); Map<Integer, ClusterPartition> partitions = parsePartitions(nodesValue);
for (ClusterPartition partition : partitions.values()) { for (ClusterPartition partition : partitions.values()) {
addMasterEntry(partition, cfg); addMasterEntry(partition, cfg);
} }
Expand All @@ -85,14 +85,28 @@ protected void initEntry(MasterSlaveServersConfig config) {


private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) { private void addMasterEntry(ClusterPartition partition, ClusterServersConfig cfg) {
if (partition.isMasterFail()) { if (partition.isMasterFail()) {
log.warn("master: {} for slot range: {}-{} add failed. Reason - server has FAIL flag", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
return; return;
} }


MasterSlaveServersConfig c = create(cfg); RedisClient client = createClient(partition.getMasterAddress().getHost(), partition.getMasterAddress().getPort(), cfg.getTimeout());
try {
RedisConnection c = client.connect();
String info = c.sync(RedisCommands.CLUSTER_INFO);
Map<String, String> params = parseInfo(info);
if ("fail".equals(params.get("cluster_state"))) {
log.warn("master: {} for slot range: {}-{} add failed. Reason - cluster_state:fail", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
return;
}
} finally {
client.shutdownAsync();
}

MasterSlaveServersConfig config = create(cfg);
log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot()); log.info("master: {} for slot range: {}-{} added", partition.getMasterAddress(), partition.getStartSlot(), partition.getEndSlot());
c.setMasterAddress(partition.getMasterAddress()); config.setMasterAddress(partition.getMasterAddress());


SingleEntry entry = new SingleEntry(codec, this, c); SingleEntry entry = new SingleEntry(this, config);
entries.put(partition.getEndSlot(), entry); entries.put(partition.getEndSlot(), entry);
lastPartitions.put(partition.getEndSlot(), partition); lastPartitions.put(partition.getEndSlot(), partition);
} }
Expand All @@ -110,7 +124,7 @@ public void run() {


log.debug("cluster nodes state: {}", nodesValue); log.debug("cluster nodes state: {}", nodesValue);


Map<Integer, ClusterPartition> partitions = extractPartitions(nodesValue); Map<Integer, ClusterPartition> partitions = parsePartitions(nodesValue);
for (ClusterPartition newPart : partitions.values()) { for (ClusterPartition newPart : partitions.values()) {
for (ClusterPartition part : lastPartitions.values()) { for (ClusterPartition part : lastPartitions.values()) {
if (newPart.getMasterAddress().equals(part.getMasterAddress())) { if (newPart.getMasterAddress().equals(part.getMasterAddress())) {
Expand Down Expand Up @@ -187,7 +201,16 @@ private void checkSlotsChange(ClusterServersConfig cfg, Map<Integer, ClusterPart
} }
} }


private Map<Integer, ClusterPartition> extractPartitions(String nodesValue) { private Map<String, String> parseInfo(String value) {
Map<String, String> result = new HashMap<String, String>();
for (String entry : value.split("\r\n|\n")) {
String[] parts = entry.split(":");
result.put(parts[0], parts[1]);
}
return result;
}

private Map<Integer, ClusterPartition> parsePartitions(String nodesValue) {
Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>(); Map<String, ClusterPartition> partitions = new HashMap<String, ClusterPartition>();
Map<Integer, ClusterPartition> result = new HashMap<Integer, ClusterPartition>(); Map<Integer, ClusterPartition> result = new HashMap<Integer, ClusterPartition>();
List<ClusterNodeInfo> nodes = parse(nodesValue); List<ClusterNodeInfo> nodes = parse(nodesValue);
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/org/redisson/connection/LoadBalancer.java
Expand Up @@ -24,13 +24,15 @@


public interface LoadBalancer { public interface LoadBalancer {


void shutdownAsync();

void shutdown(); void shutdown();


void unfreeze(String host, int port); void unfreeze(String host, int port);


Collection<RedisPubSubConnection> freeze(String host, int port); Collection<RedisPubSubConnection> freeze(String host, int port);


void init(Codec codec, MasterSlaveServersConfig config); void init(MasterSlaveServersConfig config);


void add(SubscribesConnectionEntry entry); void add(SubscribesConnectionEntry entry);


Expand Down
Expand Up @@ -30,9 +30,11 @@
import org.redisson.client.BaseRedisPubSubListener; import org.redisson.client.BaseRedisPubSubListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.RedisPubSubListener; import org.redisson.client.RedisPubSubListener;
import org.redisson.client.codec.Codec; import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.pubsub.PubSubType; import org.redisson.client.protocol.pubsub.PubSubType;
import org.redisson.misc.InfinitySemaphoreLatch; import org.redisson.misc.InfinitySemaphoreLatch;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand Down Expand Up @@ -122,14 +124,10 @@ protected void init(MasterSlaveServersConfig config) {
} }


protected void initEntry(MasterSlaveServersConfig config) { protected void initEntry(MasterSlaveServersConfig config) {
MasterSlaveEntry entry = new MasterSlaveEntry(codec, this, config); MasterSlaveEntry entry = new MasterSlaveEntry(this, config);
entries.put(Integer.MAX_VALUE, entry); entries.put(Integer.MAX_VALUE, entry);
} }


public static void main(String[] args) {
System.out.println(1210 % 100);
}

protected void init(Config cfg) { protected void init(Config cfg) {
if (cfg.isUseLinuxNativeEpoll()) { if (cfg.isUseLinuxNativeEpoll()) {
this.group = new EpollEventLoopGroup(cfg.getThreads()); this.group = new EpollEventLoopGroup(cfg.getThreads());
Expand All @@ -141,14 +139,17 @@ protected void init(Config cfg) {
this.codec = cfg.getCodec(); this.codec = cfg.getCodec();
} }


@Override
public RedisClient createClient(String host, int port) { public RedisClient createClient(String host, int port) {
return createClient(host, port, config.getTimeout()); return createClient(host, port, config.getTimeout());
} }


@Override
public RedisClient createClient(String host, int port, int timeout) { public RedisClient createClient(String host, int port, int timeout) {
return new RedisClient(group, socketChannelClass, host, port, timeout); return new RedisClient(group, socketChannelClass, host, port, timeout);
} }


@Override
public <T> FutureListener<T> createReleaseWriteListener(final int slot, public <T> FutureListener<T> createReleaseWriteListener(final int slot,
final RedisConnection conn, final Timeout timeout) { final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() { return new FutureListener<T>() {
Expand All @@ -161,6 +162,7 @@ public void operationComplete(io.netty.util.concurrent.Future<T> future) throws
}; };
} }


@Override
public <T> FutureListener<T> createReleaseReadListener(final int slot, public <T> FutureListener<T> createReleaseReadListener(final int slot,
final RedisConnection conn, final Timeout timeout) { final RedisConnection conn, final Timeout timeout) {
return new FutureListener<T>() { return new FutureListener<T>() {
Expand Down
22 changes: 8 additions & 14 deletions src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -23,11 +23,8 @@
import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException; import org.redisson.client.RedisException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -45,25 +42,22 @@ public class MasterSlaveEntry {
volatile ConnectionEntry masterEntry; volatile ConnectionEntry masterEntry;


final MasterSlaveServersConfig config; final MasterSlaveServersConfig config;
final Codec codec;
final ConnectionManager connectionManager; final ConnectionManager connectionManager;


public MasterSlaveEntry(Codec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public MasterSlaveEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
this.codec = codec;
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.config = config; this.config = config;


slaveBalancer = config.getLoadBalancer(); slaveBalancer = config.getLoadBalancer();
slaveBalancer.init(codec, config); slaveBalancer.init(config);


List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses()); List<URI> addresses = new ArrayList<URI>(config.getSlaveAddresses());
addresses.add(config.getMasterAddress()); addresses.add(config.getMasterAddress());
for (URI address : addresses) { for (URI address : addresses) {
RedisClient client = connectionManager.createClient(address.getHost(), address.getPort()); RedisClient client = connectionManager.createClient(address.getHost(), address.getPort());
SubscribesConnectionEntry entry = new SubscribesConnectionEntry(client, slaveBalancer.add(new SubscribesConnectionEntry(client,
config.getSlaveConnectionPoolSize(), this.config.getSlaveConnectionPoolSize(),
config.getSlaveSubscriptionConnectionPoolSize()); this.config.getSlaveSubscriptionConnectionPoolSize()));
slaveBalancer.add(entry);
} }
if (config.getSlaveAddresses().size() > 1) { if (config.getSlaveAddresses().size() > 1) {
slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); slaveDown(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
Expand All @@ -73,8 +67,8 @@ public MasterSlaveEntry(Codec codec, ConnectionManager connectionManager, Master
} }


public void setupMasterEntry(String host, int port) { public void setupMasterEntry(String host, int port) {
RedisClient masterClient = connectionManager.createClient(host, port); RedisClient client = connectionManager.createClient(host, port);
masterEntry = new ConnectionEntry(masterClient, config.getMasterConnectionPoolSize()); masterEntry = new ConnectionEntry(client, config.getMasterConnectionPoolSize());
} }


public Collection<RedisPubSubConnection> slaveDown(String host, int port) { public Collection<RedisPubSubConnection> slaveDown(String host, int port) {
Expand Down Expand Up @@ -113,7 +107,7 @@ public void changeMaster(String host, int port) {


public void shutdownMasterAsync() { public void shutdownMasterAsync() {
masterEntry.getClient().shutdownAsync(); masterEntry.getClient().shutdownAsync();
slaveBalancer.shutdown(); slaveBalancer.shutdownAsync();
} }


public RedisConnection connectionWriteOp() { public RedisConnection connectionWriteOp() {
Expand Down
Expand Up @@ -40,7 +40,7 @@ public SingleConnectionManager(SingleServerConfig cfg, Config config) {


@Override @Override
protected void initEntry(MasterSlaveServersConfig config) { protected void initEntry(MasterSlaveServersConfig config) {
SingleEntry entry = new SingleEntry(codec, this, config); SingleEntry entry = new SingleEntry(this, config);
entries.put(Integer.MAX_VALUE, entry); entries.put(Integer.MAX_VALUE, entry);
} }


Expand Down
5 changes: 2 additions & 3 deletions src/main/java/org/redisson/connection/SingleEntry.java
Expand Up @@ -20,13 +20,12 @@
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.client.codec.Codec;
import org.redisson.client.protocol.RedisCommands; import org.redisson.client.protocol.RedisCommands;


public class SingleEntry extends MasterSlaveEntry { public class SingleEntry extends MasterSlaveEntry {


public SingleEntry(Codec codec, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public SingleEntry(ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(codec, connectionManager, config); super(connectionManager, config);
} }


@Override @Override
Expand Down

0 comments on commit c8b7d6f

Please sign in to comment.