diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 17e361edef62e..a3ed6da3947b0 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -252,7 +252,7 @@ public void onResponse(Response response) { public void onFailure(Throwable e) { if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { int i = ++this.i; - if (i == nodes.size()) { + if (i >= nodes.size()) { listener.onFailure(new NoNodeAvailableException()); } else { try { @@ -296,6 +296,28 @@ public void sample() { } protected abstract void doSample(); + + /** + * validates a set of potentially newly discovered nodes and returns an immutable + * list of the nodes that has passed. + */ + protected ImmutableList validateNewNodes(Set nodes) { + for (Iterator it = nodes.iterator(); it.hasNext(); ) { + DiscoveryNode node = it.next(); + if (!transportService.nodeConnected(node)) { + try { + logger.trace("connecting to node [{}]", node); + transportService.connectToNode(node); + } catch (Throwable e) { + it.remove(); + logger.debug("failed to connect to discovered node [" + node + "]", e); + } + } + } + + return new ImmutableList.Builder().addAll(nodes).build(); + } + } class ScheduledNodeSampler implements Runnable { @@ -317,17 +339,19 @@ class SimpleNodeSampler extends NodeSampler { @Override protected void doSample() { HashSet newNodes = new HashSet(); - for (DiscoveryNode node : listedNodes) { - if (!transportService.nodeConnected(node)) { + for (DiscoveryNode listedNode : listedNodes) { + if (!transportService.nodeConnected(listedNode)) { try { - transportService.connectToNode(node); + // its a listed node, light connect to it... + logger.trace("connecting to listed node (light) [{}]", listedNode); + transportService.connectToNodeLight(listedNode); } catch (Throwable e) { - logger.debug("failed to connect to node [{}], removed from nodes list", e, node); + logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); continue; } } try { - NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME, + NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME, Requests.nodesInfoRequest("_local"), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new FutureTransportResponseHandler() { @@ -337,16 +361,26 @@ public NodesInfoResponse newInstance() { } }).txGet(); if (!ignoreClusterName && !clusterName.equals(nodeInfo.getClusterName())) { - logger.warn("node {} not part of the cluster {}, ignoring...", node, clusterName); + logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName); + } else if (nodeInfo.getNodes().length != 0) { + // use discovered information but do keep the original transport address, so people can control which address + // is exactly used. + + DiscoveryNode nodeWithInfo = nodeInfo.getNodes()[0].getNode(); + newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version())); } else { - newNodes.add(node); + // although we asked for one node, our target may not have completed initialization yet and doesn't have + // cluster nodes + logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode); + newNodes.add(listedNode); } } catch (Exception e) { - logger.info("failed to get node info for {}, disconnecting...", e, node); - transportService.disconnectFromNode(node); + logger.info("failed to get node info for {}, disconnecting...", e, listedNode); + transportService.disconnectFromNode(listedNode); } } - nodes = new ImmutableList.Builder().addAll(newNodes).build(); + + nodes = validateNewNodes(newNodes); } } @@ -442,20 +476,8 @@ public void handleException(TransportException e) { newNodes.add(node); } } - // now, make sure we are connected to all the updated nodes - for (Iterator it = newNodes.iterator(); it.hasNext(); ) { - DiscoveryNode node = it.next(); - if (!transportService.nodeConnected(node)) { - try { - logger.trace("connecting to node [{}]", node); - transportService.connectToNode(node); - } catch (Throwable e) { - it.remove(); - logger.debug("failed to connect to discovered node [" + node + "]", e); - } - } - } - nodes = new ImmutableList.Builder().addAll(newNodes).build(); + + nodes = validateNewNodes(newNodes); } } diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java new file mode 100644 index 0000000000000..d9095576bb7f6 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -0,0 +1,39 @@ +package org.elasticsearch.client.transport; +/* + * Licensed to ElasticSearch under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.hamcrest.Matchers; +import org.junit.Test; + +@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0, transportClientRatio = 1.0) +public class TransportClientTests extends ElasticsearchIntegrationTest { + + @Test + public void testPickingUpChangesInDiscoveryNode() { + String nodeName = cluster().startNode(ImmutableSettings.builder().put("node.data", false)); + + TransportClient client = (TransportClient) cluster().client(nodeName); + assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false)); + + } +}