Skip to content

Commit

Permalink
[Monitoring] Dont get cluster state until recovery
Browse files Browse the repository at this point in the history
The LocalExporter would call `clusterService.state()` as part of a
scheduled runnable, however this could end up running before the
cluster state was recovered, and calling state() before recovery is
not permitted (this trips an assertion in tests)

The class already listened to cluster events and detected when
cluster state recovery was complete, this commit causes the
scheduled cleanup method to do nothing if the recovery event has not
yet been received.
  • Loading branch information
tvernum committed Oct 10, 2023
1 parent bf14530 commit 525b5b6
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,11 @@ private boolean canUseWatcher() {

@Override
public void onCleanUpIndices(TimeValue retention) {
if (stateInitialized.get() == false) {
// ^ this is once the cluster state is recovered. Don't try to interact with the cluster service until that happens
logger.debug("exporter not yet initialized");
return;
}
ClusterState clusterState = clusterService.state();
if (clusterService.localNode() == null
|| clusterState == null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,25 @@

package org.elasticsearch.xpack.monitoring.exporter.local;

import org.elasticsearch.action.support.replication.ClusterStateCreationUtils;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.license.XPackLicenseState;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.xpack.monitoring.cleaner.CleanerService;
import org.elasticsearch.xpack.monitoring.exporter.Exporter;
import org.elasticsearch.xpack.monitoring.exporter.MonitoringMigrationCoordinator;

import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

public class LocalExporterTests extends ESTestCase {

Expand All @@ -37,4 +45,29 @@ public void testLocalExporterRemovesListenersOnClose() {
verify(licenseState).removeListener(exporter);
}

public void testLocalExporterDoesNotInteractWithClusterServiceUntilStateIsRecovered() {
final ClusterService clusterService = mock(ClusterService.class);
final XPackLicenseState licenseState = mock(XPackLicenseState.class);
final Exporter.Config config = new Exporter.Config("name", "type", Settings.EMPTY, clusterService, licenseState);
final CleanerService cleanerService = mock(CleanerService.class);
final MonitoringMigrationCoordinator migrationCoordinator = new MonitoringMigrationCoordinator();
try (Client client = new NoOpClient(getTestName())) {
final LocalExporter exporter = new LocalExporter(config, client, migrationCoordinator, cleanerService);

final TimeValue retention = TimeValue.timeValueDays(randomIntBetween(1, 90));
exporter.onCleanUpIndices(retention);

verify(clusterService).addListener(same(exporter));
verifyNoMoreInteractions(clusterService);

final ClusterState oldState = ClusterState.EMPTY_STATE;
final ClusterState newState = ClusterStateCreationUtils.stateWithNoShard();
exporter.clusterChanged(new ClusterChangedEvent(getTestName(), newState, oldState));
verify(clusterService).localNode();

exporter.onCleanUpIndices(retention);
verify(clusterService).state();
verify(clusterService, times(2)).localNode();
}
}
}

0 comments on commit 525b5b6

Please sign in to comment.