From f24955846b6dfef25e835d57a81468749ccc2abc Mon Sep 17 00:00:00 2001 From: Nikita Date: Fri, 25 Aug 2017 17:48:04 +0300 Subject: [PATCH] LoadBalancerManager should use ip only to map ClientConnectionsEntry. #945 --- .../cluster/ClusterConnectionManager.java | 8 ++-- .../MasterSlaveConnectionManager.java | 4 -- .../redisson/connection/MasterSlaveEntry.java | 29 +++++++----- .../connection/SentinelConnectionManager.java | 21 ++++++--- .../balancer/LoadBalancerManager.java | 47 ++++++++++++------- .../connection/pool/ConnectionPool.java | 15 +++--- 6 files changed, 74 insertions(+), 50 deletions(-) diff --git a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java index 3b1e4e23d21..a21ee968929 100644 --- a/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java +++ b/redisson/src/main/java/org/redisson/cluster/ClusterConnectionManager.java @@ -424,7 +424,7 @@ private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition c aliveSlaves.removeAll(newPart.getFailedSlaveAddresses()); for (URI uri : aliveSlaves) { currentPart.removeFailedSlaveAddress(uri); - if (entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + if (entry.slaveUp(uri, FreezeReason.MANAGER)) { log.info("slave: {} has up for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -433,7 +433,7 @@ private void upDownSlaves(final MasterSlaveEntry entry, final ClusterPartition c failedSlaves.removeAll(currentPart.getFailedSlaveAddresses()); for (URI uri : failedSlaves) { currentPart.addFailedSlaveAddress(uri); - if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + if (entry.slaveDown(uri, FreezeReason.MANAGER)) { log.warn("slave: {} has down for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -446,7 +446,7 @@ private Set addRemoveSlaves(final MasterSlaveEntry entry, final ClusterPart for (URI uri : removedSlaves) { currentPart.removeSlaveAddress(uri); - if (entry.slaveDown(uri.getHost(), uri.getPort(), FreezeReason.MANAGER)) { + if (entry.slaveDown(uri, FreezeReason.MANAGER)) { log.info("slave {} removed for slot ranges: {}", uri, currentPart.getSlotRanges()); } } @@ -464,7 +464,7 @@ public void operationComplete(Future future) throws Exception { } currentPart.addSlaveAddress(uri); - entry.slaveUp(uri.getHost(), uri.getPort(), FreezeReason.MANAGER); + entry.slaveUp(uri, FreezeReason.MANAGER); log.info("slave: {} added for slot ranges: {}", uri, currentPart.getSlotRanges()); } }); diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 145c56a1fbc..fa78f53a23b 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -683,10 +683,6 @@ public MasterSlaveEntry getEntry(int slot) { return entries.get(slot); } - protected void slaveDown(ClusterSlotRange slotRange, String host, int port, FreezeReason freezeReason) { - getEntry(slotRange.getStartSlot()).slaveDown(host, port, freezeReason); - } - protected void changeMaster(int slot, URI address) { getEntry(slot).changeMaster(address); } diff --git a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java index c0d5b74444b..8b78fc11515 100644 --- a/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java +++ b/redisson/src/main/java/org/redisson/connection/MasterSlaveEntry.java @@ -129,8 +129,8 @@ private boolean slaveDown(ClientConnectionsEntry entry, FreezeReason freezeReaso return slaveDown(e, freezeReason == FreezeReason.SYSTEM); } - public boolean slaveDown(String host, int port, FreezeReason freezeReason) { - ClientConnectionsEntry entry = slaveBalancer.freeze(host, port, freezeReason); + public boolean slaveDown(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry entry = slaveBalancer.freeze(address, freezeReason); if (entry == null) { return false; } @@ -141,9 +141,9 @@ public boolean slaveDown(String host, int port, FreezeReason freezeReason) { private boolean slaveDown(ClientConnectionsEntry entry, boolean temporaryDown) { // add master as slave if no more slaves available if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() == 0) { - InetSocketAddress addr = masterEntry.getClient().getAddr(); - if (slaveUp(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM)) { - log.info("master {}:{} used as slave", addr.getHostName(), addr.getPort()); + URI addr = masterEntry.getClient().getConfig().getAddress(); + if (slaveUp(addr, FreezeReason.SYSTEM)) { + log.info("master {} used as slave", addr); } } @@ -309,7 +309,11 @@ public void operationComplete(ChannelFuture future) throws Exception { public boolean hasSlave(InetSocketAddress addr) { return slaveBalancer.contains(addr); } - + + public boolean hasSlave(String addr) { + return slaveBalancer.contains(addr); + } + public RFuture addSlave(URI address) { return addSlave(address, true, NodeType.SLAVE); } @@ -334,17 +338,18 @@ public RedisClient getClient() { return masterEntry.getClient(); } - public boolean slaveUp(String host, int port, FreezeReason freezeReason) { - if (!slaveBalancer.unfreeze(host, port, freezeReason)) { + public boolean slaveUp(URI address, FreezeReason freezeReason) { + if (!slaveBalancer.unfreeze(address, freezeReason)) { return false; } + InetSocketAddress naddress = new InetSocketAddress(address.getHost(), address.getPort()); InetSocketAddress addr = masterEntry.getClient().getAddr(); // exclude master from slaves if (config.getReadMode() == ReadMode.SLAVE - && (!addr.getHostName().equals(host) || port != addr.getPort())) { - slaveDown(addr.getHostName(), addr.getPort(), FreezeReason.SYSTEM); - log.info("master {}:{} excluded from slaves", addr.getHostName(), addr.getPort()); + && (!addr.getAddress().getHostAddress().equals(naddress.getAddress().getHostAddress()) || naddress.getPort() != addr.getPort())) { + slaveDown(address, FreezeReason.SYSTEM); + log.info("master {} excluded from slaves", addr); } return true; } @@ -369,7 +374,7 @@ public void operationComplete(Future future) throws Exception { // more than one slave available, so master can be removed from slaves if (config.getReadMode() == ReadMode.SLAVE && slaveBalancer.getAvailableClients() > 1) { - slaveDown(address.getHost(), address.getPort(), FreezeReason.SYSTEM); + slaveDown(address, FreezeReason.SYSTEM); } connectionManager.shutdownAsync(oldMaster.getClient()); } diff --git a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java index 4e9c3588cad..d0cfbb35f6b 100755 --- a/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java +++ b/redisson/src/main/java/org/redisson/connection/SentinelConnectionManager.java @@ -171,7 +171,7 @@ private RFuture registerSentinel(final SentinelServersCon @Override public void operationComplete(Future future) throws Exception { if (!future.isSuccess()) { - log.warn("Can't connect to sentinel: {}:{}", addr.getHost(), addr.getPort()); + log.warn("Can't connect to sentinel: {}", addr); return; } @@ -220,8 +220,7 @@ protected void onSentinelAdded(SentinelServersConfig cfg, String msg, MasterSlav String ip = parts[2]; String port = parts[3]; - String addr = createAddress(ip, port); - URI uri = URIBuilder.create(addr); + URI uri = convert(ip, port); registerSentinel(cfg, uri, c); } } @@ -253,11 +252,13 @@ public void operationComplete(Future future) throws Exception { return; } - if (entry.slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + URI uri = convert(ip, port); + if (entry.slaveUp(uri, FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} added", slaveAddr); } } + }); } else { slaveUp(ip, port); @@ -267,6 +268,12 @@ public void operationComplete(Future future) throws Exception { } } + protected URI convert(String ip, String port) { + String addr = createAddress(ip, port); + URI uri = URIBuilder.create(addr); + return uri; + } + private void onNodeDown(URI sentinelAddr, String msg) { String[] parts = msg.split(" "); @@ -309,7 +316,8 @@ private void slaveDown(String ip, String port) { log.warn("slave: {}:{} has down", ip, port); } else { MasterSlaveEntry entry = getEntry(singleSlotRange.getStartSlot()); - if (entry.slaveDown(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + URI uri = convert(ip, port); + if (entry.slaveDown(uri, FreezeReason.MANAGER)) { log.warn("slave: {}:{} has down", ip, port); } } @@ -367,7 +375,8 @@ private void slaveUp(String ip, String port) { return; } - if (getEntry(singleSlotRange.getStartSlot()).slaveUp(ip, Integer.valueOf(port), FreezeReason.MANAGER)) { + URI uri = convert(ip, port); + if (getEntry(singleSlotRange.getStartSlot()).slaveUp(uri, FreezeReason.MANAGER)) { String slaveAddr = ip + ":" + port; log.info("slave: {} has up", slaveAddr); } diff --git a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java index 40e46a5c82f..6c047a07640 100644 --- a/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java +++ b/redisson/src/main/java/org/redisson/connection/balancer/LoadBalancerManager.java @@ -16,6 +16,7 @@ package org.redisson.connection.balancer; import java.net.InetSocketAddress; +import java.net.URI; import java.util.Map; import java.util.concurrent.atomic.AtomicInteger; @@ -44,9 +45,10 @@ public class LoadBalancerManager { private final Logger log = LoggerFactory.getLogger(getClass()); private final ConnectionManager connectionManager; - private final Map addr2Entry = PlatformDependent.newConcurrentHashMap(); private final PubSubConnectionPool pubSubConnectionPool; private final SlaveConnectionPool slaveConnectionPool; + + private final Map ip2Entry = PlatformDependent.newConcurrentHashMap(); public LoadBalancerManager(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { this.connectionManager = connectionManager; @@ -65,7 +67,8 @@ public void operationComplete(Future future) throws Exception { return; } if (counter.decrementAndGet() == 0) { - addr2Entry.put(entry.getClient().getAddr(), entry); + String addr = convert(entry.getClient().getConfig().getAddress()); + ip2Entry.put(addr, entry); result.trySuccess(null); } } @@ -80,7 +83,7 @@ public void operationComplete(Future future) throws Exception { public int getAvailableClients() { int count = 0; - for (ClientConnectionsEntry connectionEntry : addr2Entry.values()) { + for (ClientConnectionsEntry connectionEntry : ip2Entry.values()) { if (!connectionEntry.isFreezed()) { count++; } @@ -88,11 +91,10 @@ public int getAvailableClients() { return count; } - public boolean unfreeze(String host, int port, FreezeReason freezeReason) { - InetSocketAddress addr = new InetSocketAddress(host, port); - ClientConnectionsEntry entry = addr2Entry.get(addr); + public boolean unfreeze(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry entry = getEntry(address); if (entry == null) { - throw new IllegalStateException("Can't find " + addr + " in slaves!"); + throw new IllegalStateException("Can't find " + address + " in slaves!"); } synchronized (entry) { @@ -111,12 +113,21 @@ public boolean unfreeze(String host, int port, FreezeReason freezeReason) { return false; } - public ClientConnectionsEntry freeze(String host, int port, FreezeReason freezeReason) { - InetSocketAddress addr = new InetSocketAddress(host, port); - ClientConnectionsEntry connectionEntry = addr2Entry.get(addr); + private String convert(URI address) { + InetSocketAddress addr = new InetSocketAddress(address.getHost(), address.getPort()); + return addr.getAddress().getHostAddress() + ":" + addr.getPort(); + } + + public ClientConnectionsEntry freeze(URI address, FreezeReason freezeReason) { + ClientConnectionsEntry connectionEntry = getEntry(address); return freeze(connectionEntry, freezeReason); } + protected ClientConnectionsEntry getEntry(URI address) { + String addr = convert(address); + return ip2Entry.get(addr); + } + public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, FreezeReason freezeReason) { if (connectionEntry == null) { return null; @@ -143,11 +154,15 @@ public RFuture nextPubSubConnection() { } public boolean contains(InetSocketAddress addr) { - return addr2Entry.containsKey(addr); + return ip2Entry.containsKey(addr.getAddress().getHostAddress() + ":" + addr.getPort()); + } + + public boolean contains(String addr) { + return ip2Entry.containsKey(addr); } public RFuture getConnection(RedisCommand command, InetSocketAddress addr) { - ClientConnectionsEntry entry = addr2Entry.get(addr); + ClientConnectionsEntry entry = ip2Entry.get(addr.getAddress().getHostAddress()); if (entry != null) { return slaveConnectionPool.get(command, entry); } @@ -160,23 +175,23 @@ public RFuture nextConnection(RedisCommand command) { } public void returnPubSubConnection(RedisPubSubConnection connection) { - ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); + ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress()); pubSubConnectionPool.returnConnection(entry, connection); } public void returnConnection(RedisConnection connection) { - ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); + ClientConnectionsEntry entry = ip2Entry.get(connection.getRedisClient().getAddr().getAddress().getHostAddress()); slaveConnectionPool.returnConnection(entry, connection); } public void shutdown() { - for (ClientConnectionsEntry entry : addr2Entry.values()) { + for (ClientConnectionsEntry entry : ip2Entry.values()) { entry.getClient().shutdown(); } } public void shutdownAsync() { - for (ClientConnectionsEntry entry : addr2Entry.values()) { + for (ClientConnectionsEntry entry : ip2Entry.values()) { connectionManager.shutdownAsync(entry.getClient()); } } diff --git a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 69f88a1a445..132a28f21d6 100644 --- a/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/redisson/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -296,7 +296,7 @@ private void connectedSuccessful(ClientConnectionsEntry entry, RPromise promi private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, Throwable cause) { if (entry.incFailedAttempts() == config.getFailedAttempts()) { - checkForReconnect(entry); + checkForReconnect(entry, cause); } releaseConnection(entry); @@ -308,7 +308,7 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, T int attempts = entry.incFailedAttempts(); if (attempts == config.getFailedAttempts()) { conn.closeAsync(); - checkForReconnect(entry); + checkForReconnect(entry, null); } else if (attempts < config.getFailedAttempts()) { releaseConnection(entry, conn); } else { @@ -321,15 +321,14 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise promise, T promise.tryFailure(cause); } - private void checkForReconnect(ClientConnectionsEntry entry) { + private void checkForReconnect(ClientConnectionsEntry entry, Throwable cause) { if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), - entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); - log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); + masterSlaveEntry.slaveDown(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); + log.error("slave " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); scheduleCheck(entry); } else { if (entry.freezeMaster(FreezeReason.RECONNECT)) { - log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts()); + log.error("host " + entry.getClient().getAddr() + " disconnected due to failedAttempts=" + config.getFailedAttempts() + " limit reached", cause); scheduleCheck(entry); } } @@ -385,7 +384,7 @@ public void operationComplete(Future future) throws Exception { public void operationComplete(Future future) throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { - masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); + masterSlaveEntry.slaveUp(entry.getClient().getConfig().getAddress(), FreezeReason.RECONNECT); log.info("slave {} successfully reconnected", entry.getClient().getAddr()); } else { synchronized (entry) {