Skip to content

Commit

Permalink
Cluster slave discovery regression since 2.1.5 fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jan 25, 2016
1 parent e3cd375 commit 91e4d7e
Showing 1 changed file with 25 additions and 9 deletions.
34 changes: 25 additions & 9 deletions src/main/java/org/redisson/cluster/ClusterConnectionManager.java
Expand Up @@ -99,7 +99,7 @@ public ClusterConnectionManager(ClusterServersConfig cfg, Config config) {
throw new RedisConnectionException("Can't connect to servers!", lastException); throw new RedisConnectionException("Can't connect to servers!", lastException);
} }


monitorClusterChange(cfg); scheduleClusterChangeCheck(cfg);
} }


private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) { private Future<RedisConnection> connect(ClusterServersConfig cfg, final URI addr) {
Expand Down Expand Up @@ -203,7 +203,9 @@ public void operationComplete(Future<Map<String, String>> future) throws Excepti
List<Future<Void>> fs = e.initSlaveBalancer(config); List<Future<Void>> fs = e.initSlaveBalancer(config);
futures.addAll(fs); futures.addAll(fs);


log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges()); if (!partition.getSlaveAddresses().isEmpty()) {
log.info("slaves: {} added for slot ranges: {}", partition.getSlaveAddresses(), partition.getSlotRanges());
}
} }


Future<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort()); Future<Void> f = e.setupMasterEntry(config.getMasterAddress().getHost(), config.getMasterAddress().getPort());
Expand Down Expand Up @@ -232,8 +234,8 @@ public void operationComplete(Future<Void> future) throws Exception {
return result; return result;
} }


private void monitorClusterChange(final ClusterServersConfig cfg) { private void scheduleClusterChangeCheck(final ClusterServersConfig cfg) {
monitorFuture = GlobalEventExecutor.INSTANCE.scheduleWithFixedDelay(new Runnable() { monitorFuture = GlobalEventExecutor.INSTANCE.schedule(new Runnable() {
@Override @Override
public void run() { public void run() {
List<URI> nodes = new ArrayList<URI>(); List<URI> nodes = new ArrayList<URI>();
Expand All @@ -243,17 +245,19 @@ public void run() {
nodes.add(partition.getMasterAddress()); nodes.add(partition.getMasterAddress());
slaves.addAll(partition.getSlaveAddresses()); slaves.addAll(partition.getSlaveAddresses());
} }
// master nodes first
nodes.addAll(slaves); nodes.addAll(slaves);


checkClusterState(cfg, nodes.iterator(), lastException); checkClusterState(cfg, nodes.iterator(), lastException);
} }


}, cfg.getScanInterval(), cfg.getScanInterval(), TimeUnit.MILLISECONDS); }, cfg.getScanInterval(), TimeUnit.MILLISECONDS);
} }


private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) { private void checkClusterState(final ClusterServersConfig cfg, final Iterator<URI> iterator, final AtomicReference<Throwable> lastException) {
if (!iterator.hasNext()) { if (!iterator.hasNext()) {
log.error("Can't update cluster state", lastException.get()); log.error("Can't update cluster state", lastException.get());
scheduleClusterChangeCheck(cfg);
return; return;
} }
URI uri = iterator.next(); URI uri = iterator.next();
Expand All @@ -263,6 +267,7 @@ private void checkClusterState(final ClusterServersConfig cfg, final Iterator<UR
public void operationComplete(Future<RedisConnection> future) throws Exception { public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
lastException.set(future.cause()); lastException.set(future.cause());
System.out.println("Can't connect!!!!!");
checkClusterState(cfg, iterator, lastException); checkClusterState(cfg, iterator, lastException);
return; return;
} }
Expand All @@ -280,6 +285,7 @@ private void updateClusterState(final ClusterServersConfig cfg, final RedisConne
public void operationComplete(Future<String> future) throws Exception { public void operationComplete(Future<String> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause()); log.error("Can't execute CLUSTER_NODES with " + connection.getRedisClient().getAddr(), future.cause());
scheduleClusterChangeCheck(cfg);
return; return;
} }


Expand All @@ -290,6 +296,7 @@ public void operationComplete(Future<String> future) throws Exception {
checkMasterNodesChange(newPartitions); checkMasterNodesChange(newPartitions);
checkSlaveNodesChange(newPartitions); checkSlaveNodesChange(newPartitions);
checkSlotsChange(cfg, newPartitions); checkSlotsChange(cfg, newPartitions);
scheduleClusterChangeCheck(cfg);
} }
}); });
} }
Expand All @@ -315,11 +322,20 @@ private void checkSlaveNodesChange(Collection<ClusterPartition> newPartitions) {
Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses()); Set<URI> addedSlaves = new HashSet<URI>(newPart.getSlaveAddresses());
addedSlaves.removeAll(currentPart.getSlaveAddresses()); addedSlaves.removeAll(currentPart.getSlaveAddresses());
for (URI uri : addedSlaves) { for (URI uri : addedSlaves) {
currentPart.addSlaveAddress(uri); Future<Void> future = entry.addSlave(uri.getHost(), uri.getPort());
future.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> future) throws Exception {
if (!future.isSuccess()) {
log.error("Can't add slave: " + uri, future.cause());
return;
}


entry.addSlave(uri.getHost(), uri.getPort()); currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER);
log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
}
});
} }


break; break;
Expand Down

0 comments on commit 91e4d7e

Please sign in to comment.