From b86b471517e76040dc4501c8979bbd61b7e7115f Mon Sep 17 00:00:00 2001 From: Boaz Leskes Date: Sat, 2 Nov 2013 15:16:37 +0100 Subject: [PATCH] Make SimpleNodeSampler populate the list of connected nodes using the information returned from the cluster This is to allow people to introspect things like data settings and attributes. Also makes it consistent with the sniff sampler. Closes #4162 --- .../TransportClientNodesService.java | 72 ++++++++++++------- .../transport/TransportClientTests.java | 39 ++++++++++ 2 files changed, 86 insertions(+), 25 deletions(-) create mode 100644 src/test/java/org/elasticsearch/client/transport/TransportClientTests.java diff --git a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java index 17e361edef62e..a3ed6da3947b0 100644 --- a/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java +++ b/src/main/java/org/elasticsearch/client/transport/TransportClientNodesService.java @@ -252,7 +252,7 @@ public void onResponse(Response response) { public void onFailure(Throwable e) { if (ExceptionsHelper.unwrapCause(e) instanceof ConnectTransportException) { int i = ++this.i; - if (i == nodes.size()) { + if (i >= nodes.size()) { listener.onFailure(new NoNodeAvailableException()); } else { try { @@ -296,6 +296,28 @@ public void sample() { } protected abstract void doSample(); + + /** + * validates a set of potentially newly discovered nodes and returns an immutable + * list of the nodes that has passed. + */ + protected ImmutableList validateNewNodes(Set nodes) { + for (Iterator it = nodes.iterator(); it.hasNext(); ) { + DiscoveryNode node = it.next(); + if (!transportService.nodeConnected(node)) { + try { + logger.trace("connecting to node [{}]", node); + transportService.connectToNode(node); + } catch (Throwable e) { + it.remove(); + logger.debug("failed to connect to discovered node [" + node + "]", e); + } + } + } + + return new ImmutableList.Builder().addAll(nodes).build(); + } + } class ScheduledNodeSampler implements Runnable { @@ -317,17 +339,19 @@ class SimpleNodeSampler extends NodeSampler { @Override protected void doSample() { HashSet newNodes = new HashSet(); - for (DiscoveryNode node : listedNodes) { - if (!transportService.nodeConnected(node)) { + for (DiscoveryNode listedNode : listedNodes) { + if (!transportService.nodeConnected(listedNode)) { try { - transportService.connectToNode(node); + // its a listed node, light connect to it... + logger.trace("connecting to listed node (light) [{}]", listedNode); + transportService.connectToNodeLight(listedNode); } catch (Throwable e) { - logger.debug("failed to connect to node [{}], removed from nodes list", e, node); + logger.debug("failed to connect to node [{}], removed from nodes list", e, listedNode); continue; } } try { - NodesInfoResponse nodeInfo = transportService.submitRequest(node, NodesInfoAction.NAME, + NodesInfoResponse nodeInfo = transportService.submitRequest(listedNode, NodesInfoAction.NAME, Requests.nodesInfoRequest("_local"), TransportRequestOptions.options().withType(TransportRequestOptions.Type.STATE).withTimeout(pingTimeout), new FutureTransportResponseHandler() { @@ -337,16 +361,26 @@ public NodesInfoResponse newInstance() { } }).txGet(); if (!ignoreClusterName && !clusterName.equals(nodeInfo.getClusterName())) { - logger.warn("node {} not part of the cluster {}, ignoring...", node, clusterName); + logger.warn("node {} not part of the cluster {}, ignoring...", listedNode, clusterName); + } else if (nodeInfo.getNodes().length != 0) { + // use discovered information but do keep the original transport address, so people can control which address + // is exactly used. + + DiscoveryNode nodeWithInfo = nodeInfo.getNodes()[0].getNode(); + newNodes.add(new DiscoveryNode(nodeWithInfo.name(), nodeWithInfo.id(), listedNode.address(), nodeWithInfo.attributes(), nodeWithInfo.version())); } else { - newNodes.add(node); + // although we asked for one node, our target may not have completed initialization yet and doesn't have + // cluster nodes + logger.debug("node {} didn't return any discovery info, temporarily using transport discovery node", listedNode); + newNodes.add(listedNode); } } catch (Exception e) { - logger.info("failed to get node info for {}, disconnecting...", e, node); - transportService.disconnectFromNode(node); + logger.info("failed to get node info for {}, disconnecting...", e, listedNode); + transportService.disconnectFromNode(listedNode); } } - nodes = new ImmutableList.Builder().addAll(newNodes).build(); + + nodes = validateNewNodes(newNodes); } } @@ -442,20 +476,8 @@ public void handleException(TransportException e) { newNodes.add(node); } } - // now, make sure we are connected to all the updated nodes - for (Iterator it = newNodes.iterator(); it.hasNext(); ) { - DiscoveryNode node = it.next(); - if (!transportService.nodeConnected(node)) { - try { - logger.trace("connecting to node [{}]", node); - transportService.connectToNode(node); - } catch (Throwable e) { - it.remove(); - logger.debug("failed to connect to discovered node [" + node + "]", e); - } - } - } - nodes = new ImmutableList.Builder().addAll(newNodes).build(); + + nodes = validateNewNodes(newNodes); } } diff --git a/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java new file mode 100644 index 0000000000000..d9095576bb7f6 --- /dev/null +++ b/src/test/java/org/elasticsearch/client/transport/TransportClientTests.java @@ -0,0 +1,39 @@ +package org.elasticsearch.client.transport; +/* + * Licensed to ElasticSearch under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.ElasticsearchIntegrationTest.ClusterScope; +import org.hamcrest.Matchers; +import org.junit.Test; + +@ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numNodes = 0, transportClientRatio = 1.0) +public class TransportClientTests extends ElasticsearchIntegrationTest { + + @Test + public void testPickingUpChangesInDiscoveryNode() { + String nodeName = cluster().startNode(ImmutableSettings.builder().put("node.data", false)); + + TransportClient client = (TransportClient) cluster().client(nodeName); + assertThat(client.connectedNodes().get(0).dataNode(), Matchers.equalTo(false)); + + } +}