Skip to content

Commit

Permalink
Ensure we don't use a remote profile if cluster name matches (elastic…
Browse files Browse the repository at this point in the history
…#31331)

If we are running into a race condition between a node being configured
to be a remote node for cross cluster search etc. and that node joining
the cluster we might connect to that node with a remote profile. If that
node now joins the cluster it connected to it as a CCS remote node we use
the wrong profile and can't use bulk connections etc. anymore. This change
uses the remote profile only if we connect to a node that has a different cluster
name than the local cluster. This is not a perfect fix for this situation but
is the safe option while potentially only loose a small optimization of using
less connections per node which is small anyways since we only connect to a
small set of nodes.

Closes elastic#29321
  • Loading branch information
s1monw committed Jun 17, 2018
1 parent be382e9 commit 3c1ab95
Show file tree
Hide file tree
Showing 4 changed files with 160 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private volatile List<DiscoveryNode> seedNodes;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
private final ClusterName localClusterName;

/**
* Creates a new {@link RemoteClusterConnection}
Expand All @@ -103,6 +104,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
this.transportService = transportService;
this.maxNumRemoteConnections = maxNumRemoteConnections;
this.nodePredicate = nodePredicate;
Expand Down Expand Up @@ -293,6 +295,21 @@ public boolean isClosed() {
return connectHandler.isClosed();
}

private ConnectionProfile getRemoteProfile(ClusterName name) {
// we can only compare the cluster name to make a decision if we should use a remote profile
// we can't use a cluster UUID here since we could be connecting to that remote cluster before
// the remote node has joined its cluster and have a cluster UUID. The fact that we just lose a
// rather smallish optimization on the connection layer under certain situations where remote clusters
// have the same name as the local one is minor here.
// the alternative here is to complicate the remote infrastructure to also wait until we formed a cluster,
// gained a cluster UUID and then start connecting etc. we rather use this simplification in order to maintain simplicity
if (this.localClusterName.equals(name)) {
return null;
} else {
return remoteProfile;
}
}

/**
* The connect handler manages node discovery and the actual connect to the remote cluster.
* There is at most one connect job running at any time. If such a connect job is triggered
Expand Down Expand Up @@ -402,7 +419,6 @@ protected void doRun() throws Exception {
collectRemoteNodes(seedNodes.iterator(), transportService, listener);
}
});

}

void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
Expand All @@ -414,21 +430,27 @@ void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode handshakeNode;
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
boolean success = false;
try {
try {
handshakeNode = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
handshakeResponse = transportService.handshake(connection, remoteProfile.getHandshakeTimeout().millis(),
(c) -> remoteClusterName.get() == null ? true : c.equals(remoteClusterName.get()));
} catch (IllegalStateException ex) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("seed node {} cluster name mismatch expected " +
"cluster name {}", connection.getNode(), remoteClusterName.get()), ex);
throw ex;
}

final DiscoveryNode handshakeNode = handshakeResponse.getDiscoveryNode();
if (nodePredicate.test(handshakeNode) && connectedNodes.size() < maxNumRemoteConnections) {
transportService.connectToNode(handshakeNode, remoteProfile);
transportService.connectToNode(handshakeNode, getRemoteProfile(handshakeResponse.getClusterName()));
if (remoteClusterName.get() == null) {
assert handshakeResponse.getClusterName().value() != null;
remoteClusterName.set(handshakeResponse.getClusterName());
}
connectedNodes.add(handshakeNode);
}
ClusterStateRequest request = new ClusterStateRequest();
Expand Down Expand Up @@ -536,7 +558,8 @@ public void handleResponse(ClusterStateResponse response) {
for (DiscoveryNode node : nodesIter) {
if (nodePredicate.test(node) && connectedNodes.size() < maxNumRemoteConnections) {
try {
transportService.connectToNode(node, remoteProfile); // noop if node is connected
transportService.connectToNode(node, getRemoteProfile(remoteClusterName.get())); // noop if node is
// connected
connectedNodes.add(node);
} catch (ConnectTransportException | IllegalStateException ex) {
// ISE if we fail the handshake with an version incompatible node
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,8 +340,8 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
return;
}
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for tribe node connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true).discoveryNode;
if (node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
Expand Down Expand Up @@ -377,7 +377,7 @@ public Transport.Connection openConnection(final DiscoveryNode node, ConnectionP
public DiscoveryNode handshake(
final Transport.Connection connection,
final long handshakeTimeout) throws ConnectTransportException {
return handshake(connection, handshakeTimeout, clusterName::equals);
return handshake(connection, handshakeTimeout, clusterName::equals).discoveryNode;
}

/**
Expand All @@ -389,11 +389,11 @@ public DiscoveryNode handshake(
* @param connection the connection to a specific node
* @param handshakeTimeout handshake timeout
* @param clusterNamePredicate cluster name validation predicate
* @return the connected node
* @return the handshake response
* @throws ConnectTransportException if the connection failed
* @throws IllegalStateException if the handshake failed
*/
public DiscoveryNode handshake(
public HandshakeResponse handshake(
final Transport.Connection connection,
final long handshakeTimeout, Predicate<ClusterName> clusterNamePredicate) throws ConnectTransportException {
final HandshakeResponse response;
Expand All @@ -419,7 +419,7 @@ public HandshakeResponse newInstance() {
throw new IllegalStateException("handshake failed, incompatible version [" + response.version + "] - " + node);
}

return response.discoveryNode;
return response;
}

static class HandshakeRequest extends TransportRequest {
Expand Down Expand Up @@ -460,6 +460,14 @@ public void writeTo(StreamOutput out) throws IOException {
clusterName.writeTo(out);
Version.writeVersion(version, out);
}

public DiscoveryNode getDiscoveryNode() {
return discoveryNode;
}

public ClusterName getClusterName() {
return clusterName;
}
}

public void disconnectFromNode(DiscoveryNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,99 @@ public static MockTransportService startTransport(
}
}

public void testLocalProfileIsUsedForLocalCluster() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT)) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {

@Override
public ClusterSearchShardsResponse newInstance() {
return new ClusterSearchShardsResponse();
}
});
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
.build();
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
options, futureHandler);
futureHandler.txGet();
}
}
}
}

public void testRemoteProfileIsUsedForRemoteCluster() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT, threadPool,
Settings.builder().put("cluster.name", "foobar").build());
MockTransportService discoverableTransport = startTransport("discoverable_node", knownNodes, Version.CURRENT,
threadPool, Settings.builder().put("cluster.name", "foobar").build())) {
DiscoveryNode seedNode = seedTransport.getLocalDiscoNode();
DiscoveryNode discoverableNode = discoverableTransport.getLocalDiscoNode();
knownNodes.add(seedTransport.getLocalDiscoNode());
knownNodes.add(discoverableTransport.getLocalDiscoNode());
Collections.shuffle(knownNodes, random());

try (MockTransportService service = MockTransportService.createNewService(Settings.EMPTY, Version.CURRENT, threadPool, null)) {
service.start();
service.acceptIncomingRequests();
try (RemoteClusterConnection connection = new RemoteClusterConnection(Settings.EMPTY, "test-cluster",
Arrays.asList(seedNode), service, Integer.MAX_VALUE, n -> true)) {
updateSeedNodes(connection, Arrays.asList(seedNode));
assertTrue(service.nodeConnected(seedNode));
assertTrue(service.nodeConnected(discoverableNode));
assertTrue(connection.assertNoRunningConnections());
PlainTransportFuture<ClusterSearchShardsResponse> futureHandler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {

@Override
public ClusterSearchShardsResponse newInstance() {
return new ClusterSearchShardsResponse();
}
});
TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.BULK)
.build();
IllegalStateException ise = (IllegalStateException) expectThrows(SendRequestTransportException.class, () -> {
service.sendRequest(discoverableNode,
ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(), options, futureHandler);
futureHandler.txGet();
}).getCause();
assertEquals(ise.getMessage(), "can't select channel size is 0 for types: [RECOVERY, BULK, STATE]");

PlainTransportFuture<ClusterSearchShardsResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<ClusterSearchShardsResponse>() {

@Override
public ClusterSearchShardsResponse newInstance() {
return new ClusterSearchShardsResponse();
}
});
TransportRequestOptions ops = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG)
.build();
service.sendRequest(connection.getConnection(), ClusterSearchShardsAction.NAME, new ClusterSearchShardsRequest(),
ops, handler);
handler.txGet();
}
}
}
}

public void testDiscoverSingleNode() throws Exception {
List<DiscoveryNode> knownNodes = new CopyOnWriteArrayList<>();
try (MockTransportService seedTransport = startTransport("seed_node", knownNodes, Version.CURRENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,12 +175,36 @@ private void readMessage(MockChannel mockChannel, StreamInput input) throws IOEx
}
}


ConnectionProfile resolveConnectionProfile(ConnectionProfile connectionProfile) {
ConnectionProfile connectionProfile1 = connectionProfile == null ? LIGHT_PROFILE : connectionProfile;
ConnectionProfile.Builder builder = new ConnectionProfile.Builder();
Set<TransportRequestOptions.Type> allTypesWithConnection = new HashSet<>();
Set<TransportRequestOptions.Type> allTypesWithoutConnection = new HashSet<>();
for (ConnectionProfile.ConnectionTypeHandle handle : connectionProfile1.getHandles()) {
Set<TransportRequestOptions.Type> types = handle.getTypes();
if (handle.length > 0) {
allTypesWithConnection.addAll(types);
} else {
allTypesWithoutConnection.addAll(types);
}
}
// make sure we maintain at least the types that are supported by this profile even if we only use a single channel for them.
builder.addConnections(1, allTypesWithConnection.toArray(new TransportRequestOptions.Type[0]));
if (allTypesWithoutConnection.isEmpty() == false) {
builder.addConnections(0, allTypesWithoutConnection.toArray(new TransportRequestOptions.Type[0]));
}
builder.setHandshakeTimeout(connectionProfile1.getHandshakeTimeout());
builder.setConnectTimeout(connectionProfile1.getConnectTimeout());
return builder.build();
}

@Override
protected NodeChannels connectToChannels(DiscoveryNode node,
ConnectionProfile profile,
Consumer<MockChannel> onChannelClose) throws IOException {
final MockChannel[] mockChannels = new MockChannel[1];
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, LIGHT_PROFILE); // we always use light here
final NodeChannels nodeChannels = new NodeChannels(node, mockChannels, resolveConnectionProfile(profile));
boolean success = false;
final Socket socket = new Socket();
try {
Expand Down

0 comments on commit 3c1ab95

Please sign in to comment.