Skip to content

Commit

Permalink
Async search should retry updates on version conflict
Browse files Browse the repository at this point in the history
The _async_search APIs can throw version conflict exception when the internal response
is updated concurrently. That can happen if the final response is written while the user
extends the expiration time. That scenario should be rare but it happened in Kibana for
several users so this change ensures that updates are retried at least 5 times. That
should resolve the transient errors for Kibana. This change also preserves the version
conflict exception in case the retry didn't work instead of returning a confusing 404.
This commit also ensures that we don't delete the response if the search was cancelled
internally and not deleted explicitly by the user.

Closes elastic#63213
  • Loading branch information
jimczi committed Oct 14, 2020
1 parent b7893ba commit 11d0bce
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -419,4 +420,35 @@ public void testSearchPhaseFailureNoCause() throws Exception {
assertNotNull(response.getFailure());
ensureTaskNotRunning(response.getId());
}

public void testRetryVersionConflict() throws Exception {
SubmitAsyncSearchRequest request = new SubmitAsyncSearchRequest(indexName);
request.setWaitForCompletionTimeout(TimeValue.timeValueMinutes(10));
request.setKeepOnCompletion(true);
AsyncSearchResponse response = submitAsyncSearch(request);
assertNotNull(response.getSearchResponse());
assertFalse(response.isRunning());

List<Thread> threads = new ArrayList<>();
CountDownLatch latch = new CountDownLatch(1);
for (int i = 0; i < 2; i++) {
Runnable runnable = () -> {
for (int j = 0; j < 10; j++) {
try {
latch.await();
getAsyncSearch(response.getId(), TimeValue.timeValueMinutes(10));
} catch (Exception exc) {
throw new AssertionError(exc);
}
}
};
Thread thread = new Thread(runnable);
thread.start();
threads.add(thread);
}
latch.countDown();
for (Thread thread : threads) {
thread.join();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,24 +173,12 @@ private void onFatalFailure(AsyncSearchTask task, Exception error, boolean shoul
private void onFinalResponse(AsyncSearchTask searchTask,
AsyncSearchResponse response,
Runnable nextAction) {
if (searchTask.isCancelled()) {
// the task was cancelled so we ensure that there is nothing stored in the response index.
store.deleteResponse(searchTask.getExecutionId(), ActionListener.wrap(
resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
logger.error(() -> new ParameterizedMessage("failed to clean async-search [{}]",
searchTask.getExecutionId().getEncoded()), exc);
unregisterTaskAndMoveOn(searchTask, nextAction);
}));
return;
}

store.updateResponse(searchTask.getExecutionId().getDocId(), threadContext.getResponseHeaders(),response,
ActionListener.wrap(resp -> unregisterTaskAndMoveOn(searchTask, nextAction),
exc -> {
Throwable cause = ExceptionsHelper.unwrapCause(exc);
if (cause instanceof DocumentMissingException == false &&
cause instanceof VersionConflictEngineException == false) {
cause instanceof VersionConflictEngineException == false) {
logger.error(() -> new ParameterizedMessage("failed to store async-search [{}]",
searchTask.getExecutionId().getEncoded()), exc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,14 +85,16 @@ public void retrieveResult(GetAsyncResultRequest request, ActionListener<Respons
ActionListener.wrap(
p -> getSearchResponseFromTask(searchId, request, nowInMillis, expirationTime, listener),
exc -> {
//don't log when: the async search document or its index is not found. That can happen if an invalid
//search id is provided or no async search initial response has been stored yet.
RestStatus status = ExceptionsHelper.status(ExceptionsHelper.unwrapCause(exc));
if (status != RestStatus.NOT_FOUND) {
logger.error(() -> new ParameterizedMessage("failed to update expiration time for async-search [{}]",
searchId.getEncoded()), exc);
listener.onFailure(exc);
} else {
//the async search document or its index is not found.
//That can happen if an invalid/deleted search id is provided.
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
listener.onFailure(new ResourceNotFoundException(searchId.getEncoded()));
}
));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ public void updateResponse(String docId,
UpdateRequest request = new UpdateRequest()
.index(index)
.id(docId)
.doc(source, XContentType.JSON);
.doc(source, XContentType.JSON)
.retryOnConflict(5);
client.update(request, listener);
} catch(Exception e) {
listener.onFailure(e);
Expand All @@ -210,7 +211,8 @@ public void updateExpirationTime(String docId,
Map<String, Object> source = Collections.singletonMap(EXPIRATION_TIME_FIELD, expirationTimeMillis);
UpdateRequest request = new UpdateRequest().index(index)
.id(docId)
.doc(source, XContentType.JSON);
.doc(source, XContentType.JSON)
.retryOnConflict(5);
client.update(request, listener);
}

Expand Down

0 comments on commit 11d0bce

Please sign in to comment.