Skip to content

Commit

Permalink
Handling timeout exceptions on watcher startup (elastic#90421)
Browse files Browse the repository at this point in the history
Right now, if Watcher throws an exception while starting up (for example, a TimeoutException
while waiting for a refresh of .watches or .triggered_watches to complete), Watcher gets
into a state where it will never be restarted automatically and is incredibly difficult to start
manually. This PR catches those exceptions and sets the state to STOPPED so that when the
next cluster change event comes through, it will attempt to start Watcher again.
  • Loading branch information
masseyke authored and javanna committed Oct 4, 2022
1 parent 04e9b5c commit 6db24ab
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 14 deletions.
6 changes: 6 additions & 0 deletions docs/changelog/90421.yaml
@@ -0,0 +1,6 @@
pr: 90421
summary: Handling timeout exceptions on watcher startup
area: Watcher
type: bug
issues:
- 44981
Expand Up @@ -101,7 +101,11 @@ public void clusterChanged(ClusterChangedEvent event) {
// if this is not a data node, we need to start it ourselves possibly
if (event.state().nodes().getLocalNode().canContainData() == false && isWatcherStoppedManually == false && isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
watcherService.start(
event.state(),
() -> this.state.set(WatcherState.STARTED),
(exception -> this.state.set(WatcherState.STOPPED))
);
return;
}

Expand Down Expand Up @@ -165,7 +169,10 @@ public void clusterChanged(ClusterChangedEvent event) {
watcherService.reload(event.state(), "new local watcher shard allocation ids");
} else if (isStoppedOrStopping) {
this.state.set(WatcherState.STARTING);
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED));
watcherService.start(event.state(), () -> this.state.set(WatcherState.STARTED), (exception) -> {
clearAllocationIds();
this.state.set(WatcherState.STOPPED);
});
}
} else {
clearAllocationIds();
Expand Down
Expand Up @@ -230,14 +230,17 @@ void reload(ClusterState state, String reason) {
* @param state the current cluster state
* @param postWatchesLoadedCallback the callback to be triggered when watches were loaded successfully
* @param exceptionConsumer the consumer that receives the exception if starting watcher fails
*/
// NOTE(review): this is a rendered diff without +/- markers; the two-argument signature on the next line appears to be
// the removed version and the three-argument signature after it the replacement.
public void start(ClusterState state, Runnable postWatchesLoadedCallback) {
public void start(ClusterState state, Runnable postWatchesLoadedCallback, Consumer<Exception> exceptionConsumer) {
executionService.unPause();
// Record the version of the cluster state this start request was issued for.
processedClusterStateVersion.set(state.getVersion());
// Hand the load to the executor; the success callback only fires when reloadInner returns true.
executor.execute(wrapWatcherService(() -> {
if (reloadInner(state, "starting", true)) {
postWatchesLoadedCallback.run();
}
// NOTE(review): the next line is the previous error handler (log only), apparently the removed side of the diff.
}, e -> logger.error("error starting watcher", e)));
// Replacement handler: log the failure first, then pass it to the caller-supplied consumer.
}, e -> {
logger.error("error starting watcher", e);
exceptionConsumer.accept(e);
}));
}

/**
Expand Down Expand Up @@ -311,13 +314,7 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
SearchResponse response = null;
List<Watch> watches = new ArrayList<>();
try {
RefreshResponse refreshResponse = client.admin()
.indices()
.refresh(new RefreshRequest(INDEX))
.actionGet(TimeValue.timeValueSeconds(5));
if (refreshResponse.getSuccessfulShards() < indexMetadata.getNumberOfShards()) {
throw illegalState("not all required shards have been refreshed");
}
refreshWatches(indexMetadata);

// find out local shards
String watchIndexName = indexMetadata.getIndex().getName();
Expand Down Expand Up @@ -403,6 +400,17 @@ private Collection<Watch> loadWatches(ClusterState clusterState) {
return watches;
}

/**
 * Refreshes the index named by {@code INDEX} and waits up to five seconds for the response.
 * Fails with the exception built by {@code illegalState(...)} when fewer shards were refreshed
 * successfully than the index metadata declares.
 * NOTE(review): a timeout while waiting in {@code actionGet} presumably surfaces as an exception to the caller - confirm.
 *
 * @param indexMetadata metadata of the index being refreshed; only its number of shards is read here
 */
// Non-private for unit testing purposes
void refreshWatches(IndexMetadata indexMetadata) {
    final RefreshResponse response = client.admin().indices().refresh(new RefreshRequest(INDEX)).actionGet(TimeValue.timeValueSeconds(5));
    final int refreshedShards = response.getSuccessfulShards();
    final int requiredShards = indexMetadata.getNumberOfShards();
    if (refreshedShards < requiredShards) {
        throw illegalState("not all required shards have been refreshed");
    }
}

/**
* Find out if the watch with this id, should be parsed and triggered on this node
*
Expand Down
Expand Up @@ -6,6 +6,7 @@
*/
package org.elasticsearch.xpack.watcher;

import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
import org.elasticsearch.cluster.ClusterChangedEvent;
Expand Down Expand Up @@ -41,13 +42,17 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static java.util.Arrays.asList;
import static org.elasticsearch.cluster.routing.ShardRoutingState.RELOCATING;
import static org.elasticsearch.cluster.routing.ShardRoutingState.STARTED;
import static org.elasticsearch.xpack.core.watcher.support.WatcherIndexTemplateRegistryField.HISTORY_TEMPLATE_NAME;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -184,14 +189,79 @@ public void testManualStartStop() {
reset(watcherService);
when(watcherService.validate(clusterState)).thenReturn(true);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, stoppedClusterState));
verify(watcherService, times(1)).start(eq(clusterState), any());
verify(watcherService, times(1)).start(eq(clusterState), any(), any());

// no change, keep going
reset(watcherService);
lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
verifyNoMoreInteractions(watcherService);
}

/**
 * Checks that when starting watcher reports an exception (for example a timeout while refreshing indices) the
 * lifecycle service ends up in {@code STOPPED}, and that the next cluster change event starts watcher successfully
 * once the start call no longer reports an exception.
 */
@SuppressWarnings("unchecked")
public void testExceptionOnStart() {
    // One started primary shard of the watches index, allocated to node_1
    Index watchIndex = new Index(Watch.INDEX, "uuid");
    IndexRoutingTable.Builder routingBuilder = IndexRoutingTable.builder(watchIndex);
    ShardId primaryShardId = new ShardId(watchIndex, 0);
    routingBuilder.addShard(TestShardRouting.newShardRouting(primaryShardId, "node_1", true, ShardRoutingState.STARTED));

    IndexMetadata.Builder watchIndexMetadata = IndexMetadata.builder(Watch.INDEX)
        .settings(settings(Version.CURRENT).put(IndexMetadata.INDEX_FORMAT_SETTING.getKey(), 6)) // the internal index format, required
        .numberOfShards(1)
        .numberOfReplicas(0);

    Metadata.Builder clusterMetadata = Metadata.builder().put(watchIndexMetadata);
    clusterMetadata.put(IndexTemplateMetadata.builder(HISTORY_TEMPLATE_NAME).patterns(randomIndexPatterns()));
    if (randomBoolean()) {
        // Randomly include explicit "not manually stopped" watcher metadata
        clusterMetadata.putCustom(WatcherMetadata.TYPE, new WatcherMetadata(false));
    }
    Metadata runningMetadata = clusterMetadata.build();
    IndexRoutingTable watchRouting = routingBuilder.build();

    ClusterState clusterState = ClusterState.builder(new ClusterName("my-cluster"))
        .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
        .routingTable(RoutingTable.builder().add(watchRouting).build())
        .metadata(runningMetadata)
        .build();

    // Same cluster, but with watcher marked as manually stopped
    Metadata stoppedMetadata = Metadata.builder(runningMetadata).putCustom(WatcherMetadata.TYPE, new WatcherMetadata(true)).build();
    ClusterState stoppedClusterState = ClusterState.builder(new ClusterName("my-cluster"))
        .nodes(new DiscoveryNodes.Builder().masterNodeId("node_1").localNodeId("node_1").add(newNode("node_1")))
        .routingTable(RoutingTable.builder().add(watchRouting).build())
        .metadata(stoppedMetadata)
        .build();

    lifeCycleService.clusterChanged(new ClusterChangedEvent("foo", stoppedClusterState, clusterState));
    assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STOPPING));

    // First start attempt: the stubbed start call reports a simulated timeout, so the state must become STOPPED
    when(watcherService.validate(clusterState)).thenReturn(true);
    AtomicBoolean exceptionHit = new AtomicBoolean(false);
    doAnswer(invocation -> {
        invocation.<Consumer<Exception>>getArgument(2)
            .accept(new ElasticsearchTimeoutException(new TimeoutException("Artificial timeout")));
        exceptionHit.set(true);
        return null;
    }).when(watcherService).start(any(), any(), any(Consumer.class));
    lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
    assertTrue("Expected simulated timeout not hit", exceptionHit.get());
    assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STOPPED));

    // Second start attempt: the stubbed start call runs the success callback, so the state must become STARTED
    AtomicBoolean runnableCalled = new AtomicBoolean(false);
    doAnswer(invocation -> {
        invocation.<Runnable>getArgument(1).run();
        runnableCalled.set(true);
        return null;
    }).when(watcherService).start(any(), any(Runnable.class), any());
    lifeCycleService.clusterChanged(new ClusterChangedEvent("any", clusterState, clusterState));
    assertTrue("Runnable not called", runnableCalled.get());
    assertThat(lifeCycleService.getState().get(), equalTo(WatcherState.STARTED));
}

public void testNoLocalShards() {
Index watchIndex = new Index(Watch.INDEX, "foo");
ShardId shardId = new ShardId(watchIndex, 0);
Expand Down Expand Up @@ -453,7 +523,7 @@ public void testWatcherServiceDoesNotStartIfIndexTemplatesAreMissing() throws Ex
when(watcherService.validate(eq(state))).thenReturn(true);

lifeCycleService.clusterChanged(new ClusterChangedEvent("any", state, state));
verify(watcherService, times(0)).start(any(ClusterState.class), any());
verify(watcherService, times(0)).start(any(ClusterState.class), any(), any());
}

public void testWatcherStopsWhenMasterNodeIsMissing() {
Expand Down
Expand Up @@ -7,6 +7,7 @@
package org.elasticsearch.xpack.watcher;

import org.apache.lucene.search.TotalHits;
import org.elasticsearch.ElasticsearchTimeoutException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
Expand Down Expand Up @@ -64,7 +65,10 @@
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -244,7 +248,7 @@ void stopExecutor() {}
return null;
}).when(client).execute(eq(ClearScrollAction.INSTANCE), any(ClearScrollRequest.class), anyActionListener());

service.start(clusterState, () -> {});
service.start(clusterState, () -> {}, exception -> {});

ArgumentCaptor<List> captor = ArgumentCaptor.forClass(List.class);
verify(triggerService).start(captor.capture());
Expand All @@ -253,6 +257,45 @@ void stopExecutor() {}
assertThat(watches, hasSize(activeWatchCount));
}

/**
 * Verifies that an exception thrown while refreshing indices during {@code WatcherService.start} is delivered to the
 * exception consumer instead of propagating up the stack, and that the success callback is not invoked.
 */
public void testExceptionHandling() {
    TriggerService triggerService = mock(TriggerService.class);
    TriggeredWatchStore triggeredWatchStore = mock(TriggeredWatchStore.class);
    ExecutionService executionService = mock(ExecutionService.class);
    WatchParser parser = mock(WatchParser.class);
    final ElasticsearchTimeoutException exception = new ElasticsearchTimeoutException(new TimeoutException("Artificial timeout"));
    // The direct executor runs the start task on the calling thread, so the result can be asserted right after start()
    WatcherService service = new WatcherService(
        Settings.EMPTY,
        triggerService,
        triggeredWatchStore,
        executionService,
        parser,
        client,
        EsExecutors.DIRECT_EXECUTOR_SERVICE
    ) {
        @Override
        void refreshWatches(IndexMetadata indexMetadata) {
            // Simulate the refresh failing
            throw exception;
        }
    };

    // Cluster state that contains the watches index, so that loading watches reaches the refresh step
    ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
    Metadata.Builder metadataBuilder = Metadata.builder();
    Settings indexSettings = settings(Version.CURRENT).put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
        .build();
    metadataBuilder.put(IndexMetadata.builder(Watch.INDEX).settings(indexSettings));
    csBuilder.metadata(metadataBuilder);
    ClusterState clusterState = csBuilder.build();

    AtomicReference<Exception> exceptionReference = new AtomicReference<>();
    // The success callback must never run; the consumer must receive exactly the exception thrown by refreshWatches
    service.start(clusterState, () -> fail("Expected an exception"), exceptionReference::set);
    assertThat(exceptionReference.get(), equalTo(exception));
}

@SuppressWarnings({ "unchecked", "rawtypes" })
public void testPausingWatcherServiceAlsoPausesTriggerService() {
String engineType = "foo";
Expand Down

0 comments on commit 6db24ab

Please sign in to comment.