From c71e36bf517bb0b4e0896c0a71c01f153a749016 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 9 Apr 2019 10:53:19 +0200 Subject: [PATCH] Fix Exception Handling for TransportShardBulkAction * Prior to #39793 exceptions for the primary write and delete actions were bubbled up to the caller so that closed shards would be handled accordingly upstream. #39793 accidentally changed the behaviour here and simply marked those exceptions as bulk item failures on the request and kept processing bulk request items on closed shards. * This fix returns to that behaviour and adjusts the listeners passed in `TransportReplicationAction` such that they behave like the previous synchronous `catch`. * Dried up the exception handling slightly for that and inlined all the listeners to make the logic a little easier to follow * Reenable SplitIndexIT now that clsoed shards are properly handled again * Closes #40944 --- .../action/bulk/TransportShardBulkAction.java | 104 ++++++++---------- .../TransportReplicationAction.java | 78 +++++++------ .../admin/indices/create/SplitIndexIT.java | 3 - 3 files changed, 84 insertions(+), 101 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 1117971e53517..da30dedfe5e60 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -154,7 +154,7 @@ public static void performOnPrimary( private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); @Override - protected void doRun() { + protected void doRun() throws Exception { while (context.hasMoreOperationsToExecute()) { if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate, ActionListener.wrap(v -> executor.execute(this), this::onRejection)) == false) { @@ -168,12 +168,6 @@ protected void doRun() { finishRequest(); } - @Override - public void onFailure(Exception e) { - assert false : "All exceptions should be handled by #executeBulkItemRequest"; - onRejection(e); - } - @Override public void onRejection(Exception e) { // Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request @@ -204,7 +198,7 @@ private void finishRequest() { */ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, Consumer> waitForMappingUpdate, - ActionListener itemDoneListener) { + ActionListener itemDoneListener) throws Exception { final DocWriteRequest.OpType opType = context.getCurrent().opType(); final UpdateHelper.Result updateResult; @@ -252,55 +246,51 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat final IndexShard primary = context.getPrimary(); final long version = context.getRequestToExecute().version(); final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE; - try { - final Engine.Result result; - if (isDelete) { - final DeleteRequest request = context.getRequestToExecute(); - result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(), - request.ifSeqNo(), request.ifPrimaryTerm()); - } else { - final IndexRequest request = context.getRequestToExecute(); - result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( - request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()), - request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()); - } - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), - context.getRequestToExecute().type(), - new ActionListener() { - @Override - public void onResponse(Void v) { - context.markAsRequiringMappingUpdate(); - waitForMappingUpdate.accept( - ActionListener.runAfter(new ActionListener() { - @Override - public void onResponse(Void v) { - assert context.requiresWaitingForMappingUpdate(); - context.resetForExecutionForRetry(); - } - - @Override - public void onFailure(Exception e) { - context.failOnMappingUpdate(e); - } - }, () -> itemDoneListener.onResponse(null)) - ); - } - - @Override - public void onFailure(Exception e) { - onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); - // Requesting mapping update failed, so we don't have to wait for a cluster state update - assert context.isInitial(); - itemDoneListener.onResponse(null); - } - }); - return false; - } else { - onComplete(result, context, updateResult); - } - } catch (Exception e) { - onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); + final Engine.Result result; + if (isDelete) { + final DeleteRequest request = context.getRequestToExecute(); + result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(), + request.ifSeqNo(), request.ifPrimaryTerm()); + } else { + final IndexRequest request = context.getRequestToExecute(); + result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( + request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()), + request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()); + } + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), + context.getRequestToExecute().type(), + new ActionListener<>() { + @Override + public void onResponse(Void v) { + context.markAsRequiringMappingUpdate(); + waitForMappingUpdate.accept( + ActionListener.runAfter(new ActionListener<>() { + @Override + public void onResponse(Void v) { + assert context.requiresWaitingForMappingUpdate(); + context.resetForExecutionForRetry(); + } + + @Override + public void onFailure(Exception e) { + context.failOnMappingUpdate(e); + } + }, () -> itemDoneListener.onResponse(null)) + ); + } + + @Override + public void onFailure(Exception e) { + onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); + // Requesting mapping update failed, so we don't have to wait for a cluster state update + assert context.isInitial(); + itemDoneListener.onResponse(null); + } + }); + return false; + } else { + onComplete(result, context, updateResult); } return true; } diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index ac23b95b3bacf..96c6cc3afa04c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -342,7 +342,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(), primaryRequest.getPrimaryTerm()), transportOptions, - new ActionListenerResponseHandler(onCompletionListener, reader) { + new ActionListenerResponseHandler<>(onCompletionListener, reader) { @Override public void handleResponse(Response response) { setPhase(replicationTask, "finished"); @@ -357,58 +357,54 @@ public void handleException(TransportException exp) { }); } else { setPhase(replicationTask, "primary"); - final ActionListener listener = createResponseListener(primaryShardReference); createReplicatedOperation(primaryRequest.getRequest(), - ActionListener.wrap(result -> result.respond(listener), listener::onFailure), - primaryShardReference) - .execute(); + ActionListener.wrap(result -> result.respond( + new ActionListener<>() { + @Override + public void onResponse(Response response) { + if (syncGlobalCheckpointAfterOperation) { + final IndexShard shard = primaryShardReference.indexShard; + try { + shard.maybeSyncGlobalCheckpoint("post-operation"); + } catch (final Exception e) { + // only log non-closed exceptions + if (ExceptionsHelper.unwrap( + e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { + // intentionally swallow, a missed global checkpoint sync should not fail this operation + logger.info( + new ParameterizedMessage( + "{} failed to execute post-operation global checkpoint sync", shard.shardId()), e); + } + } + } + primaryShardReference.close(); // release shard operation lock before responding to caller + setPhase(replicationTask, "finished"); + onCompletionListener.onResponse(response); + } + + @Override + public void onFailure(Exception e) { + handleException(primaryShardReference, e); + } + }), e -> handleException(primaryShardReference, e) + ), primaryShardReference).execute(); } } catch (Exception e) { - Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller - onFailure(e); + handleException(primaryShardReference, e); } } + private void handleException(PrimaryShardReference primaryShardReference, Exception e) { + Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller + onFailure(e); + } + @Override public void onFailure(Exception e) { setPhase(replicationTask, "finished"); onCompletionListener.onFailure(e); } - private ActionListener createResponseListener(final PrimaryShardReference primaryShardReference) { - return new ActionListener() { - @Override - public void onResponse(Response response) { - if (syncGlobalCheckpointAfterOperation) { - final IndexShard shard = primaryShardReference.indexShard; - try { - shard.maybeSyncGlobalCheckpoint("post-operation"); - } catch (final Exception e) { - // only log non-closed exceptions - if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) { - logger.info( - new ParameterizedMessage( - "{} failed to execute post-operation global checkpoint sync", - shard.shardId()), - e); - // intentionally swallow, a missed global checkpoint sync should not fail this operation - } - } - } - primaryShardReference.close(); // release shard operation lock before responding to caller - setPhase(replicationTask, "finished"); - onCompletionListener.onResponse(response); - } - - @Override - public void onFailure(Exception e) { - primaryShardReference.close(); // release shard operation lock before responding to caller - setPhase(replicationTask, "finished"); - onCompletionListener.onFailure(e); - } - }; - } - protected ReplicationOperation> createReplicatedOperation( Request request, ActionListener> listener, PrimaryShardReference primaryShardReference) { diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java b/server/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java index 8ee0fbee32678..7038505ff6fb3 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/create/SplitIndexIT.java @@ -25,7 +25,6 @@ import org.apache.lucene.search.SortedSetSortField; import org.apache.lucene.search.join.ScoreMode; import org.apache.lucene.util.Constants; -import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.Version; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse; @@ -78,8 +77,6 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo; - -@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/40944") public class SplitIndexIT extends ESIntegTestCase { @Override