From 5b22432c15794d069ee48a1711161b1bc6f71fd6 Mon Sep 17 00:00:00 2001 From: Mate Szalay-Beko Date: Mon, 12 Aug 2019 13:00:36 +0200 Subject: [PATCH] ZOOKEEPER-3188: fix LeaderElection to work with multiple election addresses --- .../server/quorum/QuorumCnxManager.java | 106 +++++++++++++----- .../server/quorum/QuorumPeerMainTest.java | 3 +- 2 files changed, 80 insertions(+), 29 deletions(-) diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java index 9b9a6f6dec5..e505fb49542 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumCnxManager.java @@ -26,7 +26,9 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.NoRouteToHostException; import java.net.ServerSocket; import java.net.Socket; import java.net.SocketException; @@ -34,6 +36,7 @@ import java.nio.BufferUnderflowException; import java.nio.ByteBuffer; import java.nio.channels.UnresolvedAddressException; +import java.util.ArrayList; import java.util.Collections; import java.util.Enumeration; import java.util.HashSet; @@ -197,11 +200,11 @@ static public class Message { */ static public class InitialMessage { public Long sid; - public InetSocketAddress electionAddr; + public List electionAddr; - InitialMessage(Long sid, InetSocketAddress address) { + InitialMessage(Long sid, List addresses) { this.sid = sid; - this.electionAddr = address; + this.electionAddr = addresses; } @SuppressWarnings("serial") @@ -237,28 +240,33 @@ static public InitialMessage parse(Long protocolVersion, DataInputStream din) num_read, remaining, sid); } - String addr = new String(b); - String[] host_port; - try { - host_port = ConfigUtils.getHostAndPort(addr); - 
} catch (ConfigException e) { - throw new InitialMessageException("Badly formed address: %s", addr); - } + String[] addressStrings = new String(b).split(","); + List addresses = new ArrayList<>(addressStrings.length); + for(String addr : addressStrings) { - if (host_port.length != 2) { - throw new InitialMessageException("Badly formed address: %s", addr); - } + String[] host_port; + try { + host_port = ConfigUtils.getHostAndPort(addr); + } catch (ConfigException e) { + throw new InitialMessageException("Badly formed address: %s", addr); + } - int port; - try { - port = Integer.parseInt(host_port[1]); - } catch (NumberFormatException e) { - throw new InitialMessageException("Bad port number: %s", host_port[1]); - } catch (ArrayIndexOutOfBoundsException e) { - throw new InitialMessageException("No port number in: %s", addr); + if (host_port.length != 2) { + throw new InitialMessageException("Badly formed address: %s", addr); + } + + int port; + try { + port = Integer.parseInt(host_port[1]); + } catch (NumberFormatException e) { + throw new InitialMessageException("Bad port number: %s", host_port[1]); + } catch (ArrayIndexOutOfBoundsException e) { + throw new InitialMessageException("No port number in: %s", addr); + } + addresses.add(new InetSocketAddress(host_port[0], port)); } - return new InitialMessage(sid, new InetSocketAddress(host_port[0], port)); + return new InitialMessage(sid, addresses); } } @@ -424,8 +432,8 @@ private boolean startConnection(Socket sock, Long sid) // represents protocol version (in other words - message type) dout.writeLong(PROTOCOL_VERSION); dout.writeLong(self.getId()); - InetSocketAddress address = self.getElectionAddress().getReachableOrOne(); - String addr = formatInetAddr(address); + String addr = self.getElectionAddress().getAllAddresses().stream() + .map(NetUtils::formatInetAddr).collect(Collectors.joining(",")); byte[] addr_bytes = addr.getBytes(); dout.writeInt(addr_bytes.length); dout.write(addr_bytes); @@ -532,7 +540,7 @@ 
public void run() { private void handleConnection(Socket sock, DataInputStream din) throws IOException { Long sid = null, protocolVersion = null; - InetSocketAddress electionAddr = null; + MultipleAddresses electionAddr = null; try { protocolVersion = din.readLong(); @@ -542,7 +550,7 @@ private void handleConnection(Socket sock, DataInputStream din) try { InitialMessage init = InitialMessage.parse(protocolVersion, din); sid = init.sid; - electionAddr = init.electionAddr; + electionAddr = new MultipleAddresses(init.electionAddr); } catch (InitialMessage.InitialMessageException ex) { LOG.error(ex.toString()); closeSocket(sock); @@ -585,7 +593,7 @@ private void handleConnection(Socket sock, DataInputStream din) closeSocket(sock); if (electionAddr != null) { - connectOne(sid, new MultipleAddresses(electionAddr)); + connectOne(sid, electionAddr); } else { connectOne(sid); } @@ -648,6 +656,10 @@ public void toSend(Long sid, ByteBuffer b) { synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){ if (senderWorkerMap.get(sid) != null) { LOG.debug("There is a connection already for server {}", sid); + // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. 
It is possible that the + // one we are using is already dead and needs to be cleaned up, so that when we create a new connection + // we choose another one, which is actually reachable + senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable(); return true; } @@ -660,7 +672,7 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){ sock = new Socket(); } setSockOpts(sock); - sock.connect(electionAddr.getReachableOrOne(), cnxTO); + sock.connect(electionAddr.getReachableAddress(), cnxTO); if (sock instanceof SSLSocket) { SSLSocket sslSock = (SSLSocket) sock; sslSock.startHandshake(); @@ -692,6 +704,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){ + " at election address " + electionAddr, e); closeSocket(sock); return false; + } catch (NoRouteToHostException e) { + LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e); + closeSocket(sock); + return false; } catch (IOException e) { LOG.warn("Cannot open channel to " + sid + " at election address " + electionAddr, @@ -709,6 +725,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){ synchronized void connectOne(long sid){ if (senderWorkerMap.get(sid) != null) { LOG.debug("There is a connection already for server {}", sid); + // since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. 
It is possible that the + // one we are using is already dead and needs to be cleaned up, so that when we create a new connection + // we choose another one, which is actually reachable + senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable(); return; } synchronized (self.QV_LOCK) { @@ -1100,6 +1120,7 @@ class SendWorker extends ZooKeeperThread { RecvWorker recvWorker; volatile boolean running = true; DataOutputStream dout; + AtomicBoolean ongoingAsyncValidation = new AtomicBoolean(false); /** * An instance of this thread receives messages to send @@ -1239,6 +1260,37 @@ public void run() { this.finish(); LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId()); } + + public void asyncValidateIfSocketIsStillReachable() { + if(ongoingAsyncValidation.compareAndSet(false, true)) { + Thread validator = new Thread(() -> { + LOG.debug("validate if destination address is reachable for sid {}", sid); + if(sock != null) { + InetAddress address = sock.getInetAddress(); + try { + if (address.isReachable(500)) { + LOG.debug("destination address {} is reachable for sid {}", address.toString(), sid); + return; + } + } catch (NullPointerException | IOException ignored) { + } + LOG.warn("destination address {} not reachable anymore, shutting down the SendWorker for sid {}", address.toString(), sid); + this.finish(); + } + }); + validator.start(); + try { + validator.join(); + } catch (InterruptedException ignored) { + // we don't care if the validation was interrupted. 
If the SendWorker is not working, we will + // try to connect and re-validate later + } + ongoingAsyncValidation.set(false); + } else { + LOG.debug("validation of destination address for sid {} is skipped (it is already running)", sid); + } + } + } /** diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java index 84dcedcd6ca..62d66a943c1 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumPeerMainTest.java @@ -480,8 +480,7 @@ public void testBadPeerAddressInQuorum() throws Exception { LineNumberReader r = new LineNumberReader(new StringReader(os.toString())); String line; boolean found = false; - Pattern p = - Pattern.compile(".*Cannot open channel to .* at election address .*"); + Pattern p = Pattern.compile(".*None of the addresses .* are reachable for sid 2"); while ((line = r.readLine()) != null) { found = p.matcher(line).matches(); if (found) {