Skip to content

Commit

Permalink
Client fails to reconnect when domain names are used in member list
Browse files Browse the repository at this point in the history
Make sure that the provided address string is used in the activeConnectionsMap. This fixes the domain name to ip address conversion problem and duplicate connection initiation problem to the same member one with ip address and one with domain name.

We made the assumption that the user will have to use the same host name or ip in the client config if the member is configured with a public ip address.

fixes hazelcast#11116
fixes hazelcast#11264
backport of hazelcast#11226
  • Loading branch information
sancar committed Sep 14, 2017
1 parent 2a88ee8 commit d202a12
Show file tree
Hide file tree
Showing 12 changed files with 166 additions and 70 deletions.
Expand Up @@ -16,17 +16,20 @@

package com.hazelcast.client.connection;

import java.net.InetSocketAddress;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.annotation.PrivateApi;

import java.util.Collection;

/**
* Provides initial addresses for client to find and connect to a node
*/
@PrivateApi
public interface AddressProvider {

/**
* @return Collection of InetSocketAddress
* @return The possible member addresses to connect to.
*/
Collection<InetSocketAddress> loadAddresses();
Collection<Address> loadAddresses();

}
Expand Up @@ -22,8 +22,8 @@
import com.hazelcast.client.util.AddressHelper;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.LoggingService;
import com.hazelcast.nio.Address;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -45,10 +45,10 @@ public AwsAddressProvider(ClientAwsConfig awsConfig, LoggingService loggingServi
}

@Override
public Collection<InetSocketAddress> loadAddresses() {
public Collection<Address> loadAddresses() {
updateLookupTable();
final Map<String, String> lookupTable = getLookupTable();
final Collection<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>(lookupTable.size());
final Collection<Address> addresses = new ArrayList<Address>(lookupTable.size());

for (String privateAddress : lookupTable.keySet()) {
addresses.addAll(AddressHelper.getSocketAddresses(privateAddress));
Expand Down
Expand Up @@ -33,7 +33,6 @@
import com.hazelcast.util.Clock;
import com.hazelcast.util.executor.SingleExecutorThreadFactory;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -108,23 +107,23 @@ public void shutdown() {
}
}

private Collection<InetSocketAddress> getSocketAddresses() {
final List<InetSocketAddress> socketAddresses = new LinkedList<InetSocketAddress>();
private Collection<Address> getAddresses() {
final List<Address> addresses = new LinkedList<Address>();

Collection<Member> memberList = getMemberList();
for (Member member : memberList) {
socketAddresses.add(member.getSocketAddress());
addresses.add(member.getAddress());
}

for (AddressProvider addressProvider : addressProviders) {
socketAddresses.addAll(addressProvider.loadAddresses());
addresses.addAll(addressProvider.loadAddresses());
}

if (shuffleMemberList) {
Collections.shuffle(socketAddresses);
Collections.shuffle(addresses);
}

return socketAddresses;
return addresses;
}

public ClientPrincipal getPrincipal() {
Expand All @@ -145,7 +144,7 @@ public void connectToCluster() throws Exception {
final int connectionAttemptLimit = connAttemptLimit == 0 ? Integer.MAX_VALUE : connAttemptLimit;

int attempt = 0;
Set<InetSocketAddress> triedAddresses = new HashSet<InetSocketAddress>();
Set<Address> triedAddresses = new HashSet<Address>();
while (attempt < connectionAttemptLimit) {
checkIfClientStillRuning("Giving up on retrying to connect to cluster since client is shutdown.");
attempt++;
Expand Down Expand Up @@ -186,9 +185,9 @@ private void checkIfClientStillRuning(String shutdownMessage) {
}
}

private boolean connect(Set<InetSocketAddress> triedAddresses) throws Exception {
final Collection<InetSocketAddress> socketAddresses = getSocketAddresses();
for (InetSocketAddress inetSocketAddress : socketAddresses) {
private boolean connect(Set<Address> triedAddresses) throws Exception {
final Collection<Address> memberAddresses = getAddresses();
for (Address address : memberAddresses) {
if (!client.getLifecycleService().isRunning()) {
if (logger.isFinestEnabled()) {
logger.finest("Giving up on retrying to connect to cluster since client is shutdown");
Expand All @@ -197,17 +196,16 @@ private boolean connect(Set<InetSocketAddress> triedAddresses) throws Exception
}
Connection connection = null;
try {
triedAddresses.add(inetSocketAddress);
Address address = new Address(inetSocketAddress);
triedAddresses.add(address);
logger.info("Trying to connect to " + address + " as owner member");
connection = connectionManager.getOrConnect(address, true);
clientMembershipListener.listenMembershipEvents(ownerConnectionAddress);
fireConnectionEvent(LifecycleEvent.LifecycleState.CLIENT_CONNECTED);
return true;
} catch (Exception e) {
logger.warning("Exception during initial connection to " + inetSocketAddress + ", exception " + e);
logger.warning("Exception during initial connection to " + address + ", exception " + e);
if (null != connection) {
connection.close("Could not connect to " + inetSocketAddress + " as owner", e);
connection.close("Could not connect to " + address + " as owner", e);
}
}
}
Expand Down
Expand Up @@ -19,8 +19,8 @@
import com.hazelcast.client.config.ClientNetworkConfig;
import com.hazelcast.client.connection.AddressProvider;
import com.hazelcast.client.util.AddressHelper;
import com.hazelcast.nio.Address;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
Expand All @@ -41,16 +41,16 @@ public DefaultAddressProvider(ClientNetworkConfig networkConfig, boolean noOther
}

@Override
public Collection<InetSocketAddress> loadAddresses() {
public Collection<Address> loadAddresses() {
final List<String> addresses = networkConfig.getAddresses();
if (addresses.isEmpty() && noOtherAddressProviderExist) {
addresses.add("localhost");
addresses.add("127.0.0.1");
}
final List<InetSocketAddress> socketAddresses = new LinkedList<InetSocketAddress>();
final List<Address> possibleAddresses = new LinkedList<Address>();

for (String address : addresses) {
socketAddresses.addAll(AddressHelper.getSocketAddresses(address));
possibleAddresses.addAll(AddressHelper.getSocketAddresses(address));
}
return socketAddresses;
return possibleAddresses;
}
}
Expand Up @@ -23,8 +23,6 @@
import com.hazelcast.spi.discovery.DiscoveryNode;
import com.hazelcast.spi.discovery.integration.DiscoveryService;

import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collection;

Expand All @@ -42,18 +40,13 @@ public DiscoveryAddressProvider(DiscoveryService discoveryService, LoggingServic
}

@Override
public Collection<InetSocketAddress> loadAddresses() {
public Collection<Address> loadAddresses() {
Iterable<DiscoveryNode> discoveredNodes = checkNotNull(discoveryService.discoverNodes(),
"Discovered nodes cannot be null!");

Collection<InetSocketAddress> possibleMembers = new ArrayList<InetSocketAddress>();
Collection<Address> possibleMembers = new ArrayList<Address>();
for (DiscoveryNode discoveryNode : discoveredNodes) {
Address discoveredAddress = discoveryNode.getPrivateAddress();
try {
possibleMembers.add(discoveredAddress.getInetSocketAddress());
} catch (UnknownHostException e) {
logger.warning("Unresolvable host exception", e);
}
possibleMembers.add(discoveryNode.getPrivateAddress());
}
return possibleMembers;
}
Expand Down
Expand Up @@ -17,13 +17,13 @@
package com.hazelcast.client.util;

import com.hazelcast.logging.Logger;
import com.hazelcast.nio.Address;
import com.hazelcast.util.AddressUtil;
import com.hazelcast.util.AddressUtil.AddressHolder;

import java.net.Inet4Address;
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.LinkedList;
Expand All @@ -41,7 +41,7 @@ public final class AddressHelper {
private AddressHelper() {
}

public static Collection<InetSocketAddress> getSocketAddresses(String address) {
public static Collection<Address> getSocketAddresses(String address) {
final AddressHolder addressHolder = AddressUtil.getAddressHolder(address, -1);
final String scopedAddress = addressHolder.getScopeId() != null
? addressHolder.getAddress() + '%' + addressHolder.getScopeId()
Expand All @@ -55,7 +55,7 @@ public static Collection<InetSocketAddress> getSocketAddresses(String address) {
return getPossibleSocketAddresses(port, scopedAddress, maxPortTryCount);
}

public static Collection<InetSocketAddress> getPossibleSocketAddresses(int port, String scopedAddress,
public static Collection<Address> getPossibleSocketAddresses(int port, String scopedAddress,
int portTryCount) {
InetAddress inetAddress = null;
try {
Expand All @@ -68,24 +68,29 @@ public static Collection<InetSocketAddress> getPossibleSocketAddresses(int port,
if (possiblePort == -1) {
possiblePort = INITIAL_FIRST_PORT;
}
final Collection<InetSocketAddress> socketAddresses = new LinkedList<InetSocketAddress>();
final Collection<Address> addresses = new LinkedList<Address>();

if (inetAddress == null) {
for (int i = 0; i < portTryCount; i++) {
socketAddresses.add(new InetSocketAddress(scopedAddress, possiblePort + i));
try {
addresses.add(new Address(scopedAddress, possiblePort + i));
} catch (UnknownHostException ignored) {
Logger.getLogger(AddressHelper.class).finest("Address not available", ignored);
}
}
} else if (inetAddress instanceof Inet4Address) {
for (int i = 0; i < portTryCount; i++) {
socketAddresses.add(new InetSocketAddress(inetAddress, possiblePort + i));
addresses.add(new Address(scopedAddress, inetAddress, possiblePort + i));
}
} else if (inetAddress instanceof Inet6Address) {
final Collection<Inet6Address> addresses = getPossibleInetAddressesFor((Inet6Address) inetAddress);
for (Inet6Address inet6Address : addresses) {
final Collection<Inet6Address> possibleInetAddresses = getPossibleInetAddressesFor((Inet6Address) inetAddress);
for (Inet6Address inet6Address : possibleInetAddresses) {
for (int i = 0; i < portTryCount; i++) {
socketAddresses.add(new InetSocketAddress(inet6Address, possiblePort + i));
addresses.add(new Address(scopedAddress, inet6Address, possiblePort + i));
}
}
}

return socketAddresses;
return addresses;
}
}
Expand Up @@ -43,7 +43,6 @@
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -124,12 +123,12 @@ public void testMemberConnectionOrder() {
config.setProperty(ClientProperty.SHUFFLE_MEMBER_LIST.getName(), "false");
config.getNetworkConfig().setSmartRouting(false);

InetSocketAddress socketAddress1 = server1.getCluster().getLocalMember().getSocketAddress();
InetSocketAddress socketAddress2 = server2.getCluster().getLocalMember().getSocketAddress();
Address address1 = server1.getCluster().getLocalMember().getAddress();
Address address2 = server2.getCluster().getLocalMember().getAddress();

config.getNetworkConfig().
addAddress(socketAddress1.getHostName() + ":" + socketAddress1.getPort()).
addAddress(socketAddress2.getHostName() + ":" + socketAddress2.getPort());
addAddress(address1.getHost() + ":" + address1.getPort()).
addAddress(address2.getHost() + ":" + address2.getPort());

hazelcastFactory.newHazelcastClient(config);

Expand All @@ -151,7 +150,7 @@ public void destroyConnection_whenDestroyedMultipleTimes_thenListenerRemoveCalle

connectionManager.addConnectionListener(listener);

final Address serverAddress = new Address(server.getCluster().getLocalMember().getSocketAddress());
final Address serverAddress = server.getCluster().getLocalMember().getAddress();
final Connection connectionToServer = connectionManager.getConnection(serverAddress);

final CountDownLatch isConnected = new CountDownLatch(1);
Expand Down

0 comments on commit d202a12

Please sign in to comment.