Skip to content

Commit

Permalink
ZOOKEEPER-3188: skip unreachable addresses when Learner connects to L…
Browse files Browse the repository at this point in the history
…eader
  • Loading branch information
symat authored and Mate Szalay-Beko committed Nov 13, 2019
1 parent e232c55 commit 0f95678
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 12 deletions.
Expand Up @@ -255,14 +255,14 @@ protected void sockConnect(Socket sock, InetSocketAddress addr, int timeout) thr
* Establish a connection with the LearnerMaster found by findLearnerMaster.
* Followers only connect to Leaders, Observers can connect to any active LearnerMaster.
* Retries until either initLimit time has elapsed or 5 tries have happened.
* @param addr - the address of the Peer to connect to.
* @param multiAddr - the address of the Peer to connect to.
* @throws IOException - if the socket connection fails on the 5th attempt
* if there is an authentication failure while connecting to leader
*/
protected void connectToLeader(MultipleAddresses addr, String hostname) throws IOException {
protected void connectToLeader(MultipleAddresses multiAddr, String hostname) throws IOException {

this.leaderAddr = addr;
Set<InetSocketAddress> addresses = addr.getAllAddresses();
this.leaderAddr = multiAddr;
Set<InetSocketAddress> addresses = multiAddr.getAllReachableAddresses();
ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
CountDownLatch latch = new CountDownLatch(addresses.size());
AtomicReference<Socket> socket = new AtomicReference<>(null);
Expand All @@ -284,15 +284,14 @@ protected void connectToLeader(MultipleAddresses addr, String hostname) throws I
}

if (socket.get() == null) {
throw new IOException("Failed connect to " + addr);
throw new IOException("Failed connect to " + multiAddr);
} else {
sock = socket.get();
}

self.authLearner.authenticate(sock, hostname);

leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(
sock.getInputStream()));
leaderIs = BinaryInputArchive.getArchive(new BufferedInputStream(sock.getInputStream()));
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
}
Expand All @@ -315,9 +314,13 @@ public void run() {
Thread.currentThread().setName("LeaderConnector-" + address);
Socket sock = connectToLeader();

if (sock != null && sock.isConnected() && !socket.compareAndSet(null, sock)) {
LOG.info("Connection to the leader is already established, close the redundant connection");
sock.close();
if (sock != null && sock.isConnected()) {
if (socket.compareAndSet(null, sock)) {
LOG.info("Successfully connected to leader, using address: {}", address);
} else {
LOG.info("Connection to the leader is already established, close the redundant connection");
sock.close();
}
}

} catch (Exception e) {
Expand Down
Expand Up @@ -120,13 +120,25 @@ public void addAddress(InetSocketAddress address) {
* @throws NoRouteToHostException if none of the addresses are reachable
*/
public InetSocketAddress getReachableAddress() throws NoRouteToHostException {
// using parallelStream() + findAny() will help to minimize the time spent, but
// using parallelStream() + findAny() will help to minimize the time spent on network operations
return addresses.parallelStream()
.filter(this::checkIfAddressIsReachable)
.findAny()
.orElseThrow(() -> new NoRouteToHostException("No valid address among " + addresses));
}

/**
* Returns a set of all reachable addresses. If none is reachable than returns empty set.
*
* @return all addresses which are reachable.
*/
public Set<InetSocketAddress> getAllReachableAddresses() {
// using parallelStream() will help to minimize the time spent on network operations
return addresses.parallelStream()
.filter(this::checkIfAddressIsReachable)
.collect(Collectors.toSet());
}

/**
* Returns a reachable address or an arbitrary one, if none is reachable. It throws an exception
* if there are no addresses registered. The function is nondeterministic in the sense that the
Expand Down
Expand Up @@ -666,7 +666,8 @@ synchronized boolean connectOne(long sid, MultipleAddresses electionAddr) {
sslSock.getSession().getCipherSuite());
}

LOG.debug("Connected to server {}", sid);
LOG.debug("Connected to server {} using election address: {}:{}",
sid, sock.getInetAddress(), sock.getPort());
// Sends connection request asynchronously if the quorum
// sasl authentication is enabled. This is required because
// sasl server authentication process may take few seconds to
Expand Down
Expand Up @@ -24,7 +24,9 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.commons.collections.CollectionUtils;
Expand Down Expand Up @@ -133,6 +135,37 @@ public void testRecreateSocketAddressesWithWrongAddresses() {
Assert.assertEquals(address, multipleAddresses.getOne());
}

@Test
public void testAlwaysGetReachableAddress() throws Exception{
InetSocketAddress reachableHost = new InetSocketAddress("127.0.0.1", 1234);
InetSocketAddress unreachableHost1 = new InetSocketAddress("unreachable1.address.zookeeper.apache.com", 1234);
InetSocketAddress unreachableHost2 = new InetSocketAddress("unreachable2.address.zookeeper.apache.com", 1234);
InetSocketAddress unreachableHost3 = new InetSocketAddress("unreachable3.address.zookeeper.apache.com", 1234);

MultipleAddresses multipleAddresses = new MultipleAddresses(
Arrays.asList(unreachableHost1, unreachableHost2, unreachableHost3, reachableHost));

// we call the getReachableAddress() function multiple times, to make sure we
// always got back a reachable address and not just a random one
for (int i = 0; i < 10; i++) {
Assert.assertEquals(reachableHost, multipleAddresses.getReachableAddress());
}
}

@Test
public void testGetAllReachableAddresses() throws Exception {
InetSocketAddress reachableHost1 = new InetSocketAddress("127.0.0.1", 1234);
InetSocketAddress reachableHost2 = new InetSocketAddress("127.0.0.1", 2345);
InetSocketAddress unreachableHost1 = new InetSocketAddress("unreachable1.address.zookeeper.apache.com", 1234);
InetSocketAddress unreachableHost2 = new InetSocketAddress("unreachable2.address.zookeeper.apache.com", 1234);

MultipleAddresses multipleAddresses = new MultipleAddresses(
Arrays.asList(unreachableHost1, unreachableHost2, reachableHost1, reachableHost2));

Set<InetSocketAddress> reachableHosts = new HashSet<>(Arrays.asList(reachableHost1, reachableHost2));
Assert.assertEquals(reachableHosts, multipleAddresses.getAllReachableAddresses());
}

@Test
public void testEquals() {
List<InetSocketAddress> addresses = getAddressList();
Expand Down

0 comments on commit 0f95678

Please sign in to comment.