Skip to content

Commit

Permalink
Hot-reloadable remote cluster credentials (with blocking call fix) (e…
Browse files Browse the repository at this point in the history
…lastic#103215)

Brings back elastic#102798, with a
tweak to avoid tripping on a blocking operation.

Only change compared to the original PR is
elastic@4072fac
  • Loading branch information
n1v0lg authored and javanna committed Dec 21, 2023
1 parent 2645652 commit 1d2f264
Show file tree
Hide file tree
Showing 25 changed files with 1,069 additions and 260 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102798.yaml
@@ -0,0 +1,5 @@
pr: 102798
summary: Hot-reloadable remote cluster credentials
area: Security
type: enhancement
issues: []
Expand Up @@ -179,7 +179,7 @@ public class ProxyConnectionStrategy extends RemoteConnectionStrategy {
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
newConnection,
clusterAlias,
actualProfile.getTransportProfile()
connectionManager.getCredentialsManager()
),
actualProfile.getHandshakeTimeout(),
cn -> true,
Expand Down
Expand Up @@ -57,15 +57,28 @@ final class RemoteClusterConnection implements Closeable {
* @param settings the nodes settings object
* @param clusterAlias the configured alias of the cluster to connect to
* @param transportService the local nodes transport service
* @param credentialsProtected Whether the remote cluster is protected by a credentials, i.e. it has a credentials configured
* via secure setting. This means the remote cluster uses the new configurable access RCS model
* (as opposed to the basic model).
* @param credentialsManager object to lookup remote cluster credentials by cluster alias. If a cluster is protected by a credential,
* i.e. it has a credential configured via secure setting.
* This means the remote cluster uses the advances RCS model (as opposed to the basic model).
*/
RemoteClusterConnection(Settings settings, String clusterAlias, TransportService transportService, boolean credentialsProtected) {
RemoteClusterConnection(
Settings settings,
String clusterAlias,
TransportService transportService,
RemoteClusterCredentialsManager credentialsManager
) {
this.transportService = transportService;
this.clusterAlias = clusterAlias;
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(clusterAlias, settings, credentialsProtected);
this.remoteConnectionManager = new RemoteConnectionManager(clusterAlias, createConnectionManager(profile, transportService));
ConnectionProfile profile = RemoteConnectionStrategy.buildConnectionProfile(
clusterAlias,
settings,
credentialsManager.hasCredentials(clusterAlias)
);
this.remoteConnectionManager = new RemoteConnectionManager(
clusterAlias,
credentialsManager,
createConnectionManager(profile, transportService)
);
this.connectionStrategy = RemoteConnectionStrategy.buildStrategy(clusterAlias, transportService, remoteConnectionManager, settings);
// we register the transport service here as a listener to make sure we notify handlers on disconnect etc.
this.remoteConnectionManager.addListener(transportService);
Expand Down
@@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.transport;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.Nullable;

import java.util.Map;

import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_CREDENTIALS;

public class RemoteClusterCredentialsManager {

private static final Logger logger = LogManager.getLogger(RemoteClusterCredentialsManager.class);

private volatile Map<String, SecureString> clusterCredentials;

public RemoteClusterCredentialsManager(Settings settings) {
updateClusterCredentials(settings);
}

public void updateClusterCredentials(Settings settings) {
clusterCredentials = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings);
logger.debug(
() -> Strings.format(
"Updated remote cluster credentials for clusters: [%s]",
Strings.collectionToCommaDelimitedString(clusterCredentials.keySet())
)
);
}

@Nullable
public SecureString resolveCredentials(String clusterAlias) {
return clusterCredentials.get(clusterAlias);
}

public boolean hasCredentials(String clusterAlias) {
return clusterCredentials.containsKey(clusterAlias);
}

public static final RemoteClusterCredentialsManager EMPTY = new RemoteClusterCredentialsManager(Settings.EMPTY);
}
Expand Up @@ -147,15 +147,14 @@ public boolean isRemoteClusterServerEnabled() {

private final TransportService transportService;
private final Map<String, RemoteClusterConnection> remoteClusters = ConcurrentCollections.newConcurrentMap();
private final Set<String> credentialsProtectedRemoteClusters;
private final RemoteClusterCredentialsManager remoteClusterCredentialsManager;

RemoteClusterService(Settings settings, TransportService transportService) {
super(settings);
this.enabled = DiscoveryNode.isRemoteClusterClient(settings);
this.remoteClusterServerEnabled = REMOTE_CLUSTER_SERVER_ENABLED.get(settings);
this.transportService = transportService;
this.credentialsProtectedRemoteClusters = REMOTE_CLUSTER_CREDENTIALS.getAsMap(settings).keySet();

this.remoteClusterCredentialsManager = new RemoteClusterCredentialsManager(settings);
if (remoteClusterServerEnabled) {
registerRemoteClusterHandshakeRequestHandler(transportService);
}
Expand Down Expand Up @@ -305,6 +304,14 @@ private synchronized void updateSkipUnavailable(String clusterAlias, Boolean ski
}
}

public void updateRemoteClusterCredentials(Settings settings) {
remoteClusterCredentialsManager.updateClusterCredentials(settings);
}

public RemoteClusterCredentialsManager getRemoteClusterCredentialsManager() {
return remoteClusterCredentialsManager;
}

@Override
protected void updateRemoteCluster(String clusterAlias, Settings settings) {
CountDownLatch latch = new CountDownLatch(1);
Expand Down Expand Up @@ -363,12 +370,7 @@ synchronized void updateRemoteCluster(
if (remote == null) {
// this is a new cluster we have to add a new representation
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.CONNECTED));
} else if (remote.shouldRebuildConnection(newSettings)) {
Expand All @@ -380,12 +382,7 @@ synchronized void updateRemoteCluster(
}
remoteClusters.remove(clusterAlias);
Settings finalSettings = Settings.builder().put(this.settings, false).put(newSettings, false).build();
remote = new RemoteClusterConnection(
finalSettings,
clusterAlias,
transportService,
credentialsProtectedRemoteClusters.contains(clusterAlias)
);
remote = new RemoteClusterConnection(finalSettings, clusterAlias, transportService, remoteClusterCredentialsManager);
remoteClusters.put(clusterAlias, remote);
remote.ensureConnected(listener.map(ignored -> RemoteClusterConnectionStatus.RECONNECTED));
} else {
Expand Down
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.TransportVersion;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.SecureString;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Releasable;
Expand All @@ -25,18 +26,19 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

import static org.elasticsearch.transport.RemoteClusterPortSettings.REMOTE_CLUSTER_PROFILE;
import static org.elasticsearch.transport.RemoteClusterService.REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;

public class RemoteConnectionManager implements ConnectionManager {

private final String clusterAlias;
private final RemoteClusterCredentialsManager credentialsManager;
private final ConnectionManager delegate;
private final AtomicLong counter = new AtomicLong();
private volatile List<DiscoveryNode> connectedNodes = Collections.emptyList();

RemoteConnectionManager(String clusterAlias, ConnectionManager delegate) {
RemoteConnectionManager(String clusterAlias, RemoteClusterCredentialsManager credentialsManager, ConnectionManager delegate) {
this.clusterAlias = clusterAlias;
this.credentialsManager = credentialsManager;
this.delegate = delegate;
this.delegate.addListener(new TransportConnectionListener() {
@Override
Expand All @@ -51,6 +53,10 @@ public void onNodeDisconnected(DiscoveryNode node, Transport.Connection connecti
});
}

public RemoteClusterCredentialsManager getCredentialsManager() {
return credentialsManager;
}

/**
* Remote cluster connections have a different lifecycle from intra-cluster connections. Use {@link #connectToRemoteClusterNode}
* instead of this method.
Expand Down Expand Up @@ -95,13 +101,7 @@ public void openConnection(DiscoveryNode node, @Nullable ConnectionProfile profi
node,
profile,
listener.delegateFailureAndWrap(
(l, connection) -> l.onResponse(
new InternalRemoteConnection(
connection,
clusterAlias,
profile != null ? profile.getTransportProfile() : getConnectionProfile().getTransportProfile()
)
)
(l, connection) -> l.onResponse(wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager))
)
);
}
Expand Down Expand Up @@ -182,16 +182,35 @@ public void closeNoBlock() {
* @return a cluster alias if the connection target a node in the remote cluster, otherwise an empty result
*/
public static Optional<String> resolveRemoteClusterAlias(Transport.Connection connection) {
return resolveRemoteClusterAliasWithCredentials(connection).map(RemoteClusterAliasWithCredentials::clusterAlias);
}

public record RemoteClusterAliasWithCredentials(String clusterAlias, @Nullable SecureString credentials) {
@Override
public String toString() {
return "RemoteClusterAliasWithCredentials{clusterAlias='" + clusterAlias + "', credentials='::es_redacted::'}";
}
}

/**
* This method returns information (alias and credentials) for remote cluster for the given transport connection.
* Either or both of alias and credentials can be null depending on the connection.
*
* @param connection the transport connection for which to resolve a remote cluster alias
*/
public static Optional<RemoteClusterAliasWithCredentials> resolveRemoteClusterAliasWithCredentials(Transport.Connection connection) {
Transport.Connection unwrapped = TransportService.unwrapConnection(connection);
if (unwrapped instanceof InternalRemoteConnection remoteConnection) {
return Optional.of(remoteConnection.getClusterAlias());
return Optional.of(
new RemoteClusterAliasWithCredentials(remoteConnection.getClusterAlias(), remoteConnection.getClusterCredentials())
);
}
return Optional.empty();
}

private Transport.Connection getConnectionInternal(DiscoveryNode node) throws NodeNotConnectedException {
Transport.Connection connection = delegate.getConnection(node);
return new InternalRemoteConnection(connection, clusterAlias, getConnectionProfile().getTransportProfile());
return wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, credentialsManager);
}

private synchronized void addConnectedNode(DiscoveryNode addedNode) {
Expand Down Expand Up @@ -297,21 +316,27 @@ private static final class InternalRemoteConnection implements Transport.Connect
private static final Logger logger = LogManager.getLogger(InternalRemoteConnection.class);
private final Transport.Connection connection;
private final String clusterAlias;
private final boolean isRemoteClusterProfile;
@Nullable
private final SecureString clusterCredentials;

InternalRemoteConnection(Transport.Connection connection, String clusterAlias, String transportProfile) {
private InternalRemoteConnection(Transport.Connection connection, String clusterAlias, @Nullable SecureString clusterCredentials) {
assert false == connection instanceof InternalRemoteConnection : "should not double wrap";
assert false == connection instanceof ProxyConnection
: "proxy connection should wrap internal remote connection, not the other way around";
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.connection = Objects.requireNonNull(connection);
this.isRemoteClusterProfile = REMOTE_CLUSTER_PROFILE.equals(Objects.requireNonNull(transportProfile));
this.clusterAlias = Objects.requireNonNull(clusterAlias);
this.clusterCredentials = clusterCredentials;
}

public String getClusterAlias() {
return clusterAlias;
}

@Nullable
public SecureString getClusterCredentials() {
return clusterCredentials;
}

@Override
public DiscoveryNode getNode() {
return connection.getNode();
Expand All @@ -321,7 +346,7 @@ public DiscoveryNode getNode() {
public void sendRequest(long requestId, String action, TransportRequest request, TransportRequestOptions options)
throws IOException, TransportException {
final String effectiveAction;
if (isRemoteClusterProfile && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
if (clusterCredentials != null && TransportService.HANDSHAKE_ACTION_NAME.equals(action)) {
logger.trace("sending remote cluster specific handshake to node [{}] of remote cluster [{}]", getNode(), clusterAlias);
effectiveAction = REMOTE_CLUSTER_HANDSHAKE_ACTION_NAME;
} else {
Expand Down Expand Up @@ -389,8 +414,8 @@ public boolean hasReferences() {
static InternalRemoteConnection wrapConnectionWithRemoteClusterInfo(
Transport.Connection connection,
String clusterAlias,
String transportProfile
RemoteClusterCredentialsManager credentialsManager
) {
return new InternalRemoteConnection(connection, clusterAlias, transportProfile);
return new InternalRemoteConnection(connection, clusterAlias, credentialsManager.resolveCredentials(clusterAlias));
}
}
Expand Up @@ -357,7 +357,11 @@ private ConnectionManager.ConnectionValidator getConnectionValidator(DiscoveryNo
: "transport profile must be consistent between the connection manager and the actual profile";
transportService.connectionValidator(node)
.validate(
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(connection, clusterAlias, profile.getTransportProfile()),
RemoteConnectionManager.wrapConnectionWithRemoteClusterInfo(
connection,
clusterAlias,
connectionManager.getCredentialsManager()
),
profile,
listener
);
Expand Down

0 comments on commit 1d2f264

Please sign in to comment.