diff --git a/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index f9845a886485d..cd170bc7eb729 100644 --- a/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -42,6 +42,7 @@ import org.elasticsearch.monitor.fs.FsStats; import org.elasticsearch.node.settings.NodeSettingsService; import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.ReceiveTimeoutTransportException; import java.util.*; import java.util.concurrent.CountDownLatch; @@ -61,6 +62,7 @@ public class InternalClusterInfoService extends AbstractComponent implements ClusterInfoService, LocalNodeMasterListener, ClusterStateListener { public static final String INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL = "cluster.info.update.interval"; + public static final String INTERNAL_CLUSTER_INFO_TIMEOUT = "cluster.info.update.timeout"; private volatile TimeValue updateFrequency; @@ -68,6 +70,7 @@ public class InternalClusterInfoService extends AbstractComponent implements Clu private volatile ImmutableMap shardSizes; private volatile boolean isMaster = false; private volatile boolean enabled; + private volatile TimeValue fetchTimeout; private final TransportNodesStatsAction transportNodesStatsAction; private final TransportIndicesStatsAction transportIndicesStatsAction; private final ClusterService clusterService; @@ -87,6 +90,7 @@ public InternalClusterInfoService(Settings settings, NodeSettingsService nodeSet this.clusterService = clusterService; this.threadPool = threadPool; this.updateFrequency = settings.getAsTime(INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, TimeValue.timeValueSeconds(30)); + this.fetchTimeout = settings.getAsTime(INTERNAL_CLUSTER_INFO_TIMEOUT, TimeValue.timeValueSeconds(15)); this.enabled = settings.getAsBoolean(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_DISK_THRESHOLD_ENABLED, true); nodeSettingsService.addListener(new ApplySettings()); @@ -113,6 +117,13 @@ public void onRefreshSettings(Settings settings) { } } + TimeValue newFetchTimeout = settings.getAsTime(INTERNAL_CLUSTER_INFO_TIMEOUT, null); + if (newFetchTimeout != null) { + logger.info("updating fetch timeout [{}] from [{}] to [{}]", INTERNAL_CLUSTER_INFO_TIMEOUT, fetchTimeout, newFetchTimeout); + InternalClusterInfoService.this.fetchTimeout = newFetchTimeout; + } + + // We don't log about enabling it here, because the DiskThresholdDecider will already be logging about enable/disable if (newEnabled != null) { InternalClusterInfoService.this.enabled = newEnabled; @@ -131,7 +142,7 @@ public void onMaster() { threadPool.schedule(updateFrequency, executorName(), new SubmitReschedulingClusterInfoUpdatedJob()); if (clusterService.state().getNodes().getDataNodes().size() > 1) { // Submit an info update job to be run immediately - threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false)); + updateOnce(); } } catch (EsRejectedExecutionException ex) { if (logger.isDebugEnabled()) { @@ -140,6 +151,16 @@ public void onMaster() { } } + + // called from tests as well + + /** + * will collect a fresh {@link ClusterInfo} from the nodes, without scheduling a future collection + */ + void updateOnce() { + threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false)); + } + @Override public void offMaster() { this.isMaster = false; @@ -169,7 +190,7 @@ public void clusterChanged(ClusterChangedEvent event) { if (logger.isDebugEnabled()) { logger.debug("data node was added, retrieving new cluster info"); } - threadPool.executor(executorName()).execute(new ClusterInfoUpdateJob(false)); + updateOnce(); } if (this.isMaster && event.nodesRemoved()) { @@ -227,7 +248,7 @@ protected CountDownLatch updateNodeStats(final ActionListener(listener, latch)); return latch; @@ -316,12 +337,18 @@ public void onResponse(NodesStatsResponse nodeStatses) { @Override public void onFailure(Throwable e) { - if (e instanceof ClusterBlockException) { - if (logger.isTraceEnabled()) { - logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); - } + if (e instanceof ReceiveTimeoutTransportException) { + logger.error("NodeStatsAction timed out for ClusterInfoUpdateJob (reason [{}])", e.getMessage()); } else { - logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); + if (e instanceof ClusterBlockException) { + if (logger.isTraceEnabled()) { + logger.trace("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); + } + } else { + logger.warn("Failed to execute NodeStatsAction for ClusterInfoUpdateJob", e); + } + // we empty the usages list, to be safe - we don't know what's going on. + usages = ImmutableMap.of(); } } }); @@ -344,24 +371,30 @@ public void onResponse(IndicesStatsResponse indicesStatsResponse) { @Override public void onFailure(Throwable e) { - if (e instanceof ClusterBlockException) { - if (logger.isTraceEnabled()) { - logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); - } + if (e instanceof ReceiveTimeoutTransportException) { + logger.error("IndicesStatsAction timed out for ClusterInfoUpdateJob (reason [{}])", e.getMessage()); } else { - logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); + if (e instanceof ClusterBlockException) { + if (logger.isTraceEnabled()) { + logger.trace("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); + } + } else { + logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); + } + // we empty the usages list, to be safe - we don't know what's going on. + shardSizes = ImmutableMap.of(); } } }); try { - nodeLatch.await(15, TimeUnit.SECONDS); + nodeLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.warn("Failed to update node information for ClusterInfoUpdateJob within 15s timeout"); } try { - indicesLatch.await(15, TimeUnit.SECONDS); + indicesLatch.await(fetchTimeout.getMillis(), TimeUnit.MILLISECONDS); } catch (InterruptedException e) { logger.warn("Failed to update shard information for ClusterInfoUpdateJob within 15s timeout"); } diff --git a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java index fcc63ce86d947..a90e4f3e5f9e6 100644 --- a/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java +++ b/src/main/java/org/elasticsearch/cluster/settings/ClusterDynamicSettingsModule.java @@ -88,6 +88,7 @@ public ClusterDynamicSettingsModule() { clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_INCLUDE_RELOCATIONS, Validator.BOOLEAN); clusterDynamicSettings.addDynamicSetting(DiskThresholdDecider.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL, Validator.TIME_NON_NEGATIVE); clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, Validator.TIME_NON_NEGATIVE); + clusterDynamicSettings.addDynamicSetting(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT, Validator.TIME_NON_NEGATIVE); clusterDynamicSettings.addDynamicSetting(SnapshotInProgressAllocationDecider.CLUSTER_ROUTING_ALLOCATION_SNAPSHOT_RELOCATION_ENABLED); clusterDynamicSettings.addDynamicSetting(DestructiveOperations.REQUIRES_NAME); clusterDynamicSettings.addDynamicSetting(DiscoverySettings.PUBLISH_TIMEOUT, Validator.TIME_NON_NEGATIVE); diff --git a/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java b/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java new file mode 100644 index 0000000000000..3eeb405fadba7 --- /dev/null +++ b/src/test/java/org/elasticsearch/cluster/ClusterInfoServiceTests.java @@ -0,0 +1,263 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.cluster; + +import com.google.common.collect.ImmutableSet; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionModule; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.admin.cluster.node.stats.NodesStatsAction; +import org.elasticsearch.action.admin.indices.stats.IndicesStatsAction; +import org.elasticsearch.action.support.ActionFilter; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.ImmutableSettings; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.plugins.AbstractPlugin; +import org.elasticsearch.test.ElasticsearchIntegrationTest; +import org.elasticsearch.test.InternalTestCluster; +import org.elasticsearch.test.transport.MockTransportService; +import org.elasticsearch.transport.*; +import org.hamcrest.Matchers; +import org.junit.Test; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; + +/** + * Integration tests for the ClusterInfoService collecting information + */ +@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.TEST, numDataNodes = 0) +public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { + + public static class Plugin extends AbstractPlugin { + + @Override + public String name() { + return "ClusterInfoServiceTests"; + } + + @Override + public String description() { + return "ClusterInfoServiceTests"; + } + + public void onModule(ActionModule module) { + module.registerFilter(BlockingActionFilter.class); + } + } + + public static class BlockingActionFilter extends org.elasticsearch.action.support.ActionFilter.Simple { + + ImmutableSet blockedActions = ImmutableSet.of(); + + @Inject + public BlockingActionFilter(Settings settings) { + super(settings); + } + + @Override + protected boolean apply(String action, ActionRequest request, ActionListener listener) { + if (blockedActions.contains(action)) { + throw new ElasticsearchException("force exception on [" + action + "]"); + } + return true; + } + + @Override + protected boolean apply(String action, ActionResponse response, ActionListener listener) { + return true; + } + + @Override + public int order() { + return 0; + } + + public void blockActions(String... actions) { + blockedActions = ImmutableSet.copyOf(actions); + } + } + + static class InfoListener implements ClusterInfoService.Listener { + final AtomicReference collected = new AtomicReference<>(new CountDownLatch(1)); + volatile ClusterInfo lastInfo = null; + + @Override + public void onNewInfo(ClusterInfo info) { + lastInfo = info; + CountDownLatch latch = collected.get(); + latch.countDown(); + } + + public void reset() { + lastInfo = null; + collected.set(new CountDownLatch(1)); + } + + public ClusterInfo get() throws InterruptedException { + CountDownLatch latch = collected.get(); + if (!latch.await(10, TimeUnit.SECONDS)) { + fail("failed to get a new cluster info"); + } + return lastInfo; + } + } + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return ImmutableSettings.builder() + // manual collection or upon cluster forming. + .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_TIMEOUT, "1s") + .put(TransportModule.TRANSPORT_SERVICE_TYPE_KEY, MockTransportService.class.getName()) + .put("plugin.types", Plugin.class.getName()) + .build(); + } + + @Test + public void testClusterInfoServiceCollectsInformation() throws Exception { + internalCluster().startNodesAsync(2, + ImmutableSettings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "200ms").build()) + .get(); + createIndex("test"); + ensureGreen("test"); + InternalTestCluster internalTestCluster = internalCluster(); + // Get the cluster info service on the master node + final InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName()); + InfoListener listener = new InfoListener(); + infoService.addListener(listener); + ClusterInfo info = listener.get(); + assertNotNull("info should not be null", info); + Map usages = info.getNodeDiskUsages(); + Map shardSizes = info.getShardSizes(); + assertNotNull(usages); + assertNotNull(shardSizes); + assertThat("some usages are populated", usages.values().size(), Matchers.equalTo(2)); + assertThat("some shard sizes are populated", shardSizes.values().size(), greaterThan(0)); + for (DiskUsage usage : usages.values()) { + logger.info("--> usage: {}", usage); + assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L)); + } + for (Long size : shardSizes.values()) { + logger.info("--> shard size: {}", size); + assertThat("shard size is greater than 0", size, greaterThan(0L)); + } + } + + @Test + public void testClusterInfoServiceInformationClearOnError() throws InterruptedException, ExecutionException { + internalCluster().startNodesAsync(2, + // manually control publishing + ImmutableSettings.builder().put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "60m").build()) + .get(); + createIndex("test"); + ensureGreen("test"); + InternalTestCluster internalTestCluster = internalCluster(); + InternalClusterInfoService infoService = (InternalClusterInfoService) internalTestCluster.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName()); + InfoListener listener = new InfoListener(); + infoService.addListener(listener); + + // get one healthy sample + infoService.updateOnce(); + ClusterInfo info = listener.get(); + assertNotNull("failed to collect info", info); + assertThat("some usages are populated", info.getNodeDiskUsages().size(), Matchers.equalTo(2)); + assertThat("some shard sizes are populated", info.getShardSizes().size(), greaterThan(0)); + + + MockTransportService mockTransportService = (MockTransportService) internalCluster().getInstance(TransportService.class, internalTestCluster.getMasterName()); + + final AtomicBoolean timeout = new AtomicBoolean(false); + final Set blockedActions = ImmutableSet.of(NodesStatsAction.NAME, NodesStatsAction.NAME + "[n]", IndicesStatsAction.NAME, IndicesStatsAction.NAME + "[s]"); + // drop all outgoing stats requests to force a timeout. + for (DiscoveryNode node : internalTestCluster.clusterService().state().getNodes()) { + mockTransportService.addDelegate(node, new MockTransportService.DelegateTransport(mockTransportService.original()) { + @Override + public void sendRequest(DiscoveryNode node, long requestId, String action, TransportRequest request, + TransportRequestOptions options) throws IOException, TransportException { + if (blockedActions.contains(action)) { + if (timeout.get()) { + logger.info("dropping [{}] to [{}]", action, node); + return; + } + } + super.sendRequest(node, requestId, action, request, options); + } + }); + } + + // timeouts shouldn't clear the info + timeout.set(true); + listener.reset(); + infoService.updateOnce(); + info = listener.get(); + assertNotNull("info should not be null", info); + // node info will time out both on the request level on the count down latch. this means + // it is likely to update the node disk usage based on the one response that came be from local + // node. + assertThat(info.getNodeDiskUsages().size(), greaterThanOrEqualTo(1)); + // indices is guaranteed to time out on the latch, not updating anything. + assertThat(info.getShardSizes().size(), greaterThan(1)); + + // now we cause an exception + timeout.set(false); + ActionFilters actionFilters = internalTestCluster.getInstance(ActionFilters.class, internalTestCluster.getMasterName()); + BlockingActionFilter blockingActionFilter = null; + for (ActionFilter filter : actionFilters.filters()) { + if (filter instanceof BlockingActionFilter) { + blockingActionFilter = (BlockingActionFilter) filter; + break; + } + } + + assertNotNull("failed to find BlockingActionFilter", blockingActionFilter); + blockingActionFilter.blockActions(blockedActions.toArray(Strings.EMPTY_ARRAY)); + listener.reset(); + infoService.updateOnce(); + info = listener.get(); + assertNotNull("info should not be null", info); + assertThat(info.getNodeDiskUsages().size(), equalTo(0)); + assertThat(info.getShardSizes().size(), equalTo(0)); + + // check we recover + blockingActionFilter.blockActions(); + listener.reset(); + infoService.updateOnce(); + info = listener.get(); + assertNotNull("info should not be null", info); + assertThat(info.getNodeDiskUsages().size(), equalTo(2)); + assertThat(info.getShardSizes().size(), greaterThan(0)); + + } +} diff --git a/src/test/java/org/elasticsearch/cluster/routing/ClusterInfoServiceTests.java b/src/test/java/org/elasticsearch/cluster/routing/ClusterInfoServiceTests.java deleted file mode 100644 index b8136e9b2b5eb..0000000000000 --- a/src/test/java/org/elasticsearch/cluster/routing/ClusterInfoServiceTests.java +++ /dev/null @@ -1,73 +0,0 @@ -/* - * Licensed to Elasticsearch under one or more contributor - * license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright - * ownership. Elasticsearch licenses this file to you under - * the Apache License, Version 2.0 (the "License"); you may - * not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.elasticsearch.cluster.routing; - -import org.elasticsearch.cluster.ClusterInfo; -import org.elasticsearch.cluster.ClusterInfoService; -import org.elasticsearch.cluster.DiskUsage; -import org.elasticsearch.cluster.InternalClusterInfoService; -import org.elasticsearch.common.settings.ImmutableSettings; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.test.ElasticsearchIntegrationTest; -import org.elasticsearch.test.InternalTestCluster; -import org.junit.Test; - -import java.util.Map; - -import static org.hamcrest.Matchers.greaterThan; - -/** - * Integration tests for the ClusterInfoService collecting information - */ -@ElasticsearchIntegrationTest.ClusterScope(scope= ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes =0) -public class ClusterInfoServiceTests extends ElasticsearchIntegrationTest { - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return ImmutableSettings.builder() - .put(InternalClusterInfoService.INTERNAL_CLUSTER_INFO_UPDATE_INTERVAL, "1s") - .build(); - } - - @Test - public void testClusterInfoServiceCollectsInformation() throws Exception { - createIndex("test"); - ensureGreen("test"); - Thread.sleep(2000); // wait 2 seconds for new information to be gathered - InternalTestCluster internalTestCluster = internalCluster(); - // Get the cluster info service on the master node - ClusterInfoService infoService = internalTestCluster.getInstance(ClusterInfoService.class, internalTestCluster.getMasterName()); - ClusterInfo info = infoService.getClusterInfo(); - Map usages = info.getNodeDiskUsages(); - Map shardSizes = info.getShardSizes(); - assertNotNull(usages); - assertNotNull(shardSizes); - assertThat("some usages are populated", usages.values().size(), greaterThan(0)); - assertThat("some shard sizes are populated", shardSizes.values().size(), greaterThan(0)); - for (DiskUsage usage : usages.values()) { - logger.info("--> usage: {}", usage); - assertThat("usage has be retrieved", usage.getFreeBytes(), greaterThan(0L)); - } - for (Long size : shardSizes.values()) { - logger.info("--> shard size: {}", size); - assertThat("shard size is greater than 0", size, greaterThan(0L)); - } - } -}