From 525b5b6a8c60c223712167cba6d111cb60244144 Mon Sep 17 00:00:00 2001 From: Tim Vernum Date: Tue, 10 Oct 2023 15:38:34 +1100 Subject: [PATCH] [Monitoring] Dont get cluster state until recovery The LocalExporter would call `clusterService.state()` as part of a scheduled runnable, however this could end up running before the cluster state was recovered, and calling state() before recovery is not permitted (this trips an assertion in tests) The class already listened to cluster events and detected when cluster state recovery was complete, this commit causes the scheduled cleanup method to do nothing if the recovery event has not yet been received. --- .../exporter/local/LocalExporter.java | 5 +++ .../exporter/local/LocalExporterTests.java | 33 +++++++++++++++++++ 2 files changed, 38 insertions(+) diff --git a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java index 467378f4cd738..ba43cf82d1458 100644 --- a/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java +++ b/x-pack/plugin/monitoring/src/main/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporter.java @@ -596,6 +596,11 @@ private boolean canUseWatcher() { @Override public void onCleanUpIndices(TimeValue retention) { + if (stateInitialized.get() == false) { + // ^ this is once the cluster state is recovered. Don't try to interact with the cluster service until that happens + logger.debug("exporter not yet initialized"); + return; + } ClusterState clusterState = clusterService.state(); if (clusterService.localNode() == null || clusterState == null diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java index 3b0d301099d72..a30975be1055d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/exporter/local/LocalExporterTests.java @@ -7,17 +7,25 @@ package org.elasticsearch.xpack.monitoring.exporter.local; +import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; import org.elasticsearch.client.internal.Client; +import org.elasticsearch.cluster.ClusterChangedEvent; +import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.core.TimeValue; import org.elasticsearch.license.XPackLicenseState; import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.test.client.NoOpClient; import org.elasticsearch.xpack.monitoring.cleaner.CleanerService; import org.elasticsearch.xpack.monitoring.exporter.Exporter; import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator; +import static org.mockito.ArgumentMatchers.same; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; public class LocalExporterTests extends ESTestCase { @@ -37,4 +45,29 @@ public void testLocalExporterRemovesListenersOnClose() { verify(licenseState).removeListener(exporter); } + public void testLocalExporterDoesNotInteractWithClusterServiceUntilStateIsRecovered() { + final ClusterService clusterService = mock(ClusterService.class); + final XPackLicenseState licenseState = mock(XPackLicenseState.class); + final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState); + final CleanerService cleanerService = mock(CleanerService.class); + final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator(); + try (Client client = new NoOpClient(getTestName())) { + final LocalExporter exporter = new LocalExporter(config, client, migrationCoordinator, cleanerService); + + final TimeValue retention = TimeValue.timeValueDays(randomIntBetween(1, 90)); + exporter.onCleanUpIndices(retention); + + verify(clusterService).addListener(same(exporter)); + verifyNoMoreInteractions(clusterService); + + final ClusterState oldState = ClusterState.EMPTY_STATE; + final ClusterState newState = ClusterStateCreationUtils.stateWithNoShard(); + exporter.clusterChanged(new ClusterChangedEvent(getTestName(), newState, oldState)); + verify(clusterService).localNode(); + + exporter.onCleanUpIndices(retention); + verify(clusterService).state(); + verify(clusterService, times(2)).localNode(); + } + } }