From 5d26243aba226648e1bd2c327e0ac0cae4a4c14f Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sat, 6 Apr 2019 18:11:00 +0200 Subject: [PATCH 1/3] Make Transport Shard Bulk Action Async (#39793) This is a dependency of #39504 Motivation: By refactoring `TransportShardBulkAction#shardOperationOnPrimary` to async, we enable using `DeterministicTaskQueue`-based tests to run indexing operations. This was previously impossible since we were blocking on the `write` thread until the `update` thread finished the mapping update. With this change, the mapping update will trigger a new task in the `write` queue instead. This change significantly increases the amount of coverage we get from `SnapshotResiliencyTests` (and other potential future tests) when it comes to tracking down concurrency issues with distributed state machines. The logical change is effectively all in `TransportShardBulkAction`; the rest of the changes simply mechanically move the caller code and tests to being async and pass the `ActionListener` down. Since the move to async would've added more parameters to the `private static` steps in this logic, I decided to inline and DRY up (between delete and update) the logic as much as I could instead of passing the listener + wait-consumer down through all of them. 
--- ...TransportVerifyShardBeforeCloseAction.java | 12 +- .../flush/TransportShardFlushAction.java | 13 +- .../refresh/TransportShardRefreshAction.java | 13 +- .../action/bulk/MappingUpdatePerformer.java | 3 +- .../action/bulk/TransportShardBulkAction.java | 311 +++++++++--------- .../TransportResyncReplicationAction.java | 8 +- .../replication/ReplicationOperation.java | 19 +- .../TransportReplicationAction.java | 21 +- .../replication/TransportWriteAction.java | 8 +- .../action/index/MappingUpdatedAction.java | 45 +-- .../common/util/concurrent/FutureUtils.java | 16 +- .../seqno/GlobalCheckpointSyncAction.java | 10 +- .../RetentionLeaseBackgroundSyncAction.java | 16 +- .../index/seqno/RetentionLeaseSyncAction.java | 17 +- .../elasticsearch/indices/IndicesModule.java | 4 +- ...portVerifyShardBeforeCloseActionTests.java | 31 +- .../action/bulk/BulkRejectionIT.java | 74 +++++ .../bulk/TransportShardBulkActionTests.java | 165 ++++++---- .../ReplicationOperationTests.java | 15 +- .../TransportReplicationActionTests.java | 20 +- ...ReplicationAllPermitsAcquisitionTests.java | 15 +- .../TransportWriteActionTests.java | 103 +++--- .../GlobalCheckpointSyncActionTests.java | 3 +- ...tentionLeaseBackgroundSyncActionTests.java | 23 +- .../seqno/RetentionLeaseSyncActionTests.java | 24 +- .../action/support/ActionTestUtils.java | 8 + .../ESIndexLevelReplicationTestCase.java | 95 +++--- .../TransportBulkShardOperationsAction.java | 8 +- .../ShardFollowTaskReplicationTests.java | 28 +- 29 files changed, 663 insertions(+), 465 deletions(-) create mode 100644 server/src/test/java/org/elasticsearch/action/bulk/BulkRejectionIT.java diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index e0cddcb0acf2b..f29bf6987a085 100644 --- 
a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -85,14 +85,16 @@ protected void acquireReplicaOperationPermit(final IndexShard replica, } @Override - protected PrimaryResult shardOperationOnPrimary(final ShardRequest shardRequest, - final IndexShard primary) throws Exception { - executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + executeShardOperation(shardRequest, primary); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + }); } @Override - protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception { + protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) { executeShardOperation(shardRequest, replica); return new ReplicaResult(); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 63424844d7d76..a07dee9613a1b 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ 
-51,11 +52,13 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(ShardFlushRequest shardRequest, - IndexShard primary) { - primary.flush(shardRequest.getRequest()); - logger.trace("{} flush request executed on primary", primary.shardId()); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + primary.flush(shardRequest.getRequest()); + logger.trace("{} flush request executed on primary", primary.shardId()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index df3ff16ff8800..c0a52ac8c0d6a 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -53,11 +54,13 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary( - BasicReplicationRequest shardRequest, IndexShard primary) { - primary.refresh("api"); - logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + protected void shardOperationOnPrimary(BasicReplicationRequest 
shardRequest, IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + primary.refresh("api"); + logger.trace("{} refresh request executed on primary", primary.shardId()); + return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java index 1f228b0f355e0..5a38f0f43e070 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.shard.ShardId; @@ -27,6 +28,6 @@ public interface MappingUpdatePerformer { /** * Update the mappings on the master. */ - void updateMappings(Mapping update, ShardId shardId, String type); + void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 325820532be6e..afa26bfeb9700 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -22,7 +22,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.MessageSupplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.DocWriteRequest; import 
org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -30,7 +33,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -44,8 +46,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -57,7 +57,6 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -69,10 +68,9 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.Consumer; -import java.util.function.Function; import java.util.function.LongSupplier; /** Performs shard-level bulk (index, delete or update) operations */ @@ -82,7 +80,6 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) - throws Exception { + protected void 
shardOperationOnPrimary(BulkShardRequest request, IndexShard primary, + ActionListener> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); - CheckedRunnable waitForMappingUpdate = () -> { - PlainActionFuture waitingFuture = new PlainActionFuture<>(); - observer.waitForNextChange(new ClusterStateObserver.Listener() { + performOnPrimary(request, primary, updateHelper, threadPool::relativeTimeInMillis, + (update, shardId, type, mappingListener) -> { + assert update != null; + assert shardId != null; + mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); + }, + mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - waitingFuture.onResponse(null); + mappingUpdateListener.onResponse(null); } @Override public void onClusterServiceClose() { - waitingFuture.onFailure(new NodeClosedException(clusterService.localNode())); + mappingUpdateListener.onFailure(new NodeClosedException(clusterService.localNode())); } @Override public void onTimeout(TimeValue timeout) { - waitingFuture.onFailure( - new MapperException("timed out while waiting for a dynamic mapping update")); + mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update")); } - }); - waitingFuture.get(); - }; - return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, - new ConcreteMappingUpdatePerformer(), waitForMappingUpdate); + }), listener, threadPool + ); } - public static WritePrimaryResult performOnPrimary( + public static void performOnPrimary( BulkShardRequest request, IndexShard primary, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, - CheckedRunnable waitForMappingUpdate) throws Exception { - BulkPrimaryExecutionContext context = new 
BulkPrimaryExecutionContext(request, primary); - return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); - } + Consumer> waitForMappingUpdate, + ActionListener> listener, + ThreadPool threadPool) { + new ActionRunnable>(listener) { + + private final Executor executor = threadPool.executor(ThreadPool.Names.WRITE); + + private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); + + @Override + protected void doRun() { + while (context.hasMoreOperationsToExecute()) { + if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate, + ActionListener.wrap(v -> executor.execute(this), this::onRejection)) == false) { + // We are waiting for a mapping update on another thread, that will invoke this action again once its done + // so we just break out here. + return; + } + assert context.isInitial(); // either completed and moved to next or reset + } + // We're done, there's no more operations to execute so we resolve the wrapped listener + finishRequest(); + } - private static WritePrimaryResult performOnPrimary( - BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { + @Override + public void onFailure(Exception e) { + assert false : "All exceptions should be handled by #executeBulkItemRequest"; + onRejection(e); + } - while (context.hasMoreOperationsToExecute()) { - executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); - assert context.isInitial(); // either completed and moved to next or reset - } - return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), - null, context.getPrimary(), logger); + @Override + public void onRejection(Exception e) { + // Fail all operations after a 
bulk rejection hit an action that waited for a mapping update and finish the request + while (context.hasMoreOperationsToExecute()) { + context.setRequestToExecute(context.getCurrent()); + final DocWriteRequest docWriteRequest = context.getRequestToExecute(); + onComplete( + exceptionToResult( + e, primary, docWriteRequest.opType() == DocWriteRequest.OpType.DELETE, docWriteRequest.version()), + context, null); + } + finishRequest(); + } + + private void finishRequest() { + listener.onResponse( + new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), + null, context.getPrimary(), logger)); + } + + }.doRun(); } - /** Executes bulk item requests and handles request execution exceptions */ - static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) - throws Exception { + /** + * Executes bulk item requests and handles request execution exceptions. 
+ * @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered + * a mapping update that will finish and invoke the listener on a different thread + */ + static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, Consumer> waitForMappingUpdate, + ActionListener itemDoneListener) { final DocWriteRequest.OpType opType = context.getCurrent().opType(); final UpdateHelper.Result updateResult; @@ -179,11 +215,12 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe } catch (Exception failure) { // we may fail translating a update to index or delete operation // we use index result to communicate failure while translating update request - final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO); + final Engine.Result result = + new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO); context.setRequestToExecute(updateRequest); context.markOperationAsExecuted(result); context.markAsCompleted(context.getExecutionResult()); - return; + return true; } // execute translated update request switch (updateResult.getResponseResult()) { @@ -201,7 +238,7 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe case NOOP: context.markOperationAsNoOp(updateResult.action()); context.markAsCompleted(context.getExecutionResult()); - return; + return true; default: throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult()); } @@ -212,59 +249,95 @@ static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHe assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { - 
executeDeleteRequestOnPrimary(context, mappingUpdater); - } else { - executeIndexRequestOnPrimary(context, mappingUpdater); - } - - if (context.requiresWaitingForMappingUpdate()) { - try { - waitForMappingUpdate.run(); - context.resetForExecutionForRetry(); - } catch (Exception e) { - context.failOnMappingUpdate(e); + final IndexShard primary = context.getPrimary(); + final long version = context.getRequestToExecute().version(); + final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE; + try { + final Engine.Result result; + if (isDelete) { + final DeleteRequest request = context.getRequestToExecute(); + result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(), + request.ifSeqNo(), request.ifPrimaryTerm()); + } else { + final IndexRequest request = context.getRequestToExecute(); + result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( + request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()), + request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()); } - return; - } - - assert context.isOperationExecuted(); - - if (opType == DocWriteRequest.OpType.UPDATE && - context.getExecutionResult().isFailed() && - isConflictException(context.getExecutionResult().getFailure().getCause())) { - final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); - if (context.getRetryCounter() < updateRequest.retryOnConflict()) { - context.resetForExecutionForRetry(); - return; + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), + context.getRequestToExecute().type(), + new ActionListener() { + @Override + public void onResponse(Void v) { + context.markAsRequiringMappingUpdate(); + waitForMappingUpdate.accept( + 
ActionListener.runAfter(new ActionListener() { + @Override + public void onResponse(Void v) { + assert context.requiresWaitingForMappingUpdate(); + context.resetForExecutionForRetry(); + } + + @Override + public void onFailure(Exception e) { + context.failOnMappingUpdate(e); + } + }, () -> itemDoneListener.onResponse(null)) + ); + } + + @Override + public void onFailure(Exception e) { + onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); + // Requesting mapping update failed, so we don't have to wait for a cluster state update + assert context.isInitial(); + itemDoneListener.onResponse(null); + } + }); + return false; + } else { + onComplete(result, context, updateResult); } + } catch (Exception e) { + onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult); } + return true; + } - finalizePrimaryOperationOnCompletion(context, opType, updateResult); + private static Engine.Result exceptionToResult(Exception e, IndexShard primary, boolean isDelete, long version) { + return isDelete ? 
primary.getFailedDeleteResult(e, version) : primary.getFailedIndexResult(e, version); } - private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionContext context, DocWriteRequest.OpType opType, - UpdateHelper.Result updateResult) { + private static void onComplete(Engine.Result r, BulkPrimaryExecutionContext context, UpdateHelper.Result updateResult) { + context.markOperationAsExecuted(r); + final DocWriteRequest docWriteRequest = context.getCurrent(); + final DocWriteRequest.OpType opType = docWriteRequest.opType(); + final boolean isUpdate = opType == DocWriteRequest.OpType.UPDATE; final BulkItemResponse executionResult = context.getExecutionResult(); - if (opType == DocWriteRequest.OpType.UPDATE) { - final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); - context.markAsCompleted( - processUpdateResponse(updateRequest, context.getConcreteIndex(), executionResult, updateResult)); - } else if (executionResult.isFailed()) { - final Exception failure = executionResult.getFailure().getCause(); - final DocWriteRequest docWriteRequest = context.getCurrent(); - if (TransportShardBulkAction.isConflictException(failure)) { - logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); - } else { - logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); - } - - context.markAsCompleted(executionResult); + final boolean isFailed = executionResult.isFailed(); + if (isUpdate && isFailed && isConflictException(executionResult.getFailure().getCause()) + && context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) { + context.resetForExecutionForRetry(); + return; + } + final BulkItemResponse response; + if (isUpdate) { + response = 
processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult); } else { - context.markAsCompleted(executionResult); + if (isFailed) { + final Exception failure = executionResult.getFailure().getCause(); + final MessageSupplier messageSupplier = () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + context.getPrimary().shardId(), opType.getLowercase(), docWriteRequest); + if (TransportShardBulkAction.isConflictException(failure)) { + logger.trace(messageSupplier, failure); + } else { + logger.debug(messageSupplier, failure); + } + } + response = executionResult; } + context.markAsCompleted(response); assert context.isInitial(); } @@ -278,7 +351,6 @@ private static boolean isConflictException(final Exception e) { static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex, BulkItemResponse operationResponse, final UpdateHelper.Result translate) { - final BulkItemResponse response; DocWriteResponse.Result translatedResult = translate.getResponseResult(); if (operationResponse.isFailed()) { @@ -439,65 +511,4 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse } return result; } - - /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ - private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext context, - MappingUpdatePerformer mappingUpdater) throws Exception { - final IndexRequest request = context.getRequestToExecute(); - final IndexShard primary = context.getPrimary(); - final SourceToParse sourceToParse = - new SourceToParse(request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()); - executeOnPrimaryWhileHandlingMappingUpdates(context, - () -> - primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, - request.ifSeqNo(), request.ifPrimaryTerm(), 
request.getAutoGeneratedTimestamp(), request.isRetry()), - e -> primary.getFailedIndexResult(e, request.version()), - context::markOperationAsExecuted, - mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); - } - - private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context, - MappingUpdatePerformer mappingUpdater) throws Exception { - final DeleteRequest request = context.getRequestToExecute(); - final IndexShard primary = context.getPrimary(); - executeOnPrimaryWhileHandlingMappingUpdates(context, - () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(), - request.ifSeqNo(), request.ifPrimaryTerm()), - e -> primary.getFailedDeleteResult(e, request.version()), - context::markOperationAsExecuted, - mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); - } - - private static void executeOnPrimaryWhileHandlingMappingUpdates( - BulkPrimaryExecutionContext context, CheckedSupplier toExecute, - Function exceptionToResult, Consumer onComplete, Consumer mappingUpdater) - throws IOException { - T result = toExecute.get(); - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - // try to update the mappings and mark the context as needing to try again. - try { - mappingUpdater.accept(result.getRequiredMappingUpdate()); - context.markAsRequiringMappingUpdate(); - } catch (Exception e) { - // failure to update the mapping should translate to a failure of specific requests. Other requests - // still need to be executed and replicated. 
- onComplete.accept(exceptionToResult.apply(e)); - return; - } - } else { - onComplete.accept(result); - } - } - - class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { - - @Override - public void updateMappings(final Mapping update, final ShardId shardId, final String type) { - assert update != null; - assert shardId != null; - // can throw timeout exception when updating mappings or ISE for attempting to - // update default mappings which are bubbled up - mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update); - } - } } diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 2a4e06a871007..bfe3274996160 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -85,10 +85,10 @@ public ClusterBlockLevel indexBlockLevel() { } @Override - protected WritePrimaryResult shardOperationOnPrimary( - ResyncReplicationRequest request, IndexShard primary) { - final ResyncReplicationRequest replicaRequest = performOnPrimary(request); - return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger); + protected void shardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, + () -> new WritePrimaryResult<>(performOnPrimary(request), new ResyncReplicationResponse(), null, null, primary, logger)); } public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request) { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index 22e90cfc1356b..c8c102dfd85e7 
100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -101,14 +101,17 @@ public void execute() throws Exception { totalShards.incrementAndGet(); pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination - primaryResult = primary.perform(request); - primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()); + primary.perform(request, ActionListener.wrap(this::handlePrimaryResult, resultListener::onFailure)); + } + + private void handlePrimaryResult(final PrimaryResultT primaryResult) { + this.primaryResult = primaryResult; + primary.updateLocalCheckpointForShard(primary.routingEntry().allocationId().getId(), primary.localCheckpoint()); final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); if (replicaRequest != null) { if (logger.isTraceEnabled()) { - logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); + logger.trace("[{}] op [{}] completed on primary for request [{}]", primary.routingEntry().shardId(), opType, request); } - // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics. // we have to make sure that every operation indexed into the primary after recovery start will also be replicated // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then. @@ -118,14 +121,14 @@ public void execute() throws Exception { // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint. 
final long globalCheckpoint = primary.globalCheckpoint(); // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of - // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on. + // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed + // on. final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes(); assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized"; final ReplicationGroup replicationGroup = primary.getReplicationGroup(); markUnavailableShardsAsStale(replicaRequest, replicationGroup); performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); } - successfulShards.incrementAndGet(); // mark primary as successful decPendingAndFinishIfNeeded(); } @@ -310,9 +313,9 @@ public interface Primary< * also complete after. Deal with it. * * @param request the request to perform - * @return the request to send to the replicas + * @param listener result listener */ - PrimaryResultT perform(RequestT request) throws Exception; + void perform(RequestT request, ActionListener listener); /** * Notifies the primary of a local checkpoint for the given allocation. 
diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index 70361c0871bd1..ac23b95b3bacf 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; @@ -189,8 +190,8 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request r * @param shardRequest the request to the primary shard * @param primary the primary shard to perform the operation on */ - protected abstract PrimaryResult shardOperationOnPrimary( - Request shardRequest, IndexShard primary) throws Exception; + protected abstract void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener); /** * Synchronously execute the specified replica operation. 
This is done under a permit from @@ -416,7 +417,7 @@ protected ReplicationOperation, + public static class PrimaryResult, Response extends ReplicationResponse> implements ReplicationOperation.PrimaryResult { final ReplicaRequest replicaRequest; @@ -915,11 +916,15 @@ public void failShard(String reason, Exception e) { } @Override - public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request, indexShard); - assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() - + "] with a primary failure [" + result.finalFailure + "]"; - return result; + public void perform(Request request, ActionListener> listener) { + if (Assertions.ENABLED) { + listener = ActionListener.map(listener, result -> { + assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() + + "] with a primary failure [" + result.finalFailure + "]"; + return result; + }); + } + shardOperationOnPrimary(request, indexShard, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index cb3f67aa99ea2..15c49d1030374 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -103,12 +103,12 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { /** * Called on the primary with a reference to the primary {@linkplain IndexShard} to modify. 
* - * @return the result of the operation on primary, including current translog location and operation response and failure - * async refresh is performed on the primary shard according to the Request refresh policy + * @param listener listener for the result of the operation on primary, including current translog location and operation response + * and failure async refresh is performed on the primary shard according to the Request refresh policy */ @Override - protected abstract WritePrimaryResult shardOperationOnPrimary( - Request request, IndexShard primary) throws Exception; + protected abstract void shardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener); /** * Called once per replica with a reference to the replica {@linkplain IndexShard} to modify. diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 14c360168f904..aeba27c4120fb 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -19,7 +19,9 @@ package org.elasticsearch.cluster.action.index; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -29,6 +31,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.FutureUtils; import org.elasticsearch.common.xcontent.XContentType; import 
org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; @@ -57,34 +60,36 @@ private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeou this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout; } - public void setClient(Client client) { this.client = client.admin().indices(); } - private PutMappingRequestBuilder updateMappingRequest(Index index, String type, Mapping mappingUpdate, final TimeValue timeout) { - if (type.equals(MapperService.DEFAULT_MAPPING)) { - throw new IllegalArgumentException("_default_ mapping should not be updated"); - } - return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON) - .setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO); - } - - /** - * Same as {@link #updateMappingOnMaster(Index, String, Mapping, TimeValue)} - * using the default timeout. - */ - public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate) { - updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout); - } - /** * Update mappings on the master node, waiting for the change to be committed, * but not for the mapping update to be applied on all nodes. The timeout specified by * {@code timeout} is the master node timeout ({@link MasterNodeRequest#masterNodeTimeout()}), * potentially waiting for a master node to be available. 
*/ - public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue masterNodeTimeout) { - updateMappingRequest(index, type, mappingUpdate, masterNodeTimeout).get(); + public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, ActionListener listener) { + if (type.equals(MapperService.DEFAULT_MAPPING)) { + throw new IllegalArgumentException("_default_ mapping should not be updated"); + } + client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON) + .setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO) + .execute(new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(unwrapException(e)); + } + }); + } + + private static Exception unwrapException(Exception cause) { + return cause instanceof ElasticsearchException ? 
FutureUtils.unwrapEsException((ElasticsearchException) cause) : cause; } } diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java index c7345aa3b6368..15e26779071ec 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/FutureUtils.java @@ -88,17 +88,19 @@ public static T get(Future future, long timeout, TimeUnit unit) { public static RuntimeException rethrowExecutionException(ExecutionException e) { if (e.getCause() instanceof ElasticsearchException) { ElasticsearchException esEx = (ElasticsearchException) e.getCause(); - Throwable root = esEx.unwrapCause(); - if (root instanceof ElasticsearchException) { - return (ElasticsearchException) root; - } else if (root instanceof RuntimeException) { - return (RuntimeException) root; - } - return new UncategorizedExecutionException("Failed execution", root); + return unwrapEsException(esEx); } else if (e.getCause() instanceof RuntimeException) { return (RuntimeException) e.getCause(); } else { return new UncategorizedExecutionException("Failed execution", e); } } + + public static RuntimeException unwrapEsException(ElasticsearchException esEx) { + Throwable root = esEx.unwrapCause(); + if (root instanceof ElasticsearchException || root instanceof RuntimeException) { + return (RuntimeException) root; + } + return new UncategorizedExecutionException("Failed execution", root); + } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index b7f08a36ac06f..d67cbc833d666 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -102,10 +102,12 @@ 
protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary( - final Request request, final IndexShard indexShard) throws Exception { - maybeSyncTranslog(indexShard); - return new PrimaryResult<>(request, new ReplicationResponse()); + protected void shardOperationOnPrimary(Request request, IndexShard indexShard, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + maybeSyncTranslog(indexShard); + return new PrimaryResult<>(request, new ReplicationResponse()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index 570159cc74d6e..9be2e55057331 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -127,14 +127,16 @@ public void backgroundSync( } @Override - protected PrimaryResult shardOperationOnPrimary( + protected void shardOperationOnPrimary( final Request request, - final IndexShard primary) throws WriteStateException { - assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); - Objects.requireNonNull(request); - Objects.requireNonNull(primary); - primary.persistRetentionLeases(); - return new PrimaryResult<>(request, new ReplicationResponse()); + final IndexShard primary, ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); + Objects.requireNonNull(request); + Objects.requireNonNull(primary); + primary.persistRetentionLeases(); + return new PrimaryResult<>(request, new ReplicationResponse()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java 
b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index a8aa7fe6f8ec2..6129affe493ae 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -123,14 +123,15 @@ public void sync( } @Override - protected WritePrimaryResult shardOperationOnPrimary( - final Request request, - final IndexShard primary) throws WriteStateException { - assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); - Objects.requireNonNull(request); - Objects.requireNonNull(primary); - primary.persistRetentionLeases(); - return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()); + protected void shardOperationOnPrimary(Request request, IndexShard primary, + ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); + Objects.requireNonNull(request); + Objects.requireNonNull(primary); + primary.persistRetentionLeases(); + return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()); + }); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java index 77bddbe215652..bef05ecda9fd8 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -110,7 +110,7 @@ public List getNamedXContents() { ); } - private Map getMappers(List mapperPlugins) { + public static Map getMappers(List mapperPlugins) { Map mappers = new LinkedHashMap<>(); // builtin mappers @@ -168,7 +168,7 @@ private static Map initBuiltInMetadataMa return Collections.unmodifiableMap(builtInMetadataMappers); } - private static Map getMetadataMappers(List mapperPlugins) { 
+ public static Map getMetadataMappers(List mapperPlugins) { Map metadataMappers = new LinkedHashMap<>(); int i = 0; diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 687b01680704e..f90f40311c1b8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.close; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; @@ -60,6 +61,7 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -133,18 +135,28 @@ public static void afterClass() { threadPool = null; } - private void executeOnPrimaryOrReplica() throws Exception { + private void executeOnPrimaryOrReplica() throws Throwable { final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong()); final TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId); - if (randomBoolean()) { - assertNotNull(action.shardOperationOnPrimary(request, indexShard)); - } else { - assertNotNull(action.shardOperationOnPrimary(request, indexShard)); + final PlainActionFuture res = PlainActionFuture.newFuture(); + action.shardOperationOnPrimary(request, indexShard, 
ActionListener.wrap( + r -> { + assertNotNull(r); + res.onResponse(null); + }, + res::onFailure + )); + try { + res.get(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } catch (ExecutionException e) { + throw e.getCause(); } } - public void testShardIsFlushed() throws Exception { + public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0])); @@ -171,7 +183,7 @@ public void testOperationFailsWithNoBlock() { verify(indexShard, times(0)).flush(any(FlushRequest.class)); } - public void testVerifyShardBeforeIndexClosing() throws Exception { + public void testVerifyShardBeforeIndexClosing() throws Throwable { executeOnPrimaryOrReplica(); verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); @@ -271,8 +283,9 @@ public ReplicationGroup getReplicationGroup() { } @Override - public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception { - return new PrimaryResult(request); + public void perform( + TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener listener) { + listener.onResponse(new PrimaryResult(request)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/BulkRejectionIT.java b/server/src/test/java/org/elasticsearch/action/bulk/BulkRejectionIT.java new file mode 100644 index 0000000000000..c82d825b9424f --- /dev/null +++ b/server/src/test/java/org/elasticsearch/action/bulk/BulkRejectionIT.java @@ -0,0 +1,74 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. 
Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.action.bulk; + +import org.elasticsearch.action.ActionFuture; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.Collections; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; + +@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2) +public class BulkRejectionIT extends ESIntegTestCase { + + @Override + protected Settings nodeSettings(int nodeOrdinal) { + return Settings.builder() + .put(super.nodeSettings(nodeOrdinal)) + .put("thread_pool.write.size", 1) + .put("thread_pool.write.queue_size", 1) + .build(); + } + + @Override + protected int numberOfReplicas() { + return 1; + } + + protected int numberOfShards() { + return 5; + } + + public void testBulkRejectionAfterDynamicMappingUpdate() throws Exception { + final String index = "test"; + assertAcked(prepareCreate(index)); + ensureGreen(); + final BulkRequest request1 = new BulkRequest(); + for (int i = 0; i < 500; ++i) { + request1.add(new IndexRequest(index).source(Collections.singletonMap("key" + i, "value" + i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + // Huge request to keep the write pool busy so that requests waiting on a 
mapping update in the other bulk request get rejected + // by the write pool + final BulkRequest request2 = new BulkRequest(); + for (int i = 0; i < 10_000; ++i) { + request2.add(new IndexRequest(index).source(Collections.singletonMap("key", "valuea" + i))) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + } + final ActionFuture bulkFuture1 = client().bulk(request1); + final ActionFuture bulkFuture2 = client().bulk(request2); + bulkFuture1.actionGet(); + bulkFuture2.actionGet(); + internalCluster().assertSeqNos(); + } +} diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 610a72de6ecfd..62217f7873138 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -22,13 +22,16 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.TransportWriteAction.WritePrimaryResult; import org.elasticsearch.action.update.UpdateHelper; @@ -75,6 +78,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { + private static final 
ActionListener ASSERTING_DONE_LISTENER = ActionTestUtils.assertNoFailureListener(r -> {}); + private final ShardId shardId = new ShardId("index", "_na_", 0); private final Settings idxSettings = Settings.builder() .put("index.number_of_shards", 1) @@ -146,10 +151,9 @@ public void testExecuteBulkIndexRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); - UpdateHelper updateHelper = null; BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems @@ -174,8 +178,9 @@ public void testExecuteBulkIndexRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(secondContext, null, + threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), + listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); @@ -224,37 +229,44 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { final ElasticsearchStatusException rejectionCause = new ElasticsearchStatusException("testing rejection", rejectionStatus); rejectItem.abort("index", rejectionCause); - UpdateHelper updateHelper = null; - 
WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), - () -> {}); + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, null, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + listener -> {}, ActionListener.runAfter( + ActionTestUtils.assertNoFailureListener(result -> { + // since at least 1 item passed, the tran log location should exist, + assertThat(((WritePrimaryResult) result).location, notNullValue()); + // and the response should exist and match the item count + assertThat(result.finalResponseIfSuccessful, notNullValue()); + assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); + + // check each response matches the input item, including the rejection + for (int i = 0; i < items.length; i++) { + BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; + assertThat(response.getItemId(), equalTo(i)); + assertThat(response.getIndex(), equalTo("index")); + assertThat(response.getType(), equalTo("_doc")); + assertThat(response.getId(), equalTo("id_" + i)); + assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + if (response.getItemId() == rejectItem.id()) { + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); + assertThat(response.status(), equalTo(rejectionStatus)); + } else { + assertFalse(response.isFailed()); + } + } + + // Check that the non-rejected updates made it to the shard + try { + assertDocCount(shard, items.length - 1); + closeShards(shard); + } catch (IOException e) { + throw new AssertionError(e); + } + }), latch::countDown), threadPool); - // since at least 1 item passed, the tran log location should exist, - assertThat(result.location, notNullValue()); - // and the response should exist and match 
the item count - assertThat(result.finalResponseIfSuccessful, notNullValue()); - assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); - - // check each response matches the input item, including the rejection - for (int i = 0; i < items.length; i++) { - BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; - assertThat(response.getItemId(), equalTo(i)); - assertThat(response.getIndex(), equalTo("index")); - assertThat(response.getType(), equalTo("_doc")); - assertThat(response.getId(), equalTo("id_" + i)); - assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); - if (response.getItemId() == rejectItem.id()) { - assertTrue(response.isFailed()); - assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); - assertThat(response.status(), equalTo(rejectionStatus)); - } else { - assertFalse(response.isFailed()); - } - } - - // Check that the non-rejected updates made it to the shard - assertDocCount(shard, items.length - 1); - closeShards(shard); + latch.await(); } public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { @@ -281,11 +293,12 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type) -> { + (update, shardId, type, listener) -> { // There should indeed be a mapping update assertNotNull(update); updateCalled.incrementAndGet(); - }, () -> {}); + listener.onResponse(null); + }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertTrue(context.isInitial()); assertTrue(context.hasMoreOperationsToExecute()); @@ -298,7 +311,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { .thenReturn(success); 
TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); + (update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {}, + ASSERTING_DONE_LISTENER); // Verify that the shard "executed" the operation only once (1 for previous invocations plus @@ -325,8 +339,6 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - UpdateHelper updateHelper = null; - // Return an exception when trying to update the mapping, or when waiting for it to come RuntimeException err = new RuntimeException("some kind of exception"); @@ -335,9 +347,22 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.executeBulkItemRequest( + context, null, threadPool::relativeTimeInMillis, errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), - errorOnWait ? () -> { throw err; } : () -> {}); + errorOnWait ? 
listener -> listener.onFailure(err) : listener -> listener.onResponse(null), + new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(Void aVoid) { + } + + @Override + public void onFailure(final Exception e) { + assertEquals(err, e); + } + }, latch)); + latch.await(); assertFalse(context.hasMoreOperationsToExecute()); // Translog shouldn't be synced, as there were conflicting mappings @@ -371,13 +396,12 @@ public void testExecuteBulkDeleteRequest() throws Exception { new BulkShardRequest(shardId, RefreshPolicy.NONE, items); Translog.Location location = new Translog.Location(0, 0, 0); - UpdateHelper updateHelper = null; randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, even though the document didn't exist @@ -418,8 +442,8 @@ public void testExecuteBulkDeleteRequest() throws Exception { randomlySetIgnoredPrimaryResponse(items[0]); context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, because the document was deleted @@ -475,7 +499,7 @@ public void testNoopUpdateRequest() throws Exception { BulkPrimaryExecutionContext context = new 
BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); @@ -521,7 +545,7 @@ public void testUpdateRequestWithFailure() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Since this was not a conflict failure, the primary response @@ -571,7 +595,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); @@ -618,7 +642,7 @@ public void testUpdateRequestWithSuccess() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced @@ -664,7 +688,7 @@ public void testUpdateWithDelete() throws Exception { 
BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced @@ -698,7 +722,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); @@ -731,7 +755,7 @@ public void testTranslogPositionToSync() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); while (context.hasMoreOperationsToExecute()) { TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); } assertTrue(shard.isSyncNeeded()); @@ -814,18 +838,22 @@ public void testRetries() throws Exception { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), - () -> {}); - - assertThat(result.location, equalTo(resultLocation)); - BulkItemResponse primaryResponse = 
result.replicaRequest().items()[0].getPrimaryResponse(); - assertThat(primaryResponse.getItemId(), equalTo(0)); - assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); - DocWriteResponse response = primaryResponse.getResponse(); - assertThat(response.status(), equalTo(RestStatus.CREATED)); - assertThat(response.getSeqNo(), equalTo(13L)); + listener -> listener.onResponse(null), + new LatchedActionListener<>( + ActionTestUtils.assertNoFailureListener(result -> { + assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation)); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getSeqNo(), equalTo(13L)); + }), latch), threadPool); + latch.await(); } private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { @@ -875,7 +903,8 @@ public Translog.Location getTranslogLocation() { /** Doesn't perform any mapping updates */ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer { @Override - public void updateMappings(Mapping update, ShardId shardId, String type) { + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + listener.onResponse(null); } } @@ -888,8 +917,8 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { } @Override - public void updateMappings(Mapping update, ShardId shardId, String type) { - throw e; + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + listener.onFailure(e); } } } diff --git 
a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 3af5047fe22e5..d493b70337208 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -287,11 +287,12 @@ public void testAddedReplicaAfterPrimaryOperation() throws Exception { final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard(); final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) { @Override - public Result perform(Request request) throws Exception { - Result result = super.perform(request); - replicationGroup.set(updatedReplicationGroup); - logger.debug("--> state after primary operation:\n{}", replicationGroup.get()); - return result; + public void perform(Request request, ActionListener listener) { + super.perform(request, ActionListener.map(listener, result -> { + replicationGroup.set(updatedReplicationGroup); + logger.debug("--> state after primary operation:\n{}", replicationGroup.get()); + return result; + })); } }; @@ -481,11 +482,11 @@ public void failShard(String message, Exception exception) { } @Override - public Result perform(Request request) throws Exception { + public void perform(Request request, ActionListener listener) { if (request.processedOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - return new Result(request); + listener.onResponse(new Result(request)); } static class Result implements ReplicationOperation.PrimaryResult { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index 
a663841ac6a47..e256ec4e92679 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse; @@ -691,16 +692,16 @@ public void testPrimaryReference() throws Exception { }; TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(NO_SHARD_ID); - primary.perform(request); + primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> { + final ElasticsearchException exception = new ElasticsearchException("testing"); + primary.failShard("test", exception); - final ElasticsearchException exception = new ElasticsearchException("testing"); - primary.failShard("test", exception); + verify(shard).failShard("test", exception); - verify(shard).failShard("test", exception); + primary.close(); - primary.close(); - - assertTrue(closed.get()); + assertTrue(closed.get()); + })); } public void testReplicaProxy() throws InterruptedException, ExecutionException { @@ -1229,10 +1230,11 @@ protected TestResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has 
already been executed on the primary"; - return new PrimaryResult<>(shardRequest, new TestResponse()); + listener.onResponse(new PrimaryResult<>(shardRequest, new TestResponse())); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 15886a517d380..8463d66e98e71 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -439,12 +439,13 @@ public String getActionName() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { executedOnPrimary.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the primary shard assertSame(primary, shard); - return new PrimaryResult<>(shardRequest, new Response()); + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); } @Override @@ -499,10 +500,11 @@ public ClusterBlockLevel indexBlockLevel() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { assertNoBlocks("block must not exist when executing the operation on primary shard: it should have been blocked before"); assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - return super.shardOperationOnPrimary(shardRequest, shard); + super.shardOperationOnPrimary(shardRequest, shard, 
listener); } @Override @@ -545,9 +547,10 @@ protected void acquireReplicaOperationPermit(IndexShard shard, Request request, } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - return super.shardOperationOnPrimary(shardRequest, shard); + super.shardOperationOnPrimary(shardRequest, shard, listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index 5a35202506d39..98c4f215fca8a 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -140,14 +141,15 @@ public void testPrimaryNoRefreshCall() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNotNull(listener.response); - assertNull(listener.failure); - 
verify(indexShard, never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any()); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + verify(indexShard, never()).refresh(any()); + verify(indexShard, never()).addRefreshListener(any(), any()); + })); } public void testReplicaNoRefreshCall() throws Exception { @@ -168,15 +170,16 @@ public void testPrimaryImmediateRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNotNull(listener.response); - assertNull(listener.failure); - assertTrue(listener.response.forcedRefresh); - verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any()); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + assertTrue(listener.response.forcedRefresh); + verify(indexShard).refresh("refresh_flag_index"); + verify(indexShard, never()).addRefreshListener(any(), any()); + })); } public void testReplicaImmediateRefresh() throws Exception { @@ -198,23 +201,24 @@ public void testPrimaryWaitForRefresh() throws Exception { request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - 
testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNull(listener.response); // Haven't reallresponded yet - - @SuppressWarnings({ "unchecked", "rawtypes" }) - ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); - verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), refreshListener.capture()); - - // Now we can fire the listener manually and we'll get a response - boolean forcedRefresh = randomBoolean(); - refreshListener.getValue().accept(forcedRefresh); - assertNotNull(listener.response); - assertNull(listener.failure); - assertEquals(forcedRefresh, listener.response.forcedRefresh); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNull(listener.response); // Haven't really responded yet + + @SuppressWarnings({"unchecked", "rawtypes"}) + ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); + verify(indexShard, never()).refresh(any()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + + // Now we can fire the listener manually and we'll get a response + boolean forcedRefresh = randomBoolean(); + refreshListener.getValue().accept(forcedRefresh); + assertNotNull(listener.response); + assertNull(listener.failure); + assertEquals(forcedRefresh, listener.response.forcedRefresh); + })); } public void testReplicaWaitForRefresh() throws Exception { @@ -240,12 +244,13 @@ public void testReplicaWaitForRefresh() throws Exception { public void testDocumentFailureInShardOperationOnPrimary() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(true, true); - TransportWriteAction.WritePrimaryResult writePrimaryResult = - 
testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - writePrimaryResult.respond(listener); - assertNull(listener.response); - assertNotNull(listener.failure); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(writePrimaryResult -> { + CapturingActionListener listener = new CapturingActionListener<>(); + writePrimaryResult.respond(listener); + assertNull(listener.response); + assertNotNull(listener.failure); + })); } public void testDocumentFailureInShardOperationOnReplica() throws Exception { @@ -426,15 +431,15 @@ protected TestResponse newResponseInstance() { } @Override - protected WritePrimaryResult shardOperationOnPrimary( - TestRequest request, IndexShard primary) throws Exception { - final WritePrimaryResult primaryResult; - if (withDocumentFailureOnPrimary) { - primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger); - } else { - primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger); - } - return primaryResult; + protected void shardOperationOnPrimary( + TestRequest request, IndexShard primary, ActionListener> listener) { + ActionListener.completeWith(listener, () -> { + if (withDocumentFailureOnPrimary) { + return new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger); + } else { + return new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger); + } + }); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 97fc9e9defaa9..cec3c05b28438 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ 
b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -18,6 +18,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -113,7 +114,7 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { new IndexNameExpressionResolver()); final GlobalCheckpointSyncAction.Request primaryRequest = new GlobalCheckpointSyncAction.Request(indexShard.shardId()); if (randomBoolean()) { - action.shardOperationOnPrimary(primaryRequest, indexShard); + action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); } else { action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 81ea56c609624..f1c05c565f8ef 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -21,8 +21,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.replication.ReplicationResponse; import 
org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -47,6 +48,7 @@ import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; @@ -67,6 +69,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { private TransportService transportService; private ShardStateAction shardStateAction; + @Override public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); @@ -84,6 +87,7 @@ public void setUp() throws Exception { shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); } + @Override public void tearDown() throws Exception { try { IOUtils.close(transportService, clusterService, transport); @@ -93,7 +97,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateException { + public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws InterruptedException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -120,12 +124,15 @@ public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateE final RetentionLeaseBackgroundSyncAction.Request request = new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); - final ReplicationOperation.PrimaryResult result = - action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be persisted - verify(indexShard).persistRetentionLeases(); - // we should forward the request containing the current retention leases to the replica - assertThat(result.replicaRequest(), sameInstance(request)); + final CountDownLatch latch = new 
CountDownLatch(1); + action.shardOperationOnPrimary(request, indexShard, + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + // the retention leases on the shard should be persisted + verify(indexShard).persistRetentionLeases(); + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + }), latch)); + latch.await(); } public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateException { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 9b9ad6a0962c1..73387d69b1e4c 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -90,7 +91,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException { + public void testRetentionLeaseSyncActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -115,15 +116,16 @@ public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException { new IndexNameExpressionResolver()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new 
RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - - final TransportWriteAction.WritePrimaryResult result = - action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be persisted - verify(indexShard).persistRetentionLeases(); - // we should forward the request containing the current retention leases to the replica - assertThat(result.replicaRequest(), sameInstance(request)); - // we should start with an empty replication response - assertNull(result.finalResponseIfSuccessful.getShardInfo()); + action.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + // the retention leases on the shard should be persisted + verify(indexShard).persistRetentionLeases(); + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + // we should start with an empty replication response + assertNull(result.finalResponseIfSuccessful.getShardInfo()); + } + )); } public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { @@ -156,7 +158,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); - // the retention leases on the shard should be persisteed + // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index cdc76292be06a..55c1acabd3fbd 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ 
b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -19,8 +19,10 @@ package org.elasticsearch.action.support; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.common.CheckedConsumer; import static org.elasticsearch.action.support.PlainActionFuture.newFuture; @@ -34,4 +36,10 @@ Response executeBlocking(TransportAction action, Request requ action.execute(request, future); return future.actionGet(); } + + public static ActionListener assertNoFailureListener(CheckedConsumer consumer) { + return ActionListener.wrap(consumer, e -> { + throw new AssertionError(e); + }); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index a884d7de948ef..64e31059c5fab 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -36,12 +36,14 @@ import org.elasticsearch.action.resync.ResyncReplicationRequest; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; import org.elasticsearch.action.support.replication.ReplicationOperation; import org.elasticsearch.action.support.replication.ReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; +import org.elasticsearch.action.support.replication.TransportReplicationAction; import 
org.elasticsearch.action.support.replication.TransportReplicationAction.ReplicaResponse; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.support.replication.TransportWriteActionTestHelper; @@ -189,14 +191,15 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { sync(shardId, retentionLeases, ActionListener.wrap( r -> { }, e -> { - throw new AssertionError("failed to backgroun sync retention lease", e); + throw new AssertionError("failed to background sync retention lease", e); })); } }; protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); + primary = newShard( + primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); replicas = new CopyOnWriteArrayList<>(); this.indexMetaData = indexMetaData; updateAllocationIDsOnPrimary(); @@ -252,9 +255,8 @@ public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception { private BulkItemResponse executeWriteRequest( DocWriteRequest writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), - listener::onFailure); + final ActionListener wrapBulkListener = + ActionListener.map(listener, bulkShardResponse -> bulkShardResponse.getResponses()[0]); BulkItemRequest[] items = new BulkItemRequest[1]; items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items); @@ -307,8 +309,7 @@ public IndexShard addReplica() throws IOException { } public synchronized void addReplica(IndexShard replica) 
throws IOException { - assert shardRoutings().stream() - .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : + assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())) == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; replicas.add(replica); if (replicationTargets != null) { @@ -531,10 +532,8 @@ private synchronized ReplicationTargets getReplicationTargets() { } protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener listener) { - RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(shardId, leases); - ActionListener wrappedListener = ActionListener.wrap( - r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); - new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute(); + new SyncRetentionLeases(new RetentionLeaseSyncAction.Request(shardId, leases), this, + ActionListener.map(listener, r -> new ReplicationResponse())).execute(); } public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, @@ -609,8 +608,8 @@ protected ReplicationAction(Request request, ActionListener listener, public void execute() { try { new ReplicationOperation<>(request, new PrimaryRef(), - ActionListener.delegateFailure(listener, - (delegatedListener, result) -> result.respond(delegatedListener)), new ReplicasRef(), logger, opType).execute(); + ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType + ).execute(); } catch (Exception e) { listener.onFailure(e); } @@ -620,7 +619,7 @@ IndexShard getPrimaryShard() { return replicationTargets.primary; } - protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; + protected abstract void performOnPrimary(IndexShard primary, Request 
request, ActionListener listener); protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; @@ -637,8 +636,8 @@ public void failShard(String message, Exception exception) { } @Override - public PrimaryResult perform(Request request) throws Exception { - return performOnPrimary(getPrimaryShard(), request); + public void perform(Request request, ActionListener listener) { + performOnPrimary(getPrimaryShard(), request, listener); } @Override @@ -744,10 +743,9 @@ class WriteReplicationAction extends ReplicationAction - result = executeShardBulkOnPrimary(primary, request); - return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + protected void performOnPrimary(IndexShard primary, BulkShardRequest request, ActionListener listener) { + executeShardBulkOnPrimary(primary, request, + ActionListener.map(listener, result -> new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful))); } @Override @@ -757,8 +755,8 @@ protected void performOnReplica(BulkShardRequest request, IndexShard replica) th } } - private TransportWriteAction.WritePrimaryResult executeShardBulkOnPrimary( - IndexShard primary, BulkShardRequest request) throws Exception { + private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request, + ActionListener> listener) { for (BulkItemRequest itemRequest : request.items()) { if (itemRequest.request() instanceof IndexRequest) { ((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName()); @@ -766,21 +764,27 @@ private TransportWriteAction.WritePrimaryResult permitAcquiredFuture = new PlainActionFuture<>(); primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); - final TransportWriteAction.WritePrimaryResult result; try (Releasable ignored = permitAcquiredFuture.actionGet()) { - MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { }; - result = 
TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, - null); + MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {}; + TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, + null, ActionTestUtils.assertNoFailureListener(result -> { + TransportWriteActionTestHelper.performPostWriteActions(primary, request, + ((TransportWriteAction.WritePrimaryResult) result).location, logger); + listener.onResponse((TransportWriteAction.WritePrimaryResult) result); + }), threadPool); + } catch (Exception e) { + listener.onFailure(e); } - TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); - return result; } - private - BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception { + private BulkShardRequest executeReplicationRequestOnPrimary( + IndexShard primary, Request request) throws Exception { final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), new BulkItemRequest[]{new BulkItemRequest(0, request)}); - return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest(); + final PlainActionFuture res = new PlainActionFuture<>(); + executeShardBulkOnPrimary( + primary, bulkShardRequest, ActionListener.map(res, TransportReplicationAction.PrimaryResult::replicaRequest)); + return res.get(); } private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, @@ -843,10 +847,12 @@ class GlobalCheckpointSync extends ReplicationAction< } @Override - protected PrimaryResult performOnPrimary( - final IndexShard primary, final GlobalCheckpointSyncAction.Request request) throws Exception { - primary.sync(); - return new PrimaryResult(request, new ReplicationResponse()); + protected void performOnPrimary(IndexShard primary, 
GlobalCheckpointSyncAction.Request request, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + primary.sync(); + return new PrimaryResult(request, new ReplicationResponse()); + }); } @Override @@ -862,10 +868,12 @@ class ResyncAction extends ReplicationAction result = - executeResyncOnPrimary(primary, request); - return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + protected void performOnPrimary(IndexShard primary, ResyncReplicationRequest request, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final TransportWriteAction.WritePrimaryResult result = + executeResyncOnPrimary(primary, request); + return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + }); } @Override @@ -905,9 +913,12 @@ class SyncRetentionLeases extends ReplicationAction< } @Override - protected PrimaryResult performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request) throws Exception { - primary.persistRetentionLeases(); - return new PrimaryResult(request, new RetentionLeaseSyncAction.Response()); + protected void performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request, + ActionListener listener) { + ActionListener.completeWith(listener, () -> { + primary.persistRetentionLeases(); + return new PrimaryResult(request, new RetentionLeaseSyncAction.Response()); + }); } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 74e9ceda67026..45ffbf6998d90 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -61,13 +61,13 @@ public 
TransportBulkShardOperationsAction( } @Override - protected WritePrimaryResult shardOperationOnPrimary( - final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { + protected void shardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary, + ActionListener> listener) { if (logger.isTraceEnabled()) { logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry()); } - return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(), - request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); + ActionListener.completeWith(listener, () -> shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), + request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger)); } public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index adbff7fb29f99..c5a357c7df817 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -643,20 +643,22 @@ class CcrAction extends ReplicationAction permitFuture = new PlainActionFuture<>(); - primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request); - final TransportWriteAction.WritePrimaryResult ccrResult; - try (Releasable ignored = permitFuture.get()) { - ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), - request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); - } - return new PrimaryResult(ccrResult.replicaRequest(), 
ccrResult.finalResponseIfSuccessful) { - @Override - public void respond(ActionListener listener) { - ccrResult.respond(listener); + protected void performOnPrimary(IndexShard primary, BulkShardOperationsRequest request, ActionListener listener) { + ActionListener.completeWith(listener, () -> { + final PlainActionFuture permitFuture = new PlainActionFuture<>(); + primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request); + final TransportWriteAction.WritePrimaryResult ccrResult; + try (Releasable ignored = permitFuture.get()) { + ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), + request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } - }; + return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) { + @Override + public void respond(ActionListener listener) { + ccrResult.respond(listener); + } + }; + }); } @Override From 51b9d5ee3a4bab5e3187f5be52f9b0d8184e7963 Mon Sep 17 00:00:00 2001 From: Jason Tedor Date: Sat, 6 Apr 2019 17:23:51 -0400 Subject: [PATCH 2/3] Be lenient when parsing build flavor and type on the wire (#40734) Today we are strict when parsing build flavor and types off the wire. This means that if a later version introduces a new build flavor or type, an older version would not be able to parse what that new version is sending. For a practical example of this, we recently added the build type "docker", and this means that in a rolling upgrade scenario older nodes would not be able to understand the build type that the newer node is sending. This breaks clusters and is bad. We do not normally think of adding a new enumeration value as being a serialization breaking change, it is just not a lesson that we have learned before. We should be lenient here though, so that we can add future changes without running the risk of breaking ourselves horribly. 
It is either that, or we build super-strict testing infrastructure here, and even then I fear the possibility of mistakes. This commit changes the parsing of build flavor and build type so that we are still strict at startup, yet we are lenient with values coming across the wire. This will help us avoid breaking rolling upgrades, or clients that are on an older version. --- .../main/java/org/elasticsearch/Build.java | 29 +++++++++---- .../action/main/MainResponse.java | 8 +++- .../java/org/elasticsearch/BuildTests.java | 42 +++++++++++++++++++ 3 files changed, 69 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/Build.java b/server/src/main/java/org/elasticsearch/Build.java index be37c56837d70..1b1cd8d3e720a 100644 --- a/server/src/main/java/org/elasticsearch/Build.java +++ b/server/src/main/java/org/elasticsearch/Build.java @@ -57,7 +57,7 @@ public String displayName() { return displayName; } - public static Flavor fromDisplayName(final String displayName) { + public static Flavor fromDisplayName(final String displayName, final boolean strict) { switch (displayName) { case "default": return Flavor.DEFAULT; @@ -66,7 +66,12 @@ public static Flavor fromDisplayName(final String displayName) { case "unknown": return Flavor.UNKNOWN; default: - throw new IllegalStateException("unexpected distribution flavor [" + displayName + "]; your distribution is broken"); + if (strict) { + final String message = "unexpected distribution flavor [" + displayName + "]; your distribution is broken"; + throw new IllegalStateException(message); + } else { + return Flavor.UNKNOWN; + } } } @@ -91,7 +96,7 @@ public String displayName() { this.displayName = displayName; } - public static Type fromDisplayName(final String displayName) { + public static Type fromDisplayName(final String displayName, final boolean strict) { switch (displayName) { case "deb": return Type.DEB; @@ -106,9 +111,14 @@ public static Type fromDisplayName(final String displayName) { case 
"unknown": return Type.UNKNOWN; default: - throw new IllegalStateException("unexpected distribution type [" + displayName + "]; your distribution is broken"); + if (strict) { + throw new IllegalStateException("unexpected distribution type [" + displayName + "]; your distribution is broken"); + } else { + return Type.UNKNOWN; + } } } + } static { @@ -119,8 +129,9 @@ public static Type fromDisplayName(final String displayName) { final boolean isSnapshot; final String version; - flavor = Flavor.fromDisplayName(System.getProperty("es.distribution.flavor", "unknown")); - type = Type.fromDisplayName(System.getProperty("es.distribution.type", "unknown")); + // these are parsed at startup, and we require that we are able to recognize the values passed in by the startup scripts + flavor = Flavor.fromDisplayName(System.getProperty("es.distribution.flavor", "unknown"), true); + type = Type.fromDisplayName(System.getProperty("es.distribution.type", "unknown"), true); final String esPrefix = "elasticsearch-" + Version.CURRENT; final URL url = getElasticsearchCodeSourceLocation(); @@ -214,12 +225,14 @@ public static Build readBuild(StreamInput in) throws IOException { final Flavor flavor; final Type type; if (in.getVersion().onOrAfter(Version.V_6_3_0)) { - flavor = Flavor.fromDisplayName(in.readString()); + // be lenient when reading on the wire, the enumeration values from other versions might be different than what we know + flavor = Flavor.fromDisplayName(in.readString(), false); } else { flavor = Flavor.OSS; } if (in.getVersion().onOrAfter(Version.V_6_3_0)) { - type = Type.fromDisplayName(in.readString()); + // be lenient when reading on the wire, the enumeration values from other versions might be different than what we know + type = Type.fromDisplayName(in.readString(), false); } else { type = Type.UNKNOWN; } diff --git a/server/src/main/java/org/elasticsearch/action/main/MainResponse.java b/server/src/main/java/org/elasticsearch/action/main/MainResponse.java index 
dc54a112f6779..8485c47c8c79b 100644 --- a/server/src/main/java/org/elasticsearch/action/main/MainResponse.java +++ b/server/src/main/java/org/elasticsearch/action/main/MainResponse.java @@ -129,8 +129,12 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws final String buildType = (String) value.get("build_type"); response.build = new Build( - buildFlavor == null ? Build.Flavor.UNKNOWN : Build.Flavor.fromDisplayName(buildFlavor), - buildType == null ? Build.Type.UNKNOWN : Build.Type.fromDisplayName(buildType), + /* + * Be lenient when reading on the wire, the enumeration values from other versions might be different than what + * we know. + */ + buildFlavor == null ? Build.Flavor.UNKNOWN : Build.Flavor.fromDisplayName(buildFlavor, false), + buildType == null ? Build.Type.UNKNOWN : Build.Type.fromDisplayName(buildType, false), (String) value.get("build_hash"), (String) value.get("build_date"), (boolean) value.get("build_snapshot"), diff --git a/server/src/test/java/org/elasticsearch/BuildTests.java b/server/src/test/java/org/elasticsearch/BuildTests.java index f1d48c08b3953..e0d8140c708d6 100644 --- a/server/src/test/java/org/elasticsearch/BuildTests.java +++ b/server/src/test/java/org/elasticsearch/BuildTests.java @@ -35,7 +35,10 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasToString; +import static org.hamcrest.Matchers.sameInstance; public class BuildTests extends ESTestCase { @@ -223,4 +226,43 @@ public void testSerializationBWC() throws IOException { assertThat(post67pre70.build.getQualifiedVersion(), equalTo(post67Pre70Version.toString())); assertThat(post70.build.getQualifiedVersion(), equalTo(dockerBuild.build.getQualifiedVersion())); } + + public void testFlavorParsing() { + for (final Build.Flavor flavor : Build.Flavor.values()) { + // strict or not should not impact parsing 
at all here + assertThat(Build.Flavor.fromDisplayName(flavor.displayName(), randomBoolean()), sameInstance(flavor)); + } + } + + public void testTypeParsing() { + for (final Build.Type type : Build.Type.values()) { + // strict or not should not impact parsing at all here + assertThat(Build.Type.fromDisplayName(type.displayName(), randomBoolean()), sameInstance(type)); + } + } + + public void testLenientFlavorParsing() { + final String displayName = randomAlphaOfLength(8); + assertThat(Build.Flavor.fromDisplayName(displayName, false), equalTo(Build.Flavor.UNKNOWN)); + } + + public void testStrictFlavorParsing() { + final String displayName = randomAlphaOfLength(8); + @SuppressWarnings("ResultOfMethodCallIgnored") final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> Build.Flavor.fromDisplayName(displayName, true)); + assertThat(e, hasToString(containsString("unexpected distribution flavor [" + displayName + "]; your distribution is broken"))); + } + + public void testLenientTypeParsing() { + final String displayName = randomAlphaOfLength(8); + assertThat(Build.Type.fromDisplayName(displayName, false), equalTo(Build.Type.UNKNOWN)); + } + + public void testStrictTypeParsing() { + final String displayName = randomAlphaOfLength(8); + @SuppressWarnings("ResultOfMethodCallIgnored") final IllegalStateException e = + expectThrows(IllegalStateException.class, () -> Build.Type.fromDisplayName(displayName, true)); + assertThat(e, hasToString(containsString("unexpected distribution type [" + displayName + "]; your distribution is broken"))); + } + } From 8428f9ca55ad7953347172764c70488a0b005c89 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Sun, 7 Apr 2019 11:02:50 +0200 Subject: [PATCH 3/3] Fix Failing to Handle Ex. 
in TransportShardBulkAction (#40923) * Fix a minor mistake from #39793: we should be using `run` (not `doRun`) so that the `onFailure` path is executed if the first invocation of this `Runnable` fails for an unexpected reason --- .../org/elasticsearch/action/bulk/TransportShardBulkAction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index afa26bfeb9700..a55de9f1497b2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -194,7 +194,7 @@ private void finishRequest() { null, context.getPrimary(), logger)); } - }.doRun(); + }.run(); } /**