Skip to content

Commit

Permalink
Add connectToNodeAsExtension in TransportService
Browse files Browse the repository at this point in the history
Signed-off-by: Craig Perkins <cwperx@amazon.com>
  • Loading branch information
cwperks committed Mar 29, 2023
1 parent bd9b00d commit de8b8b8
Showing 1 changed file with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ public void close() {}
* Build the service.
*
* @param clusterSettings if non null, the {@linkplain TransportService} will register with the {@link ClusterSettings} for settings
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
* * updates for {@link TransportSettings#TRACE_LOG_EXCLUDE_SETTING} and {@link TransportSettings#TRACE_LOG_INCLUDE_SETTING}.
*/
public TransportService(
Settings settings,
Expand Down Expand Up @@ -397,6 +397,15 @@ public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, (ConnectionProfile) null);
}

/**
* Connect to the specified node as an extension with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNodeAsExtension(DiscoveryNode node, String extensionUniqueId) throws ConnectTransportException {
connectToNodeAsExtension(node, (ConnectionProfile) null, extensionUniqueId);
}

// We are skipping node validation for extensibility as extensionNode and opensearchNode(LocalNode) will have different ephemeral id's
public void connectToExtensionNode(final DiscoveryNode node) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, (ConnectionProfile) null, ActionListener.map(fut, x -> null)));
Expand All @@ -412,6 +421,19 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
PlainActionFuture.get(fut -> connectToNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}

/**
* Connect to the specified node with the given connection profile
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueIq the id of the extension
*/
public void connectToNodeAsExtension(final DiscoveryNode node, ConnectionProfile connectionProfile, String extensionUniqueIq) {
PlainActionFuture.get(
fut -> connectToNodeAsExtension(node, connectionProfile, extensionUniqueIq, ActionListener.map(fut, x -> null))
);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile) {
PlainActionFuture.get(fut -> connectToExtensionNode(node, connectionProfile, ActionListener.map(fut, x -> null)));
}
Expand Down Expand Up @@ -447,6 +469,28 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
connectionManager.connectToNode(node, connectionProfile, connectionValidator(node), listener);
}

/**
* Connect to the specified node with the given connection profile.
* The ActionListener will be called on the calling thread or the generic thread pool.
*
* @param node the node to connect to
* @param connectionProfile the connection profile to use when connecting to this node
* @param extensionUniqueId the id of the extension
* @param listener the action listener to notify
*/
public void connectToNodeAsExtension(
final DiscoveryNode node,
ConnectionProfile connectionProfile,
String extensionUniqueId,
ActionListener<Void> listener
) {
if (isLocalNode(node)) {
listener.onResponse(null);
return;
}
connectionManager.connectToNode(node, connectionProfile, connectionValidatorForExtension(node, extensionUniqueId), listener);
}

public void connectToExtensionNode(final DiscoveryNode node, ConnectionProfile connectionProfile, ActionListener<Void> listener) {
if (isLocalNode(node)) {
listener.onResponse(null);
Expand All @@ -470,6 +514,22 @@ public ConnectionManager.ConnectionValidator connectionValidator(DiscoveryNode n
};
}

public ConnectionManager.ConnectionValidator connectionValidatorForExtension(DiscoveryNode node, String extensionUniqueId) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
threadPool.getThreadContext().putHeader("extension_unique_id", extensionUniqueId);
handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true, ActionListener.map(listener, resp -> {
final DiscoveryNode remote = resp.discoveryNode;

if (node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}

return null;
}));
};
}

public ConnectionManager.ConnectionValidator extensionConnectionValidator(DiscoveryNode node) {
return (newConnection, actualProfile, listener) -> {
// We don't validate cluster names to allow for CCS connections.
Expand Down

0 comments on commit de8b8b8

Please sign in to comment.