Skip to content

Commit

Permalink
Connections restore with Sentinel servers fixed.
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Dec 25, 2015
1 parent c20e550 commit 76eebdd
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 28 deletions.
Expand Up @@ -545,7 +545,6 @@ public void operationComplete(Future<PubSubConnectionEntry> future)
protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
MasterSlaveEntry entry = getEntry(slotRange); MasterSlaveEntry entry = getEntry(slotRange);
slaveDown(entry, host, port, freezeReason); slaveDown(entry, host, port, freezeReason);
log.info("slave: {}:{} has down", host, port);
} }


protected void changeMaster(ClusterSlotRange slotRange, String host, int port) { protected void changeMaster(ClusterSlotRange slotRange, String host, int port) {
Expand Down
11 changes: 7 additions & 4 deletions src/main/java/org/redisson/connection/MasterSlaveEntry.java
Expand Up @@ -92,8 +92,9 @@ public Collection<RedisPubSubConnection> slaveDown(String host, int port, Freeze
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (slaveBalancer.getAvailableClients() == 0) { if (slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort());
}
} }
return conns; return conns;
} }
Expand All @@ -120,15 +121,17 @@ public RedisClient getClient() {
return masterEntry.getClient(); return masterEntry.getClient();
} }


public void slaveUp(String host, int port, FreezeReason freezeReason) { public boolean slaveUp(String host, int port, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(host, port, freezeReason)) { if (!slaveBalancer.unfreeze(host, port, freezeReason)) {
return; return false;
} }

InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves // exclude master from slaves
if (!addr.getHostName().equals(host) || port != addr.getPort()) { if (!addr.getHostName().equals(host) || port != addr.getPort()) {
connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); connectionManager.slaveDown(this, addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM);
} }
return true;
} }


/** /**
Expand Down
Expand Up @@ -49,7 +49,6 @@ public class SentinelConnectionManager extends MasterSlaveConnectionManager {


private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<String, RedisClient> sentinels = PlatformDependent.newConcurrentHashMap();
private final AtomicReference<String> currentMaster = new AtomicReference<String>(); private final AtomicReference<String> currentMaster = new AtomicReference<String>();
private final ConcurrentMap<String, Boolean> freezeSlaves = PlatformDependent.newConcurrentHashMap();
private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap(); private final ConcurrentMap<String, Boolean> slaves = PlatformDependent.newConcurrentHashMap();




Expand Down Expand Up @@ -222,8 +221,10 @@ protected void onSlaveAdded(URI addr, String msg) {


// to avoid addition twice // to avoid addition twice
if (slaves.putIfAbsent(slaveAddr, true) == null) { if (slaves.putIfAbsent(slaveAddr, true) == null) {
addSlave(ip, Integer.valueOf(port)); getEntry(singleSlotRange).addSlave(ip, Integer.valueOf(port));
log.info("slave: {} added", slaveAddr); log.info("slave: {} added", slaveAddr);
} else {
slaveUp(ip, port);
} }
} else { } else {
log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort()); log.warn("onSlaveAdded. Invalid message: {} from Sentinel {}:{}", msg, addr.getHost(), addr.getPort());
Expand Down Expand Up @@ -266,11 +267,8 @@ private void onNodeDown(URI sentinelAddr, String msg) {
} }


private void slaveDown(String ip, String port) { private void slaveDown(String ip, String port) {
// to avoid freeze twice slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER);
String addr = ip + ":" + port; log.info("slave: {}:{} has down", ip, port);
if (freezeSlaves.putIfAbsent(addr, true) == null) {
slaveDown(singleSlotRange, ip, Integer.valueOf(port), FreezeReason.MANAGER);
}
} }


private void onNodeUp(URI addr, String msg) { private void onNodeUp(URI addr, String msg) {
Expand All @@ -281,11 +279,7 @@ private void onNodeUp(URI addr, String msg) {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];


String slaveAddr = ip + ":" + port; slaveUp(ip, port);
if (freezeSlaves.remove(slaveAddr) != null) {
slaveUp(ip, Integer.valueOf(port));
log.info("slave: {} has up", slaveAddr);
}
} else if ("master".equals(parts[0])) { } else if ("master".equals(parts[0])) {
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];
Expand All @@ -303,6 +297,13 @@ private void onNodeUp(URI addr, String msg) {
} }
} }


private void slaveUp(String ip, String port) {
if (getEntry(singleSlotRange).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr);
}
}

private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) { private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
String[] parts = msg.split(" "); String[] parts = msg.split(" ");


Expand All @@ -324,14 +325,6 @@ private void onMasterChange(SentinelServersConfig cfg, URI addr, String msg) {
} }
} }


private void addSlave(String host, int port) {
getEntry(0).addSlave(host, port);
}

private void slaveUp(String host, int port) {
getEntry(0).slaveUp(host, port, FreezeReason.MANAGER);
}

@Override @Override
public void shutdown() { public void shutdown() {
super.shutdown(); super.shutdown();
Expand Down
Expand Up @@ -134,9 +134,11 @@ public Collection<RedisPubSubConnection> freeze(String host, int port, FreezeRea
connection.closeAsync(); connection.closeAsync();
} }


List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections()); synchronized (connectionEntry) {
connectionEntry.getAllSubscribeConnections().clear(); List<RedisPubSubConnection> list = new ArrayList<RedisPubSubConnection>(connectionEntry.getAllSubscribeConnections());
return list; connectionEntry.getAllSubscribeConnections().clear();
return list;
}
} }


public Future<RedisPubSubConnection> nextPubSubConnection() { public Future<RedisPubSubConnection> nextPubSubConnection() {
Expand Down

0 comments on commit 76eebdd

Please sign in to comment.