Skip to content

Commit

Permalink
Simplify watcher indexing listener. (elastic#52627)
Browse files Browse the repository at this point in the history
Add the watch to the trigger service after the index operation has succeeded,
instead of adding the watch to the trigger service before
the actual index operation has been performed at the shard level.

This logic is simpler to reason about in the case that a failure
does occur during the execution of an index operation on
the shard level.

Relates to elastic#52453. I don't think this change fixes it, but it should
make it easier to debug.
  • Loading branch information
martijnvg committed Mar 3, 2020
1 parent a154f9c commit e900579
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 49 deletions.
Expand Up @@ -97,20 +97,25 @@ void setConfiguration(Configuration configuration) {
*
* @param shardId The shard id object of the document being processed
* @param operation The index operation
* @return The index operation
* @param result The result of the operation
*/
@Override
public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
public void postIndex(ShardId shardId, Engine.Index operation, Engine.IndexResult result) {
if (isWatchDocument(shardId.getIndexName())) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
postIndex(shardId, operation, result.getFailure());
return;
}

ZonedDateTime now = Instant.ofEpochMilli(clock.millis()).atZone(ZoneOffset.UTC);
try {
Watch watch = parser.parseWithSecrets(operation.id(), true, operation.source(), now, XContentType.JSON,
operation.getIfSeqNo(), operation.getIfPrimaryTerm());
ShardAllocationConfiguration shardAllocationConfiguration = configuration.localShards.get(shardId);
if (shardAllocationConfiguration == null) {
logger.debug("no distributed watch execution info found for watch [{}] on shard [{}], got configuration for {}",
watch.id(), shardId, configuration.localShards.keySet());
return operation;
watch.id(), shardId, configuration.localShards.keySet());
return;
}

boolean shouldBeTriggered = shardAllocationConfiguration.shouldBeTriggered(watch.id());
Expand All @@ -128,32 +133,12 @@ public Engine.Index preIndex(ShardId shardId, Engine.Index operation) {
} catch (IOException e) {
throw new ElasticsearchParseException("Could not parse watch with id [{}]", e, operation.id());
}

}

return operation;
}

/**
* In case of a document related failure (for example version conflict), then clean up resources for a watch
* in the trigger service.
*/
@Override
public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult result) {
if (result.getResultType() == Engine.Result.Type.FAILURE) {
assert result.getFailure() != null;
postIndex(shardId, index, result.getFailure());
}
}

/**
* In case of an engine related error, we have to ensure that the triggerservice does not leave anything behind
*
* TODO: If the configuration changes between preindex and postindex methods and we add a
* watch, that could not be indexed
* TODO: this watch might not be deleted from the triggerservice. Are we willing to accept this?
* TODO: This could be circumvented by using a threadlocal in preIndex(), that contains the
* watch and is cleared afterwards
* In case of an engine related error, we just log that we failed to add the watch to the trigger service.
* No need to interact with the trigger service.
*
* @param shardId The shard id object of the document being processed
* @param index The index operation
Expand All @@ -162,8 +147,7 @@ public void postIndex(ShardId shardId, Engine.Index index, Engine.IndexResult re
@Override
public void postIndex(ShardId shardId, Engine.Index index, Exception ex) {
if (isWatchDocument(shardId.getIndexName())) {
logger.debug(() -> new ParameterizedMessage("removing watch [{}] from trigger", index.id()), ex);
triggerService.remove(index.id());
logger.debug(() -> new ParameterizedMessage("failed to add watch [{}] to trigger service", index.id()), ex);
}
}

Expand Down
Expand Up @@ -114,18 +114,20 @@ public void testPreIndexCheckActive() throws Exception {
verifyZeroInteractions(parser);
}

public void testPreIndex() throws Exception {
public void testPostIndex() throws Exception {
when(operation.id()).thenReturn(randomAlphaOfLength(10));
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);
when(result.getResultType()).thenReturn(randomFrom(types));

boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch("_id", watchActive, isNewWatch);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);

Engine.Index returnedOperation = listener.preIndex(shardId, operation);
assertThat(returnedOperation, is(operation));
listener.postIndex(shardId, operation, result);
ZonedDateTime now = DateUtils.nowWithMillisResolution(clock);
verify(parser).parseWithSecrets(eq(operation.id()), eq(true), eq(BytesArray.EMPTY), eq(now), anyObject(), anyLong(), anyLong());

Expand All @@ -140,12 +142,13 @@ public void testPreIndex() throws Exception {

// this test emulates an index with 10 shards, and ensures that triggering only happens on a
// single shard
public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
public void testPostIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Exception {
String id = randomAlphaOfLength(10);
int totalShardCount = randomIntBetween(1, 10);
boolean watchActive = randomBoolean();
boolean isNewWatch = randomBoolean();
Watch watch = mockWatch(id, watchActive, isNewWatch);
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong())).thenReturn(watch);
Expand All @@ -155,7 +158,7 @@ public void testPreIndexWatchGetsOnlyTriggeredOnceAcrossAllShards() throws Excep
localShards.put(shardId, new ShardAllocationConfiguration(idx, totalShardCount, Collections.emptyList()));
Configuration configuration = new Configuration(Watch.INDEX, localShards);
listener.setConfiguration(configuration);
listener.preIndex(shardId, operation);
listener.postIndex(shardId, operation, result);
}

// no matter how many shards we had, this should have been only called once
Expand Down Expand Up @@ -187,16 +190,17 @@ private Watch mockWatch(String id, boolean active, boolean isNewWatch) {
return watch;
}

public void testPreIndexCheckParsingException() throws Exception {
public void testPostIndexCheckParsingException() throws Exception {
String id = randomAlphaOfLength(10);
when(operation.id()).thenReturn(id);
when(operation.source()).thenReturn(BytesArray.EMPTY);
when(shardId.getIndexName()).thenReturn(Watch.INDEX);
when(parser.parseWithSecrets(anyObject(), eq(true), anyObject(), anyObject(), anyObject(), anyLong(), anyLong()))
.thenThrow(new IOException("self thrown"));
when(result.getResultType()).thenReturn(Engine.Result.Type.SUCCESS);

ElasticsearchParseException exc = expectThrows(ElasticsearchParseException.class,
() -> listener.preIndex(shardId, operation));
() -> listener.postIndex(shardId, operation, result));
assertThat(exc.getMessage(), containsString("Could not parse watch"));
assertThat(exc.getMessage(), containsString(id));
}
Expand All @@ -207,19 +211,6 @@ public void testPostIndexRemoveTriggerOnDocumentRelatedException() throws Except
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, result);
verify(triggerService).remove(eq("_id"));
}

public void testPostIndexRemoveTriggerOnDocumentRelatedException_ignoreOtherEngineResultTypes() throws Exception {
List<Engine.Result.Type> types = new ArrayList<>(Arrays.asList(Engine.Result.Type.values()));
types.remove(Engine.Result.Type.FAILURE);

when(operation.id()).thenReturn("_id");
when(result.getResultType()).thenReturn(randomFrom(types));
when(result.getFailure()).thenReturn(new RuntimeException());
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, result);
verifyZeroInteractions(triggerService);
}
Expand All @@ -239,7 +230,7 @@ public void testPostIndexRemoveTriggerOnEngineLevelException() throws Exception
when(shardId.getIndexName()).thenReturn(Watch.INDEX);

listener.postIndex(shardId, operation, new ElasticsearchParseException("whatever"));
verify(triggerService).remove(eq("_id"));
verifyZeroInteractions(triggerService);
}

public void testPostIndexRemoveTriggerOnEngineLevelException_ignoreNonWatcherDocument() throws Exception {
Expand Down

0 comments on commit e900579

Please sign in to comment.