[HZ-581] Address issues with hostnames (hazelcast#20014)
This PR contains the connection manager changes that are a prerequisite
for resolving the hostname-related issues. It handles the multiple
addresses that can refer to a member inside the connection manager
layer and exposes only the member's public address to the higher levels
of the code, except where a connection is initiated with an address
taken from the config. The scope is deliberately limited to the
connection manager layer, avoiding changes that would ripple into other
layers such as the operation service.

The changes in this PR are:

    - Introduce LocalAddressRegistry to store the Address-UUID and
      UUID-Addresses mappings.
    - Use endpoint UUIDs to manage connection registration. This mainly
      means adjusting the connection managers to the UUID-based
      registration.
    - Apply the same UUID changes to the mock network classes.
    - Use the member's MEMBER protocol public address, if available, as
      the remoteAddress of a connection (otherwise, when advanced
      network is in use, use the corresponding protocol's public
      address). Previously the remoteAddress exposed the other target
      addresses outside the connection manager layer: OperationRunnerImpl
      takes the remote address of the connection and sets it as the
      caller address of the Operation.
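
The two mappings named in the first item can be pictured with a minimal Java sketch. It is not the LocalAddressRegistry added by this commit: the class name `AddressRegistrySketch`, its method names, and the use of plain strings in place of Hazelcast's `Address` are assumptions made for illustration only.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, NOT Hazelcast's LocalAddressRegistry.
final class AddressRegistrySketch {
    // Address -> UUID: resolves any known address of an endpoint to its UUID.
    private final Map<String, UUID> addressToUuid = new ConcurrentHashMap<>();
    // UUID -> Addresses: the first registered address is treated as the primary one.
    private final Map<UUID, Set<String>> uuidToAddresses = new ConcurrentHashMap<>();

    synchronized void register(UUID uuid, String primaryAddress, String... aliases) {
        Set<String> addresses = new LinkedHashSet<>();
        addresses.add(primaryAddress);
        Collections.addAll(addresses, aliases);
        uuidToAddresses.put(uuid, addresses);
        for (String address : addresses) {
            addressToUuid.put(address, uuid);
        }
    }

    // Returns the UUID registered for the address, or null if unknown.
    UUID uuidOf(String address) {
        return addressToUuid.get(address);
    }

    // Maps any known address of an endpoint back to its primary address, or null if unknown.
    String primaryAddressOf(String anyAddress) {
        UUID uuid = addressToUuid.get(anyAddress);
        if (uuid == null) {
            return null;
        }
        Set<String> addresses = uuidToAddresses.get(uuid);
        return addresses == null ? null : addresses.iterator().next();
    }

    public static void main(String[] args) {
        AddressRegistrySketch registry = new AddressRegistrySketch();
        UUID member = UUID.randomUUID();
        registry.register(member, "10.0.0.1:5701", "member-1.example:5701");
        // Both addresses resolve to the same endpoint and the same primary address.
        System.out.println(registry.uuidOf("member-1.example:5701").equals(member));
        System.out.println(registry.primaryAddressOf("member-1.example:5701"));
    }
}
```

Keying connections by UUID while keeping the reverse address lookup is what lets a layer above see one address per member regardless of which alias was dialed.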

The most difficult part of this PR is determining the lifecycle of the
UUID->Address and Address->UUID entries. We must decide when to remove
these entries from the registry: they have to be removed at some point
because they can become stale and because they consume resources. The
events that can leave these entries stale are as follows:

    - In some scenarios the UUID for a given host machine (for a given
      network address) can change. These are:
        - Restarting a member on the same machine. The new Hazelcast
          member has a different UUID than the previous member despite
          using the same network addresses.
        - Invoking ClusterServiceImpl#reset, which changes the UUID of
          the member without any change to the network address to which
          the member is bound. This method is only called before a
          split-brain merge happens.
    - At Hot Restart (Persistence) recovery, a new member started on a
      different host machine can use the UUID of a crashed member on
      different address(es).
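
To make the two staleness cases concrete, here is a small Java sketch of a registry that evicts the conflicting entry at registration time. This eviction-on-register policy is an assumption chosen for illustration and is not what the commit describes (the message below ties removal to connection close); `StaleEntrySketch` and its methods are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of the two staleness cases; not Hazelcast code.
final class StaleEntrySketch {
    private final Map<String, UUID> addressToUuid = new HashMap<>();
    private final Map<UUID, String> uuidToAddress = new HashMap<>();

    // Registers the pair and returns the UUID that previously owned the address, or null.
    synchronized UUID register(UUID uuid, String address) {
        // Case: a known UUID reappears on a different address (e.g. Hot Restart recovery).
        String oldAddress = uuidToAddress.put(uuid, address);
        if (oldAddress != null && !oldAddress.equals(address)) {
            addressToUuid.remove(oldAddress);
        }
        // Case: a known address reappears with a new UUID (e.g. member restart or reset).
        UUID previousOwner = addressToUuid.put(address, uuid);
        if (previousOwner != null && !previousOwner.equals(uuid)) {
            uuidToAddress.remove(previousOwner);
            return previousOwner;
        }
        return null;
    }

    synchronized UUID uuidOf(String address) {
        return addressToUuid.get(address);
    }

    synchronized String addressOf(UUID uuid) {
        return uuidToAddress.get(uuid);
    }

    public static void main(String[] args) {
        StaleEntrySketch registry = new StaleEntrySketch();
        UUID beforeRestart = UUID.randomUUID();
        UUID afterRestart = UUID.randomUUID();
        registry.register(beforeRestart, "10.0.0.1:5701");
        // Same address, new UUID: the old UUID's entry is evicted.
        UUID evicted = registry.register(afterRestart, "10.0.0.1:5701");
        System.out.println(beforeRestart.equals(evicted));
    }
}
```

Without some such cleanup, a lookup by the old UUID or the old address would keep returning a mapping that no longer matches any live member.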

These two cases should be considered when determining the lifecycle of
these entries. For now, UUID-Address entries are registered as follows.

For the members:

    - During member handshake processing for the member connections.

For the clients:

    - In the connection registration phase of client connections.

UUID-Address entries are removed as follows.

For the members:

    - When all connections between the members are closed. (There may be
      multiple connections between members, and we must not clear these
      entries when only one of these connections is closed.)

For the clients:

    - When the client connection is closed (safe to remove, as there is
      only one active connection).
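
The member-side removal rule amounts to counting open connections per UUID and dropping the entry only when the count reaches zero. The Java sketch below shows that idea under stated assumptions: `ConnectionCountingRegistrySketch` and its methods are hypothetical, and the real `tryRemoveRegistration` seen later in this diff may work differently.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Hypothetical sketch of "remove only when the last connection closes"; not Hazelcast code.
final class ConnectionCountingRegistrySketch {
    private final Map<UUID, String> uuidToAddress = new HashMap<>();
    private final Map<UUID, Integer> openConnections = new HashMap<>();

    synchronized void onConnectionRegistered(UUID uuid, String address) {
        uuidToAddress.put(uuid, address);
        openConnections.merge(uuid, 1, Integer::sum);
    }

    // Returns true only if this call closed the last connection and removed the entry.
    synchronized boolean onConnectionClosed(UUID uuid) {
        // Decrement the counter; returning null from the function removes the counter itself.
        Integer remaining = openConnections.computeIfPresent(uuid, (key, n) -> n > 1 ? n - 1 : null);
        if (remaining != null || !uuidToAddress.containsKey(uuid)) {
            return false;
        }
        uuidToAddress.remove(uuid);
        return true;
    }

    synchronized String addressOf(UUID uuid) {
        return uuidToAddress.get(uuid);
    }

    public static void main(String[] args) {
        ConnectionCountingRegistrySketch registry = new ConnectionCountingRegistrySketch();
        UUID member = UUID.randomUUID();
        registry.onConnectionRegistered(member, "10.0.0.1:5701");
        registry.onConnectionRegistered(member, "10.0.0.1:5701");
        System.out.println(registry.onConnectionClosed(member)); // one connection still open
        System.out.println(registry.onConnectionClosed(member)); // last connection closed
    }
}
```

A client has a single active connection, so in this model its count never exceeds one and the first close removes the entry.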
ufukyilmaz committed Jan 11, 2022
1 parent 624e443 commit 16ec195
Showing 58 changed files with 2,282 additions and 405 deletions.
@@ -316,9 +316,9 @@ public boolean bind(final ClientEndpoint endpoint) {
// On such a case, `ClientEngine#connectionRemoved` will not be called for this connection since
// we did not register the connection.
// Endpoint removal logic(inside `ClientEngine#connectionRemoved`) will not be able to run, instead endpoint
// will be cleaned up by ClientHearbeatMonitor#cleanupEndpointsWithDeadConnections later.
if (conn.getRemoteAddress() != null) {
node.getServer().getConnectionManager(CLIENT).register(conn.getRemoteAddress(), conn);
// will be cleaned up by ClientHeartbeatMonitor#cleanupEndpointsWithDeadConnections later.
if (conn.getRemoteAddress() != null && endpoint.getUuid() != null) {
node.getServer().getConnectionManager(CLIENT).register(conn.getRemoteAddress(), endpoint.getUuid(), conn);
}
}

@@ -438,7 +438,11 @@ public void connectionRemoved(Connection c) {
logger.finest("connectionRemoved: No endpoint for connection:" + connection);
return;
}

UUID clientUuid = endpoint.getUuid();
if (clientUuid != null) {
node.getLocalAddressRegistry().tryRemoveRegistration(clientUuid,
endpoint.getConnection().getRemoteAddress());
}
endpointManager.removeEndpoint(endpoint);
}
}
@@ -480,10 +484,8 @@ Map<UUID, String> getClientsInCluster() {
if (endpoints == null) {
continue;
}
//Merge connected clients according to their UUID
for (Map.Entry<UUID, String> entry : endpoints.entrySet()) {
clientsMap.put(entry.getKey(), entry.getValue());
}
// Merge connected clients according to their UUID
clientsMap.putAll(endpoints);
} catch (Exception e) {
logger.warning("Cannot get client information from: " + target.toString(), e);
}
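
The hunk above replaces an entry-by-entry copy with `Map#putAll`. A minimal Java sketch of why the two forms are interchangeable here follows; `MergeClientsSketch` and its method names are illustrative and do not appear in the commit.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative comparison of the two merge styles; not Hazelcast code.
final class MergeClientsSketch {
    // Old style: copy entries one by one; later values overwrite earlier ones per key.
    static Map<UUID, String> mergeWithLoop(Map<UUID, String> target, Map<UUID, String> endpoints) {
        for (Map.Entry<UUID, String> entry : endpoints.entrySet()) {
            target.put(entry.getKey(), entry.getValue());
        }
        return target;
    }

    // New style: putAll performs the same per-key overwrite in a single call.
    static Map<UUID, String> mergeWithPutAll(Map<UUID, String> target, Map<UUID, String> endpoints) {
        target.putAll(endpoints);
        return target;
    }

    public static void main(String[] args) {
        UUID client = UUID.randomUUID();
        Map<UUID, String> fromMember = new HashMap<>();
        fromMember.put(client, "JVM");
        Map<UUID, String> viaLoop = mergeWithLoop(new HashMap<>(), fromMember);
        Map<UUID, String> viaPutAll = mergeWithPutAll(new HashMap<>(), fromMember);
        System.out.println(viaLoop.equals(viaPutAll));
    }
}
```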
@@ -21,7 +21,6 @@
import com.hazelcast.internal.nio.Connection;

import java.util.Map;
import java.util.UUID;

/**
* The ClientConnection is connection that lives on the client side on behalf of a Java client.
@@ -48,5 +47,4 @@ public interface ClientConnection extends Connection {
// used in tests
Map<Long, EventHandler> getEventHandlers();

UUID getRemoteUuid();
}
@@ -45,6 +45,7 @@ public ClientClusterProxy(ClientClusterServiceImpl clusterService) {
}

@Override
@Nonnull
public UUID addMembershipListener(@Nonnull MembershipListener listener) {
return clusterService.addMembershipListener(listener);
}
@@ -55,12 +56,14 @@ public boolean removeMembershipListener(@Nonnull UUID registrationId) {
}

@Override
@Nonnull
public Set<Member> getMembers() {
final Collection<Member> members = clusterService.getMemberList();
return new LinkedHashSet<>(members);
}

@Override
@Nonnull
public Member getLocalMember() {
throw new UnsupportedOperationException("Client has no local member!");
}
@@ -82,6 +85,7 @@ public void changeClusterState(@Nonnull ClusterState newState) {
}

@Override
@Nonnull
public Version getClusterVersion() {
throw new UnsupportedOperationException();
}
@@ -92,6 +96,7 @@ public HotRestartService getHotRestartService() {
}

@Override
@Nonnull
public PersistenceService getPersistenceService() {
throw new UnsupportedOperationException();
}
5 changes: 5 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/cluster/Cluster.java
@@ -54,6 +54,7 @@ public interface Cluster {
* @throws java.lang.NullPointerException if listener is null
* @see #removeMembershipListener(UUID)
*/
@Nonnull
UUID addMembershipListener(@Nonnull MembershipListener listener);

/**
@@ -79,6 +80,7 @@ public interface Cluster {
*
* @return current members in the cluster
*/
@Nonnull
Set<Member> getMembers();

/**
@@ -93,6 +95,7 @@ public interface Cluster {
*
* @return this Hazelcast instance member
*/
@Nonnull
Member getLocalMember();

/**
@@ -225,6 +228,7 @@ public interface Cluster {
* @return the version at which this cluster operates.
* @since 3.8
*/
@Nonnull
Version getClusterVersion();

/**
@@ -246,6 +250,7 @@ public interface Cluster {
* supported on this instance (e.g. on client)
* @since 5.0
*/
@Nonnull
PersistenceService getPersistenceService();

/**
20 changes: 20 additions & 0 deletions hazelcast/src/main/java/com/hazelcast/instance/AddressPicker.java
@@ -53,8 +53,28 @@ public interface AddressPicker {
*/
Address getPublicAddress(EndpointQualifier qualifier);

/**
* Returns all public {@link Address}es of this member which are advertised to other
* members, mapped by corresponding {@link EndpointQualifier}. Also, see
* {@link com.hazelcast.internal.cluster.impl.MemberHandshake} and
* {@link com.hazelcast.internal.server.tcp.SendMemberHandshakeTask}.
*
* @return a {@code Map<EndpointQualifier, Address>} of this member's public addresses
* or an empty map if called before {@link #pickAddress()}
* @since 3.12
*/
Map<EndpointQualifier, Address> getPublicAddressMap();

/**
* Returns all bound server socket {@link Address}es of this member, mapped by
* corresponding {@link EndpointQualifier}
*
* @return a {@code Map<EndpointQualifier, Address>} of the bound addresses of
* this member's server sockets or an empty map if called before {@link #pickAddress()}
* @since 5.1
*/
Map<EndpointQualifier, Address> getBindAddressMap();

/**
* Returns a server channel.
*
@@ -92,6 +92,16 @@ public Map<EndpointQualifier, Address> getPublicAddressMap() {
return pubAddressMap;
}

@Override
public Map<EndpointQualifier, Address> getBindAddressMap() {
Map<EndpointQualifier, Address> bindAddressMap = new HashMap<>(pickers.size());
for (Map.Entry<EndpointQualifier, AddressPicker> entry : pickers.entrySet()) {
bindAddressMap.put(entry.getKey(), entry.getValue().getBindAddress(entry.getKey()));
}

return bindAddressMap;
}

@Override
public ServerSocketChannel getServerSocketChannel(EndpointQualifier qualifier) {
return pickers.get(qualifier).getServerSocketChannel(qualifier);
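
The `getBindAddressMap()` implementation above asks each per-qualifier picker for its own bind address and collects the answers into one map. A stand-alone Java sketch of that delegation pattern follows; the `Picker` interface and the string-typed qualifiers and addresses are simplifications assumed for illustration, not Hazelcast types.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative delegation sketch; strings stand in for EndpointQualifier and Address.
final class BindAddressMapSketch {
    // Hypothetical stand-in for a per-qualifier AddressPicker.
    interface Picker {
        String getBindAddress(String qualifier);
    }

    // Builds qualifier -> bind address by delegating to the picker registered for each qualifier.
    static Map<String, String> bindAddressMap(Map<String, Picker> pickers) {
        Map<String, String> result = new HashMap<>(pickers.size());
        for (Map.Entry<String, Picker> entry : pickers.entrySet()) {
            result.put(entry.getKey(), entry.getValue().getBindAddress(entry.getKey()));
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Picker> pickers = new HashMap<>();
        pickers.put("MEMBER", qualifier -> "0.0.0.0:5701");
        pickers.put("CLIENT", qualifier -> "0.0.0.0:9090");
        System.out.println(bindAddressMap(pickers).get("MEMBER"));
    }
}
```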
@@ -439,6 +439,13 @@ public Map<EndpointQualifier, Address> getPublicAddressMap() {
return publicAddressMap;
}

@Override
public Map<EndpointQualifier, Address> getBindAddressMap() {
HashMap<EndpointQualifier, Address> bindAddressMap = new HashMap<>();
bindAddressMap.put(MEMBER, bindAddress);
return bindAddressMap;
}

void setHostnameResolver(HostnameResolver hostnameResolver) {
this.hostnameResolver = checkNotNull(hostnameResolver);
}
@@ -24,13 +24,14 @@
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.networking.ChannelErrorHandler;
import com.hazelcast.internal.networking.Networking;
import com.hazelcast.internal.server.tcp.ServerSocketRegistry;
import com.hazelcast.internal.networking.nio.NioNetworking;
import com.hazelcast.internal.nio.ClassLoaderUtil;
import com.hazelcast.internal.server.Server;
import com.hazelcast.internal.server.tcp.TcpServerContext;
import com.hazelcast.internal.server.tcp.TcpServerConnectionChannelErrorHandler;
import com.hazelcast.internal.server.tcp.LocalAddressRegistry;
import com.hazelcast.internal.server.tcp.ServerSocketRegistry;
import com.hazelcast.internal.server.tcp.TcpServer;
import com.hazelcast.internal.server.tcp.TcpServerConnectionChannelErrorHandler;
import com.hazelcast.internal.server.tcp.TcpServerContext;
import com.hazelcast.internal.util.InstantiationUtils;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.impl.LoggingServiceImpl;
@@ -143,7 +144,7 @@ public Joiner createJoiner(Node node) {
}

@Override
public Server createServer(Node node, ServerSocketRegistry registry) {
public Server createServer(Node node, ServerSocketRegistry registry, LocalAddressRegistry addressRegistry) {
TcpServerContext context = new TcpServerContext(node, node.nodeEngine);
Networking networking = createNetworking(node);
Config config = node.getConfig();
@@ -152,6 +153,7 @@ public Server createServer(Node node, ServerSocketRegistry registry) {
return new TcpServer(config,
context,
registry,
addressRegistry,
metricsRegistry,
networking,
node.getNodeExtension().createChannelInitializerFn(context));
@@ -195,7 +195,15 @@ public Map<EndpointQualifier, Address> getPublicAddressMap() {
for (Map.Entry<EndpointQualifier, InetSocketAddress> entry : publicAddresses.entrySet()) {
mappings.put(entry.getKey(), new Address(entry.getValue()));
}
return mappings;
}

@Override
public Map<EndpointQualifier, Address> getBindAddressMap() {
Map<EndpointQualifier, Address> mappings = new HashMap<>(bindAddresses.size());
for (Map.Entry<EndpointQualifier, InetSocketAddress> entry : bindAddresses.entrySet()) {
mappings.put(entry.getKey(), new Address(entry.getValue()));
}
return mappings;
}
}
30 changes: 22 additions & 8 deletions hazelcast/src/main/java/com/hazelcast/instance/impl/Node.java
@@ -71,6 +71,7 @@
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.serialization.impl.compact.schema.MemberSchemaService;
import com.hazelcast.internal.server.Server;
import com.hazelcast.internal.server.tcp.LocalAddressRegistry;
import com.hazelcast.internal.server.tcp.ServerSocketRegistry;
import com.hazelcast.internal.services.GracefulShutdownAwareService;
import com.hazelcast.internal.usercodedeployment.UserCodeDeploymentClassLoader;
@@ -162,6 +163,7 @@ public class Node {
*/
public final Address address;
public final SecurityContext securityContext;

private final ILogger logger;
private final AtomicBoolean shuttingDown = new AtomicBoolean(false);
private final NodeShutdownHookThread shutdownHookThread;
@@ -173,8 +175,11 @@ public class Node {
private final BuildInfo buildInfo;
private final HealthMonitor healthMonitor;
private final Joiner joiner;
private final LocalAddressRegistry localAddressRegistry;
private ManagementCenterService managementCenterService;

// it can be changed on cluster service reset see: ClusterServiceImpl#resetLocalMemberUuid
private volatile UUID thisUuid;
private volatile NodeState state = NodeState.STARTING;

/**
@@ -218,14 +223,15 @@ public Node(HazelcastInstanceImpl hazelcastInstance, Config staticConfig, NodeCo

try {
boolean liteMember = config.isLiteMember();
address = addressPicker.getPublicAddress(MEMBER);
nodeExtension = nodeContext.createNodeExtension(this);
address = addressPicker.getPublicAddress(MEMBER);
thisUuid = nodeExtension.createMemberUuid();
final Map<String, String> memberAttributes = findMemberAttributes(
new MemberAttributeConfigReadOnly(config.getMemberAttributeConfig()));
MemberImpl localMember = new MemberImpl.Builder(addressPicker.getPublicAddressMap())
.version(version)
.localMember(true)
.uuid(nodeExtension.createMemberUuid())
.uuid(thisUuid)
.attributes(memberAttributes)
.liteMember(liteMember)
.instance(hazelcastInstance)
@@ -248,8 +254,8 @@ public Node(HazelcastInstanceImpl hazelcastInstance, Config staticConfig, NodeCo
config.onSecurityServiceUpdated(getSecurityService());
MetricsRegistry metricsRegistry = nodeEngine.getMetricsRegistry();
metricsRegistry.provideMetrics(nodeExtension);

server = nodeContext.createServer(this, serverSocketRegistry);
localAddressRegistry = new LocalAddressRegistry(this, addressPicker);
server = nodeContext.createServer(this, serverSocketRegistry, localAddressRegistry);
healthMonitor = new HealthMonitor(this);
clientEngine = hasClientServerSocket() ? new ClientEngineImpl(this) : new NoOpClientEngine();
JoinConfig joinConfig = getActiveMemberNetworkConfig(this.config).getJoin();
@@ -428,6 +434,14 @@ public Address getThisAddress() {
return address;
}

public UUID getThisUuid() {
return thisUuid;
}

public void setThisUuid(UUID uuid) {
thisUuid = uuid;
}

public MemberImpl getLocalMember() {
return clusterService.getLocalMember();
}
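
The Node changes above move the member UUID into a `volatile` field with a getter and setter, so that a UUID replaced during a cluster service reset is visible to other threads. A reduced Java sketch of that pattern follows; `NodeIdentitySketch` and its `resetUuid()` helper are hypothetical and only imitate what a reset might do.

```java
import java.util.UUID;

// Hypothetical reduction of the "mutable member UUID" pattern; not Hazelcast's Node class.
final class NodeIdentitySketch {
    // volatile: a write by the resetting thread is visible to every later read on other threads.
    private volatile UUID thisUuid;

    NodeIdentitySketch(UUID initialUuid) {
        this.thisUuid = initialUuid;
    }

    UUID getThisUuid() {
        return thisUuid;
    }

    void setThisUuid(UUID uuid) {
        thisUuid = uuid;
    }

    // Assumed helper: generate a fresh UUID and publish it, as a reset before a merge might.
    UUID resetUuid() {
        UUID fresh = UUID.randomUUID();
        setThisUuid(fresh);
        return fresh;
    }

    public static void main(String[] args) {
        NodeIdentitySketch node = new NodeIdentitySketch(UUID.randomUUID());
        UUID fresh = node.resetUuid();
        System.out.println(fresh.equals(node.getThisUuid()));
    }
}
```

Because the address stays the same while the UUID changes, any registry keyed by UUID has to treat such a reset as a new identity for an existing address.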
@@ -725,6 +739,10 @@ public DiscoveryService getDiscoveryService() {
return discoveryService;
}

public LocalAddressRegistry getLocalAddressRegistry() {
return localAddressRegistry;
}

private enum ShutdownHookPolicy {
TERMINATE,
GRACEFUL
@@ -857,10 +875,6 @@ private boolean usePublicAddress(JoinConfig join) {
|| allUsePublicAddress(AliasedDiscoveryConfigUtils.aliasedDiscoveryConfigsFrom(join));
}

public UUID getThisUuid() {
return clusterService.getThisUuid();
}

public Config getConfig() {
return config;
}
@@ -18,6 +18,7 @@

import com.hazelcast.internal.cluster.Joiner;
import com.hazelcast.instance.AddressPicker;
import com.hazelcast.internal.server.tcp.LocalAddressRegistry;
import com.hazelcast.internal.server.tcp.ServerSocketRegistry;
import com.hazelcast.internal.server.Server;

@@ -36,6 +37,5 @@ public interface NodeContext {

Joiner createJoiner(Node node);

// TODO Consider the changes here (JET?)
Server createServer(Node node, ServerSocketRegistry registry);
Server createServer(Node node, ServerSocketRegistry registry, LocalAddressRegistry addressRegistry);
}
