Skip to content

Commit

Permalink
LoadBalancerManager should use ip only to map ClientConnectionsEntry. #…
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Aug 25, 2017
1 parent f1e4cad commit f249558
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 50 deletions.
Expand Up @@ -424,7 +424,7 @@ private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition c
aliveSlaves.removeAll(newPart.getFailedSlaveAddresses()); aliveSlaves.removeAll(newPart.getFailedSlaveAddresses());
for (URI uri : aliveSlaves) { for (URI uri : aliveSlaves) {
currentPart.removeFailedSlaveAddress(uri); currentPart.removeFailedSlaveAddress(uri);
if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
Expand All @@ -433,7 +433,7 @@ private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition c
failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); failedSlaves.removeAll(currentPart.getFailedSlaveAddresses());
for (URI uri : failedSlaves) { for (URI uri : failedSlaves) {
currentPart.addFailedSlaveAddress(uri); currentPart.addFailedSlaveAddress(uri);
if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
Expand All @@ -446,7 +446,7 @@ private Set<URI> addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPart
for (URI uri : removedSlaves) { for (URI uri : removedSlaves) {
currentPart.removeSlaveAddress(uri); currentPart.removeSlaveAddress(uri);


if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
} }
Expand All @@ -464,7 +464,7 @@ public void operationComplete(Future<Void> future) throws Exception {
} }


currentPart.addSlaveAddress(uri); currentPart.addSlaveAddress(uri);
entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); entry.slaveUp(uri, FreezeReason.MANAGER);
log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges());
} }
}); });
Expand Down
Expand Up @@ -683,10 +683,6 @@ public MasterSlaveEntry getEntry(int slot) {
return entries.get(slot); return entries.get(slot);
} }


protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) {
getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason);
}

protected void changeMaster(int slot, URI address) { protected void changeMaster(int slot, URI address) {
getEntry(slot).changeMaster(address); getEntry(slot).changeMaster(address);
} }
Expand Down
Expand Up @@ -129,8 +129,8 @@ private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReaso
return slaveDown(e, freezeReason == FreezeReason.SYSTEM); return slaveDown(e, freezeReason == FreezeReason.SYSTEM);
} }


public boolean slaveDown(String host, int port, FreezeReason freezeReason) { public boolean slaveDown(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason); ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason);
if (entry == null) { if (entry == null) {
return false; return false;
} }
Expand All @@ -141,9 +141,9 @@ public boolean slaveDown(String host, int port, FreezeReason freezeReason) {
private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) {
// add master as slave if no more slaves available // add master as slave if no more slaves available
if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) {
InetSocketAddress addr = masterEntry.getClient().getAddr(); URI addr = masterEntry.getClient().getConfig().getAddress();
if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) { if (slaveUp(addr, FreezeReason.SYSTEM)) {
log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); log.info("master {} used as slave", addr);
} }
} }


Expand Down Expand Up @@ -309,7 +309,11 @@ public void operationComplete(ChannelFuture future) throws Exception {
public boolean hasSlave(InetSocketAddress addr) { public boolean hasSlave(InetSocketAddress addr) {
return slaveBalancer.contains(addr); return slaveBalancer.contains(addr);
} }


public boolean hasSlave(String addr) {
return slaveBalancer.contains(addr);
}

public RFuture<Void> addSlave(URI address) { public RFuture<Void> addSlave(URI address) {
return addSlave(address, true, NodeType.SLAVE); return addSlave(address, true, NodeType.SLAVE);
} }
Expand All @@ -334,17 +338,18 @@ public RedisClient getClient() {
return masterEntry.getClient(); return masterEntry.getClient();
} }


public boolean slaveUp(String host, int port, FreezeReason freezeReason) { public boolean slaveUp(URI address, FreezeReason freezeReason) {
if (!slaveBalancer.unfreeze(host, port, freezeReason)) { if (!slaveBalancer.unfreeze(address, freezeReason)) {
return false; return false;
} }


InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort());
InetSocketAddress addr = masterEntry.getClient().getAddr(); InetSocketAddress addr = masterEntry.getClient().getAddr();
// exclude master from slaves // exclude master from slaves
if (config.getReadMode() == ReadMode.SLAVE if (config.getReadMode() == ReadMode.SLAVE
&& (!addr.getHostName().equals(host) || port != addr.getPort())) { && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) {
slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); slaveDown(address, FreezeReason.SYSTEM);
log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); log.info("master {} excluded from slaves", addr);
} }
return true; return true;
} }
Expand All @@ -369,7 +374,7 @@ public void operationComplete(Future<Void> future) throws Exception {
// more than one slave available, so master can be removed from slaves // more than one slave available, so master can be removed from slaves
if (config.getReadMode() == ReadMode.SLAVE if (config.getReadMode() == ReadMode.SLAVE
&& slaveBalancer.getAvailableClients() > 1) { && slaveBalancer.getAvailableClients() > 1) {
slaveDown(address.getHost(), address.getPort(), FreezeReason.SYSTEM); slaveDown(address, FreezeReason.SYSTEM);
} }
connectionManager.shutdownAsync(oldMaster.getClient()); connectionManager.shutdownAsync(oldMaster.getClient());
} }
Expand Down
Expand Up @@ -171,7 +171,7 @@ private RFuture<RedisPubSubConnection> registerSentinel(final SentinelServersCon
@Override @Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception { public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) { if (!future.isSuccess()) {
log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); log.warn("Can't connect to sentinel: {}", addr);
return; return;
} }


Expand Down Expand Up @@ -220,8 +220,7 @@ protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlav
String ip = parts[2]; String ip = parts[2];
String port = parts[3]; String port = parts[3];


String addr = createAddress(ip, port); URI uri = convert(ip, port);
URI uri = URIBuilder.create(addr);
registerSentinel(cfg, uri, c); registerSentinel(cfg, uri, c);
} }
} }
Expand Down Expand Up @@ -253,11 +252,13 @@ public void operationComplete(Future<Void> future) throws Exception {
return; return;
} }


if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { URI uri = convert(ip, port);
if (entry.slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
log.info("slave: {} added", slaveAddr); log.info("slave: {} added", slaveAddr);
} }
} }

}); });
} else { } else {
slaveUp(ip, port); slaveUp(ip, port);
Expand All @@ -267,6 +268,12 @@ public void operationComplete(Future<Void> future) throws Exception {
} }
} }


protected URI convert(String ip, String port) {
String addr = createAddress(ip, port);
URI uri = URIBuilder.create(addr);
return uri;
}

private void onNodeDown(URI sentinelAddr, String msg) { private void onNodeDown(URI sentinelAddr, String msg) {
String[] parts = msg.split(" "); String[] parts = msg.split(" ");


Expand Down Expand Up @@ -309,7 +316,8 @@ private void slaveDown(String ip, String port) {
log.warn("slave: {}:{} has down", ip, port); log.warn("slave: {}:{} has down", ip, port);
} else { } else {
MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot());
if (entry.slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { URI uri = convert(ip, port);
if (entry.slaveDown(uri, FreezeReason.MANAGER)) {
log.warn("slave: {}:{} has down", ip, port); log.warn("slave: {}:{} has down", ip, port);
} }
} }
Expand Down Expand Up @@ -367,7 +375,8 @@ private void slaveUp(String ip, String port) {
return; return;
} }


if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { URI uri = convert(ip, port);
if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) {
String slaveAddr = ip + ":" + port; String slaveAddr = ip + ":" + port;
log.info("slave: {} has up", slaveAddr); log.info("slave: {} has up", slaveAddr);
} }
Expand Down
Expand Up @@ -16,6 +16,7 @@
package org.redisson.connection.balancer; package org.redisson.connection.balancer;


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


Expand Down Expand Up @@ -44,9 +45,10 @@ public class LoadBalancerManager {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());


private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubConnectionPool; private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool slaveConnectionPool; private final SlaveConnectionPool slaveConnectionPool;

private final Map<String, ClientConnectionsEntry> ip2Entry = PlatformDependent.newConcurrentHashMap();


public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
Expand All @@ -65,7 +67,8 @@ public void operationComplete(Future<Void> future) throws Exception {
return; return;
} }
if (counter.decrementAndGet() == 0) { if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry); String addr = convert(entry.getClient().getConfig().getAddress());
ip2Entry.put(addr, entry);
result.trySuccess(null); result.trySuccess(null);
} }
} }
Expand All @@ -80,19 +83,18 @@ public void operationComplete(Future<Void> future) throws Exception {


public int getAvailableClients() { public int getAvailableClients() {
int count = 0; int count = 0;
for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) { for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) {
if (!connectionEntry.isFreezed()) { if (!connectionEntry.isFreezed()) {
count++; count++;
} }
} }
return count; return count;
} }


public boolean unfreeze(String host, int port, FreezeReason freezeReason) { public boolean unfreeze(URI address, FreezeReason freezeReason) {
InetSocketAddress addr = new InetSocketAddress(host, port); ClientConnectionsEntry entry = getEntry(address);
ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry == null) { if (entry == null) {
throw new IllegalStateException("Can't find " + addr + " in slaves!"); throw new IllegalStateException("Can't find " + address + " in slaves!");
} }


synchronized (entry) { synchronized (entry) {
Expand All @@ -111,12 +113,21 @@ public boolean unfreeze(String host, int port, FreezeReason freezeReason) {
return false; return false;
} }


public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) { private String convert(URI address) {
InetSocketAddress addr = new InetSocketAddress(host, port); InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort());
ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); return addr.getAddress().getHostAddress() + ":" + addr.getPort();
}

public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) {
ClientConnectionsEntry connectionEntry = getEntry(address);
return freeze(connectionEntry, freezeReason); return freeze(connectionEntry, freezeReason);
} }


protected ClientConnectionsEntry getEntry(URI address) {
String addr = convert(address);
return ip2Entry.get(addr);
}

public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) {
if (connectionEntry == null) { if (connectionEntry == null) {
return null; return null;
Expand All @@ -143,11 +154,15 @@ public RFuture<RedisPubSubConnection> nextPubSubConnection() {
} }


public boolean contains(InetSocketAddress addr) { public boolean contains(InetSocketAddress addr) {
return addr2Entry.containsKey(addr); return ip2Entry.containsKey(addr.getAddress().getHostAddress() + ":" + addr.getPort());
}

public boolean contains(String addr) {
return ip2Entry.containsKey(addr);
} }


public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) { public RFuture<RedisConnection> getConnection(RedisCommand<?> command, InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr); ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress());
if (entry != null) { if (entry != null) {
return slaveConnectionPool.get(command, entry); return slaveConnectionPool.get(command, entry);
} }
Expand All @@ -160,23 +175,23 @@ public RFuture<RedisConnection> nextConnection(RedisCommand<?> command) {
} }


public void returnPubSubConnection(RedisPubSubConnection connection) { public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress());
pubSubConnectionPool.returnConnection(entry, connection); pubSubConnectionPool.returnConnection(entry, connection);
} }


public void returnConnection(RedisConnection connection) { public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress());
slaveConnectionPool.returnConnection(entry, connection); slaveConnectionPool.returnConnection(entry, connection);
} }


public void shutdown() { public void shutdown() {
for (ClientConnectionsEntry entry : addr2Entry.values()) { for (ClientConnectionsEntry entry : ip2Entry.values()) {
entry.getClient().shutdown(); entry.getClient().shutdown();
} }
} }


public void shutdownAsync() { public void shutdownAsync() {
for (ClientConnectionsEntry entry : addr2Entry.values()) { for (ClientConnectionsEntry entry : ip2Entry.values()) {
connectionManager.shutdownAsync(entry.getClient()); connectionManager.shutdownAsync(entry.getClient());
} }
} }
Expand Down
Expand Up @@ -296,7 +296,7 @@ private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promi


private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) { private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) { if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry); checkForReconnect(entry, cause);
} }


releaseConnection(entry); releaseConnection(entry);
Expand All @@ -308,7 +308,7 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T
int attempts = entry.incFailedAttempts(); int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) { if (attempts == config.getFailedAttempts()) {
conn.closeAsync(); conn.closeAsync();
checkForReconnect(entry); checkForReconnect(entry, null);
} else if (attempts < config.getFailedAttempts()) { } else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn); releaseConnection(entry, conn);
} else { } else {
Expand All @@ -321,15 +321,14 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T
promise.tryFailure(cause); promise.tryFailure(cause);
} }


private void checkForReconnect(ClientConnectionsEntry entry) { private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry); scheduleCheck(entry);
} else { } else {
if (entry.freezeMaster(FreezeReason.RECONNECT)) { if (entry.freezeMaster(FreezeReason.RECONNECT)) {
log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); log.error("host " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause);
scheduleCheck(entry); scheduleCheck(entry);
} }
} }
Expand Down Expand Up @@ -385,7 +384,7 @@ public void operationComplete(Future<String> future) throws Exception {
public void operationComplete(Future<Void> future) public void operationComplete(Future<Void> future)
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr()); log.info("slave {} successfully reconnected", entry.getClient().getAddr());
} else { } else {
synchronized (entry) { synchronized (entry) {
Expand Down

0 comments on commit f249558

Please sign in to comment.