Skip to content

Commit

Permalink
ZOOKEEPER-3188: fix LeaderElection to work with multiple election add…
Browse files Browse the repository at this point in the history
…resses
  • Loading branch information
symat committed Aug 12, 2019
1 parent 7bfbe7e commit 5b22432
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 29 deletions.
Expand Up @@ -26,14 +26,17 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.NoRouteToHostException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
Expand Down Expand Up @@ -197,11 +200,11 @@ static public class Message {
*/
static public class InitialMessage {
public Long sid;
public InetSocketAddress electionAddr;
public List<InetSocketAddress> electionAddr;

InitialMessage(Long sid, InetSocketAddress address) {
InitialMessage(Long sid, List<InetSocketAddress> addresses) {
this.sid = sid;
this.electionAddr = address;
this.electionAddr = addresses;
}

@SuppressWarnings("serial")
Expand Down Expand Up @@ -237,28 +240,33 @@ static public InitialMessage parse(Long protocolVersion, DataInputStream din)
num_read, remaining, sid);
}

String addr = new String(b);
String[] host_port;
try {
host_port = ConfigUtils.getHostAndPort(addr);
} catch (ConfigException e) {
throw new InitialMessageException("Badly formed address: %s", addr);
}
String[] addressStrings = new String(b).split(",");
List<InetSocketAddress> addresses = new ArrayList<>(addressStrings.length);
for(String addr : addressStrings) {

if (host_port.length != 2) {
throw new InitialMessageException("Badly formed address: %s", addr);
}
String[] host_port;
try {
host_port = ConfigUtils.getHostAndPort(addr);
} catch (ConfigException e) {
throw new InitialMessageException("Badly formed address: %s", addr);
}

int port;
try {
port = Integer.parseInt(host_port[1]);
} catch (NumberFormatException e) {
throw new InitialMessageException("Bad port number: %s", host_port[1]);
} catch (ArrayIndexOutOfBoundsException e) {
throw new InitialMessageException("No port number in: %s", addr);
if (host_port.length != 2) {
throw new InitialMessageException("Badly formed address: %s", addr);
}

int port;
try {
port = Integer.parseInt(host_port[1]);
} catch (NumberFormatException e) {
throw new InitialMessageException("Bad port number: %s", host_port[1]);
} catch (ArrayIndexOutOfBoundsException e) {
throw new InitialMessageException("No port number in: %s", addr);
}
addresses.add(new InetSocketAddress(host_port[0], port));
}

return new InitialMessage(sid, new InetSocketAddress(host_port[0], port));
return new InitialMessage(sid, addresses);
}
}

Expand Down Expand Up @@ -424,8 +432,8 @@ private boolean startConnection(Socket sock, Long sid)
// represents protocol version (in other words - message type)
dout.writeLong(PROTOCOL_VERSION);
dout.writeLong(self.getId());
InetSocketAddress address = self.getElectionAddress().getReachableOrOne();
String addr = formatInetAddr(address);
String addr = self.getElectionAddress().getAllAddresses().stream()
.map(NetUtils::formatInetAddr).collect(Collectors.joining(","));
byte[] addr_bytes = addr.getBytes();
dout.writeInt(addr_bytes.length);
dout.write(addr_bytes);
Expand Down Expand Up @@ -532,7 +540,7 @@ public void run() {
private void handleConnection(Socket sock, DataInputStream din)
throws IOException {
Long sid = null, protocolVersion = null;
InetSocketAddress electionAddr = null;
MultipleAddresses electionAddr = null;

try {
protocolVersion = din.readLong();
Expand All @@ -542,7 +550,7 @@ private void handleConnection(Socket sock, DataInputStream din)
try {
InitialMessage init = InitialMessage.parse(protocolVersion, din);
sid = init.sid;
electionAddr = init.electionAddr;
electionAddr = new MultipleAddresses(init.electionAddr);
} catch (InitialMessage.InitialMessageException ex) {
LOG.error(ex.toString());
closeSocket(sock);
Expand Down Expand Up @@ -585,7 +593,7 @@ private void handleConnection(Socket sock, DataInputStream din)
closeSocket(sock);

if (electionAddr != null) {
connectOne(sid, new MultipleAddresses(electionAddr));
connectOne(sid, electionAddr);
} else {
connectOne(sid);
}
Expand Down Expand Up @@ -648,6 +656,10 @@ public void toSend(Long sid, ByteBuffer b) {
synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server {}", sid);
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
// one we are using is already dead and if we need to clean-up, so when we will create a new connection
// then we will choose an other one, which is actually reachable
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
return true;
}

Expand All @@ -660,7 +672,7 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
sock = new Socket();
}
setSockOpts(sock);
sock.connect(electionAddr.getReachableOrOne(), cnxTO);
sock.connect(electionAddr.getReachableAddress(), cnxTO);
if (sock instanceof SSLSocket) {
SSLSocket sslSock = (SSLSocket) sock;
sslSock.startHandshake();
Expand Down Expand Up @@ -692,6 +704,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
+ " at election address " + electionAddr, e);
closeSocket(sock);
return false;
} catch (NoRouteToHostException e) {
LOG.warn("None of the addresses ({}) are reachable for sid {}", electionAddr, sid, e);
closeSocket(sock);
return false;
} catch (IOException e) {
LOG.warn("Cannot open channel to " + sid
+ " at election address " + electionAddr,
Expand All @@ -709,6 +725,10 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr){
synchronized void connectOne(long sid){
if (senderWorkerMap.get(sid) != null) {
LOG.debug("There is a connection already for server {}", sid);
// since ZOOKEEPER-3188 we can use multiple election addresses to reach a server. It is possible, that the
// one we are using is already dead and if we need to clean-up, so when we will create a new connection
// then we will choose an other one, which is actually reachable
senderWorkerMap.get(sid).asyncValidateIfSocketIsStillReachable();
return;
}
synchronized (self.QV_LOCK) {
Expand Down Expand Up @@ -1100,6 +1120,7 @@ class SendWorker extends ZooKeeperThread {
RecvWorker recvWorker;
volatile boolean running = true;
DataOutputStream dout;
AtomicBoolean ongoingAsyncValidation = new AtomicBoolean(false);

/**
* An instance of this thread receives messages to send
Expand Down Expand Up @@ -1239,6 +1260,37 @@ public void run() {
this.finish();
LOG.warn("Send worker leaving thread " + " id " + sid + " my id = " + self.getId());
}

public void asyncValidateIfSocketIsStillReachable() {
if(ongoingAsyncValidation.compareAndSet(false, true)) {
Thread validator = new Thread(() -> {
LOG.debug("validate if destination address is reachable for sid {}", sid);
if(sock != null) {
InetAddress address = sock.getInetAddress();
try {
if (address.isReachable(500)) {
LOG.debug("destination address {} is reachable for sid {}", address.toString(), sid);
return;
}
} catch (NullPointerException | IOException ignored) {
}
LOG.warn("destination address {} not reachable anymore, shutting down the SendWorker for sid {}", address.toString(), sid);
this.finish();
}
});
validator.start();
try {
validator.join();
} catch (InterruptedException ignored) {
// we don't care if the validation was interrupted. If SenderWorker is not working, we will
// try to connect and re-validate later
}
ongoingAsyncValidation.set(false);
} else {
LOG.debug("validation of destination address for sid {} is skipped (it is already running)", sid);
}
}

}

/**
Expand Down
Expand Up @@ -480,8 +480,7 @@ public void testBadPeerAddressInQuorum() throws Exception {
LineNumberReader r = new LineNumberReader(new StringReader(os.toString()));
String line;
boolean found = false;
Pattern p =
Pattern.compile(".*Cannot open channel to .* at election address .*");
Pattern p = Pattern.compile(".*None of the addresses .* are reachable for sid 2");
while ((line = r.readLine()) != null) {
found = p.matcher(line).matches();
if (found) {
Expand Down

0 comments on commit 5b22432

Please sign in to comment.