From 5ee4211689c9ace02c1c1cb7ad453da709588114 Mon Sep 17 00:00:00 2001 From: Armin Braun Date: Tue, 5 Mar 2019 13:43:01 +0100 Subject: [PATCH] Make TransportBulkAction Non-Blocking * Dependency of #39504 --- ...TransportVerifyShardBeforeCloseAction.java | 14 +- .../flush/TransportShardFlushAction.java | 15 +- .../refresh/TransportShardRefreshAction.java | 14 +- .../action/bulk/MappingUpdatePerformer.java | 3 +- .../action/bulk/TransportShardBulkAction.java | 368 +++++++++--------- .../TransportSingleItemBulkWriteAction.java | 35 +- .../TransportResyncReplicationAction.java | 12 +- .../replication/ReplicationOperation.java | 57 +-- .../TransportReplicationAction.java | 21 +- .../replication/TransportWriteAction.java | 8 +- .../action/index/MappingUpdatedAction.java | 59 ++- .../seqno/GlobalCheckpointSyncAction.java | 12 +- .../RetentionLeaseBackgroundSyncAction.java | 18 +- .../index/seqno/RetentionLeaseSyncAction.java | 19 +- .../elasticsearch/indices/IndicesModule.java | 4 +- ...portVerifyShardBeforeCloseActionTests.java | 32 +- .../bulk/TransportShardBulkActionTests.java | 164 ++++---- .../ReplicationOperationTests.java | 15 +- .../TransportReplicationActionTests.java | 20 +- ...ReplicationAllPermitsAcquisitionTests.java | 15 +- .../TransportWriteActionTests.java | 91 +++-- .../GlobalCheckpointSyncActionTests.java | 3 +- ...tentionLeaseBackgroundSyncActionTests.java | 27 +- .../seqno/RetentionLeaseSyncActionTests.java | 28 +- .../action/support/ActionTestUtils.java | 17 + .../ESIndexLevelReplicationTestCase.java | 122 +++--- .../TransportBulkShardOperationsAction.java | 12 +- .../ShardFollowTaskReplicationTests.java | 25 +- 28 files changed, 699 insertions(+), 531 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java index 13269344a1ac6..2e0eef6057ac9 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseAction.java @@ -85,14 +85,18 @@ protected void acquireReplicaOperationPermit(final IndexShard replica, } @Override - protected PrimaryResult shardOperationOnPrimary(final ShardRequest shardRequest, - final IndexShard primary) throws Exception { - executeShardOperation(shardRequest, primary); - return new PrimaryResult<>(shardRequest, new ReplicationResponse()); + protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary, + ActionListener> listener) { + try { + executeShardOperation(shardRequest, primary); + listener.onResponse(new PrimaryResult<>(shardRequest, new ReplicationResponse())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override - protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) throws Exception { + protected ReplicaResult shardOperationOnReplica(final ShardRequest shardRequest, final IndexShard replica) { executeShardOperation(shardRequest, replica); return new ReplicaResult(); } diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java index 344a817fa8b83..921a5003c36b6 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.flush; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; @@ -51,11 +52,15 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(ShardFlushRequest shardRequest, - IndexShard primary) { - primary.flush(shardRequest.getRequest()); - logger.trace("{} flush request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + protected void shardOperationOnPrimary(ShardFlushRequest shardRequest, IndexShard primary, + ActionListener> listener) { + try { + primary.flush(shardRequest.getRequest()); + logger.trace("{} flush request executed on primary", primary.shardId()); + listener.onResponse(new PrimaryResult<>(shardRequest, new ReplicationResponse())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java index 0b5975cf025af..6acd7f2fdf367 100644 --- a/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java +++ b/server/src/main/java/org/elasticsearch/action/admin/indices/refresh/TransportShardRefreshAction.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.refresh; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.replication.BasicReplicationRequest; import org.elasticsearch.action.support.replication.ReplicationResponse; @@ -53,10 +54,15 @@ protected ReplicationResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary) { - primary.refresh("api"); - logger.trace("{} refresh request executed on primary", primary.shardId()); - return new PrimaryResult(shardRequest, new ReplicationResponse()); + protected void shardOperationOnPrimary(BasicReplicationRequest shardRequest, IndexShard primary, + ActionListener> listener) { + try { + primary.refresh("api"); + logger.trace("{} refresh request executed on primary", primary.shardId()); + listener.onResponse(new PrimaryResult(shardRequest, new ReplicationResponse())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java index 1f228b0f355e0..5a38f0f43e070 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.bulk; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.shard.ShardId; @@ -27,6 +28,6 @@ public interface MappingUpdatePerformer { /** * Update the mappings on the master. */ - void updateMappings(Mapping update, ShardId shardId, String type); + void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener); } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index f182c2985815d..bd52a2fb7e964 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -22,7 +22,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.apache.logging.log4j.util.MessageSupplier; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRunnable; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; import org.elasticsearch.action.delete.DeleteRequest; @@ -30,7 +33,6 @@ import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.action.update.UpdateHelper; @@ -44,8 +46,6 @@ import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.common.CheckedRunnable; -import org.elasticsearch.common.CheckedSupplier; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.common.collect.Tuple; import org.elasticsearch.common.inject.Inject; @@ -57,7 +57,6 @@ import org.elasticsearch.index.engine.VersionConflictEngineException; import org.elasticsearch.index.get.GetResult; import org.elasticsearch.index.mapper.MapperException; -import org.elasticsearch.index.mapper.Mapping; import org.elasticsearch.index.mapper.SourceToParse; import org.elasticsearch.index.seqno.SequenceNumbers; import org.elasticsearch.index.shard.IndexShard; @@ -69,8 +68,8 @@ import org.elasticsearch.transport.TransportRequestOptions; import org.elasticsearch.transport.TransportService; -import java.io.IOException; import java.util.Map; +import java.util.concurrent.Executor; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.LongSupplier; @@ -82,7 +81,6 @@ public class TransportShardBulkAction extends TransportWriteAction shardOperationOnPrimary(BulkShardRequest request, IndexShard primary) - throws Exception { + protected void shardOperationOnPrimary(BulkShardRequest request, IndexShard primary, + ActionListener> listener) { ClusterStateObserver observer = new ClusterStateObserver(clusterService, request.timeout(), logger, threadPool.getThreadContext()); - CheckedRunnable waitForMappingUpdate = () -> { - PlainActionFuture waitingFuture = new PlainActionFuture<>(); - observer.waitForNextChange(new ClusterStateObserver.Listener() { + performOnPrimary(request, primary, updateHelper, threadPool::relativeTimeInMillis, + (update, shardId, type, mappingListener) -> { + assert update != null; + assert shardId != null; + mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update, mappingListener); + }, + mappingUpdateListener -> observer.waitForNextChange(new ClusterStateObserver.Listener() { @Override public void onNewClusterState(ClusterState state) { - waitingFuture.onResponse(null); + mappingUpdateListener.onResponse(null); } @Override public void onClusterServiceClose() { - waitingFuture.onFailure(new NodeClosedException(clusterService.localNode())); + mappingUpdateListener.onFailure(new NodeClosedException(clusterService.localNode())); } @Override public void onTimeout(TimeValue timeout) { - waitingFuture.onFailure( - new MapperException("timed out while waiting for a dynamic mapping update")); + mappingUpdateListener.onFailure(new MapperException("timed out while waiting for a dynamic mapping update")); } - }); - waitingFuture.get(); - }; - return performOnPrimary(request, primary, updateHelper, threadPool::absoluteTimeInMillis, - new ConcreteMappingUpdatePerformer(), waitForMappingUpdate); + }), listener, threadPool + ); } - public static WritePrimaryResult performOnPrimary( + public static void performOnPrimary( BulkShardRequest request, IndexShard primary, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, MappingUpdatePerformer mappingUpdater, - CheckedRunnable waitForMappingUpdate) throws Exception { - BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); - return performOnPrimary(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); - } - - private static WritePrimaryResult performOnPrimary( - BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) throws Exception { - - while (context.hasMoreOperationsToExecute()) { - executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate); - assert context.isInitial(); // either completed and moved to next or reset - } - return new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), - null, context.getPrimary(), logger); + Consumer> waitForMappingUpdate, + ActionListener> listener, + ThreadPool threadPool) { + new ActionRunnable>(listener) { + + private final Executor executor = threadPool.executor(ThreadPool.Names.WRITE); + + private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary); + + @Override + protected void doRun() { + while (context.hasMoreOperationsToExecute()) { + if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate, + ActionListener.wrap(v -> executor.execute(this), listener::onFailure)) == false) { + // We are waiting for a mapping update on another thread, that will invoke this action again once its done + // so we just break out here. + return; + } + assert context.isInitial(); // either completed and moved to next or reset + } + // We're done, there's no more operations to execute so we resolve the wrapped listener + listener.onResponse( + new WritePrimaryResult<>(context.getBulkShardRequest(), context.buildShardResponse(), context.getLocationToSync(), + null, context.getPrimary(), logger)); + } + }.doRun(); } - /** Executes bulk item requests and handles request execution exceptions */ - static void executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, - MappingUpdatePerformer mappingUpdater, CheckedRunnable waitForMappingUpdate) - throws Exception { - final DocWriteRequest.OpType opType = context.getCurrent().opType(); - - final UpdateHelper.Result updateResult; - if (opType == DocWriteRequest.OpType.UPDATE) { - final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); - try { - updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier); - } catch (Exception failure) { - // we may fail translating a update to index or delete operation - // we use index result to communicate failure while translating update request - final Engine.Result result = new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO); - context.setRequestToExecute(updateRequest); - context.markOperationAsExecuted(result); - context.markAsCompleted(context.getExecutionResult()); - return; - } - // execute translated update request - switch (updateResult.getResponseResult()) { - case CREATED: - case UPDATED: - IndexRequest indexRequest = updateResult.action(); - IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData(); - MappingMetaData mappingMd = metaData.mappingOrDefault(); - indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex()); - context.setRequestToExecute(indexRequest); - break; - case DELETED: - context.setRequestToExecute(updateResult.action()); - break; - case NOOP: - context.markOperationAsNoOp(updateResult.action()); + /** + * Executes bulk item requests and handles request execution exceptions. + * @return {@code true} if request completed on this thread and the listener was invoked, {@code false} if the request triggered + * a mapping update that will finish and invoke the listener on a different thread + */ + static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier, + MappingUpdatePerformer mappingUpdater, Consumer> waitForMappingUpdate, + ActionListener itemDoneListener) { + try { + final DocWriteRequest.OpType opType = context.getCurrent().opType(); + + final UpdateHelper.Result updateResult; + if (opType == DocWriteRequest.OpType.UPDATE) { + final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); + try { + updateResult = updateHelper.prepare(updateRequest, context.getPrimary(), nowInMillisSupplier); + } catch (Exception failure) { + // we may fail translating a update to index or delete operation + // we use index result to communicate failure while translating update request + final Engine.Result result = + new Engine.IndexResult(failure, updateRequest.version(), SequenceNumbers.UNASSIGNED_SEQ_NO); + context.setRequestToExecute(updateRequest); + context.markOperationAsExecuted(result); context.markAsCompleted(context.getExecutionResult()); - return; - default: - throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult()); + return true; + } + // execute translated update request + switch (updateResult.getResponseResult()) { + case CREATED: + case UPDATED: + IndexRequest indexRequest = updateResult.action(); + IndexMetaData metaData = context.getPrimary().indexSettings().getIndexMetaData(); + MappingMetaData mappingMd = metaData.mappingOrDefault(); + indexRequest.process(metaData.getCreationVersion(), mappingMd, updateRequest.concreteIndex()); + context.setRequestToExecute(indexRequest); + break; + case DELETED: + context.setRequestToExecute(updateResult.action()); + break; + case NOOP: + context.markOperationAsNoOp(updateResult.action()); + context.markAsCompleted(context.getExecutionResult()); + return true; + default: + throw new IllegalStateException("Illegal update operation " + updateResult.getResponseResult()); + } + } else { + context.setRequestToExecute(context.getCurrent()); + updateResult = null; } - } else { - context.setRequestToExecute(context.getCurrent()); - updateResult = null; - } - - assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - if (context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE) { - executeDeleteRequestOnPrimary(context, mappingUpdater); - } else { - executeIndexRequestOnPrimary(context, mappingUpdater); - } + assert context.getRequestToExecute() != null; // also checks that we're in TRANSLATED state - if (context.requiresWaitingForMappingUpdate()) { + final IndexShard primary = context.getPrimary(); + final long version = context.getRequestToExecute().version(); + final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE; + final Function exceptionToResult = + isDelete ? e -> primary.getFailedDeleteResult(e, version) : e -> primary.getFailedIndexResult(e, version); try { - waitForMappingUpdate.run(); - context.resetForExecutionForRetry(); + final Engine.Result result; + if (isDelete) { + final DeleteRequest request = context.getRequestToExecute(); + result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(), + request.ifSeqNo(), request.ifPrimaryTerm()); + } else { + final IndexRequest request = context.getRequestToExecute(); + result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse( + request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()), + request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()); + } + if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { + mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(), + context.getRequestToExecute().type(), + new ActionListener() { + @Override + public void onResponse(Void v) { + context.markAsRequiringMappingUpdate(); + waitForMappingUpdate.accept( + ActionListener.runAfter(new ActionListener() { + @Override + public void onResponse(Void v) { + assert context.requiresWaitingForMappingUpdate(); + context.resetForExecutionForRetry(); + } + + @Override + public void onFailure(Exception e) { + context.failOnMappingUpdate(e); + } + }, () -> itemDoneListener.onResponse(null)) + ); + } + + @Override + public void onFailure(Exception e) { + onComplete(exceptionToResult.apply(e), context, updateResult); + // Requesting mapping update failed, so we don't have to wait for a cluster state update + assert context.isInitial(); + itemDoneListener.onResponse(null); + } + }); + return false; + } else { + onComplete(result, context, updateResult); + return true; + } } catch (Exception e) { - context.failOnMappingUpdate(e); - } - return; - } - - assert context.isOperationExecuted(); - - if (opType == DocWriteRequest.OpType.UPDATE && - context.getExecutionResult().isFailed() && - isConflictException(context.getExecutionResult().getFailure().getCause())) { - final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); - if (context.getRetryCounter() < updateRequest.retryOnConflict()) { - context.resetForExecutionForRetry(); - return; + onComplete(exceptionToResult.apply(e), context, updateResult); + return true; } + } catch (Exception e) { + itemDoneListener.onFailure(e); + return true; } - - finalizePrimaryOperationOnCompletion(context, opType, updateResult); } - private static void finalizePrimaryOperationOnCompletion(BulkPrimaryExecutionContext context, DocWriteRequest.OpType opType, - UpdateHelper.Result updateResult) { + private static void onComplete(Engine.Result r, BulkPrimaryExecutionContext context, UpdateHelper.Result updateResult) { + context.markOperationAsExecuted(r); + final DocWriteRequest docWriteRequest = context.getCurrent(); + final DocWriteRequest.OpType opType = docWriteRequest.opType(); + final boolean isUpdate = opType == DocWriteRequest.OpType.UPDATE; final BulkItemResponse executionResult = context.getExecutionResult(); - if (opType == DocWriteRequest.OpType.UPDATE) { - final UpdateRequest updateRequest = (UpdateRequest) context.getCurrent(); - context.markAsCompleted( - processUpdateResponse(updateRequest, context.getConcreteIndex(), executionResult, updateResult)); - } else if (executionResult.isFailed()) { - final Exception failure = executionResult.getFailure().getCause(); - final DocWriteRequest docWriteRequest = context.getCurrent(); - if (TransportShardBulkAction.isConflictException(failure)) { - logger.trace(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); - } else { - logger.debug(() -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", - context.getPrimary().shardId(), docWriteRequest.opType().getLowercase(), docWriteRequest), failure); - } - - context.markAsCompleted(executionResult); + final boolean isFailed = executionResult.isFailed(); + if (isUpdate && isFailed && isConflictException(executionResult.getFailure().getCause()) + && context.getRetryCounter() < ((UpdateRequest) docWriteRequest).retryOnConflict()) { + context.resetForExecutionForRetry(); + return; + } + final BulkItemResponse response; + if (isUpdate) { + response = processUpdateResponse((UpdateRequest) docWriteRequest, context.getConcreteIndex(), executionResult, updateResult); } else { - context.markAsCompleted(executionResult); + if (isFailed) { + final Exception failure = executionResult.getFailure().getCause(); + final MessageSupplier messageSupplier = () -> new ParameterizedMessage("{} failed to execute bulk item ({}) {}", + context.getPrimary().shardId(), opType.getLowercase(), docWriteRequest); + if (TransportShardBulkAction.isConflictException(failure)) { + logger.trace(messageSupplier, failure); + } else { + logger.debug(messageSupplier, failure); + } + } + response = executionResult; } + context.markAsCompleted(response); assert context.isInitial(); } @@ -275,10 +328,8 @@ private static boolean isConflictException(final Exception e) { /** * Creates a new bulk item result from the given requests and result of performing the update operation on the shard. */ - static BulkItemResponse processUpdateResponse(final UpdateRequest updateRequest, final String concreteIndex, - BulkItemResponse operationResponse, - final UpdateHelper.Result translate) { - + private static BulkItemResponse processUpdateResponse(UpdateRequest updateRequest, String concreteIndex, + BulkItemResponse operationResponse, UpdateHelper.Result translate) { final BulkItemResponse response; DocWriteResponse.Result translatedResult = translate.getResponseResult(); if (operationResponse.isFailed()) { @@ -439,65 +490,4 @@ private static Engine.Result performOpOnReplica(DocWriteResponse primaryResponse } return result; } - - /** Executes index operation on primary shard after updates mapping if dynamic mappings are found */ - private static void executeIndexRequestOnPrimary(BulkPrimaryExecutionContext context, - MappingUpdatePerformer mappingUpdater) throws Exception { - final IndexRequest request = context.getRequestToExecute(); - final IndexShard primary = context.getPrimary(); - final SourceToParse sourceToParse = - new SourceToParse(request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()); - executeOnPrimaryWhileHandlingMappingUpdates(context, - () -> - primary.applyIndexOperationOnPrimary(request.version(), request.versionType(), sourceToParse, - request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry()), - e -> primary.getFailedIndexResult(e, request.version()), - context::markOperationAsExecuted, - mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); - } - - private static void executeDeleteRequestOnPrimary(BulkPrimaryExecutionContext context, - MappingUpdatePerformer mappingUpdater) throws Exception { - final DeleteRequest request = context.getRequestToExecute(); - final IndexShard primary = context.getPrimary(); - executeOnPrimaryWhileHandlingMappingUpdates(context, - () -> primary.applyDeleteOperationOnPrimary(request.version(), request.type(), request.id(), request.versionType(), - request.ifSeqNo(), request.ifPrimaryTerm()), - e -> primary.getFailedDeleteResult(e, request.version()), - context::markOperationAsExecuted, - mapping -> mappingUpdater.updateMappings(mapping, primary.shardId(), request.type())); - } - - private static void executeOnPrimaryWhileHandlingMappingUpdates( - BulkPrimaryExecutionContext context, CheckedSupplier toExecute, - Function exceptionToResult, Consumer onComplete, Consumer mappingUpdater) - throws IOException { - T result = toExecute.get(); - if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) { - // try to update the mappings and mark the context as needing to try again. - try { - mappingUpdater.accept(result.getRequiredMappingUpdate()); - context.markAsRequiringMappingUpdate(); - } catch (Exception e) { - // failure to update the mapping should translate to a failure of specific requests. Other requests - // still need to be executed and replicated. - onComplete.accept(exceptionToResult.apply(e)); - return; - } - } else { - onComplete.accept(result); - } - } - - class ConcreteMappingUpdatePerformer implements MappingUpdatePerformer { - - @Override - public void updateMappings(final Mapping update, final ShardId shardId, final String type) { - assert update != null; - assert shardId != null; - // can throw timeout exception when updating mappings or ISE for attempting to - // update default mappings which are bubbled up - mappingUpdatedAction.updateMappingOnMaster(shardId.getIndex(), type, update); - } - } } diff --git a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java index 892daae4bb275..03ee359b628b2 100644 --- a/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -70,27 +70,30 @@ protected void doExecute(Task task, final Request request, final ActionListener< } @Override - protected WritePrimaryResult shardOperationOnPrimary( - Request request, final IndexShard primary) throws Exception { + protected void shardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener) { BulkItemRequest[] itemRequests = new BulkItemRequest[1]; WriteRequest.RefreshPolicy refreshPolicy = request.getRefreshPolicy(); request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); itemRequests[0] = new BulkItemRequest(0, ((DocWriteRequest) request)); BulkShardRequest bulkShardRequest = new BulkShardRequest(request.shardId(), refreshPolicy, itemRequests); - WritePrimaryResult bulkResult = - shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary); - assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response"; - BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0]; - final Response response; - final Exception failure; - if (itemResponse.isFailed()) { - failure = itemResponse.getFailure().getCause(); - response = null; - } else { - response = (Response) itemResponse.getResponse(); - failure = null; - } - return new WritePrimaryResult<>(request, response, bulkResult.location, failure, primary, logger); + shardBulkAction.shardOperationOnPrimary(bulkShardRequest, primary, + ActionListener.map(listener, bulkResult -> { + assert bulkResult.finalResponseIfSuccessful.getResponses().length == 1 : "expected only one bulk shard response"; + BulkItemResponse itemResponse = bulkResult.finalResponseIfSuccessful.getResponses()[0]; + final Response response; + final Exception failure; + if (itemResponse.isFailed()) { + failure = itemResponse.getFailure().getCause(); + response = null; + } else { + response = (Response) itemResponse.getResponse(); + failure = null; + } + return new WritePrimaryResult<>( + request, response, + ((WritePrimaryResult) bulkResult).location, failure, primary, logger); + })); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java index 3f09f00b9ac1e..dc14d5e553842 100644 --- a/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/resync/TransportResyncReplicationAction.java @@ -115,10 +115,14 @@ public ClusterBlockLevel indexBlockLevel() { } @Override - protected WritePrimaryResult shardOperationOnPrimary( - ResyncReplicationRequest request, IndexShard primary) throws Exception { - final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary); - return new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger); + protected void shardOperationOnPrimary(ResyncReplicationRequest request, IndexShard primary, + ActionListener> listener) { + try { + final ResyncReplicationRequest replicaRequest = performOnPrimary(request, primary); + listener.onResponse(new WritePrimaryResult<>(replicaRequest, new ResyncReplicationResponse(), null, null, primary, logger)); + } catch (Exception e) { + listener.onFailure(e); + } } public static ResyncReplicationRequest performOnPrimary(ResyncReplicationRequest request, IndexShard primary) { diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java index c232404d5fe5d..16f1b73c56c40 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/ReplicationOperation.java @@ -102,33 +102,36 @@ public void execute() throws Exception { totalShards.incrementAndGet(); pendingActions.incrementAndGet(); // increase by 1 until we finish all primary coordination - primaryResult = primary.perform(request); - primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()); - final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); - if (replicaRequest != null) { - if (logger.isTraceEnabled()) { - logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); - } + primary.perform(request, ActionListener.wrap(primaryResult -> { + this.primaryResult = primaryResult; + primary.updateLocalCheckpointForShard(primaryRouting.allocationId().getId(), primary.localCheckpoint()); + final ReplicaRequest replicaRequest = primaryResult.replicaRequest(); + if (replicaRequest != null) { + if (logger.isTraceEnabled()) { + logger.trace("[{}] op [{}] completed on primary for request [{}]", primaryId, opType, request); + } - // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics. - // we have to make sure that every operation indexed into the primary after recovery start will also be replicated - // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then. - // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint - // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset - // of the sampled replication group, and advanced further than what the given replication group would allow it to. - // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint. - final long globalCheckpoint = primary.globalCheckpoint(); - // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of - // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed on. - final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes(); - assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized"; - final ReplicationGroup replicationGroup = primary.getReplicationGroup(); - markUnavailableShardsAsStale(replicaRequest, replicationGroup); - performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); - } + // we have to get the replication group after successfully indexing into the primary in order to honour recovery semantics. + // we have to make sure that every operation indexed into the primary after recovery start will also be replicated + // to the recovery target. If we used an old replication group, we may miss a recovery that has started since then. + // we also have to make sure to get the global checkpoint before the replication group, to ensure that the global checkpoint + // is valid for this replication group. If we would sample in the reverse, the global checkpoint might be based on a subset + // of the sampled replication group, and advanced further than what the given replication group would allow it to. + // This would entail that some shards could learn about a global checkpoint that would be higher than its local checkpoint. + final long globalCheckpoint = primary.globalCheckpoint(); + // we have to capture the max_seq_no_of_updates after this request was completed on the primary to make sure the value of + // max_seq_no_of_updates on replica when this request is executed is at least the value on the primary when it was executed + // on. + final long maxSeqNoOfUpdatesOrDeletes = primary.maxSeqNoOfUpdatesOrDeletes(); + assert maxSeqNoOfUpdatesOrDeletes != SequenceNumbers.UNASSIGNED_SEQ_NO : "seqno_of_updates still uninitialized"; + final ReplicationGroup replicationGroup = primary.getReplicationGroup(); + markUnavailableShardsAsStale(replicaRequest, replicationGroup); + performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup); + } - successfulShards.incrementAndGet(); // mark primary as successful - decPendingAndFinishIfNeeded(); + successfulShards.incrementAndGet(); // mark primary as successful + decPendingAndFinishIfNeeded(); + }, resultListener::onFailure)); } private void markUnavailableShardsAsStale(ReplicaRequest replicaRequest, ReplicationGroup replicationGroup) { @@ -310,9 +313,9 @@ public interface Primary< * also complete after. Deal with it. * * @param request the request to perform - * @return the request to send to the replicas + * @param listener result listener */ - PrimaryResultT perform(RequestT request) throws Exception; + void perform(RequestT request, ActionListener listener); /** * Notifies the primary of a local checkpoint for the given allocation. diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java index cc0c69418d7eb..be2a5d902ae4e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java @@ -21,6 +21,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; +import org.elasticsearch.Assertions; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.Version; @@ -197,8 +198,8 @@ protected void resolveRequest(final IndexMetaData indexMetaData, final Request r * @param shardRequest the request to the primary shard * @param primary the primary shard to perform the operation on */ - protected abstract PrimaryResult shardOperationOnPrimary( - Request shardRequest, IndexShard primary) throws Exception; + protected abstract void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener); /** * Synchronously execute the specified replica operation. This is done under a permit from @@ -479,7 +480,7 @@ protected ReplicationOperation, + public static class PrimaryResult, Response extends ReplicationResponse> implements ReplicationOperation.PrimaryResult { final ReplicaRequest replicaRequest; @@ -1029,11 +1030,15 @@ public void failShard(String reason, Exception e) { } @Override - public PrimaryResult perform(Request request) throws Exception { - PrimaryResult result = shardOperationOnPrimary(request, indexShard); - assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() - + "] with a primary failure [" + result.finalFailure + "]"; - return result; + public void perform(Request request, ActionListener> listener) { + if (Assertions.ENABLED) { + listener = ActionListener.map(listener, result -> { + assert result.replicaRequest() == null || result.finalFailure == null : "a replica request [" + result.replicaRequest() + + "] with a primary failure [" + result.finalFailure + "]"; + return result; + }); + } + shardOperationOnPrimary(request, indexShard, listener); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java index 4781682437545..1f7aca950a43d 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportWriteAction.java @@ -102,12 +102,12 @@ protected ReplicationOperation.Replicas newReplicasProxy(long primaryTerm) { /** * Called on the primary with a reference to the primary {@linkplain IndexShard} to modify. * - * @return the result of the operation on primary, including current translog location and operation response and failure - * async refresh is performed on the primary shard according to the Request refresh policy + * @param listener listener for the result of the operation on primary, including current translog location and operation response + * and failure async refresh is performed on the primary shard according to the Request refresh policy */ @Override - protected abstract WritePrimaryResult shardOperationOnPrimary( - Request request, IndexShard primary) throws Exception; + protected abstract void shardOperationOnPrimary( + Request request, IndexShard primary, ActionListener> listener); /** * Called once per replica with a reference to the replica {@linkplain IndexShard} to modify. diff --git a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java index 14c360168f904..c1895af0db254 100644 --- a/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java +++ b/server/src/main/java/org/elasticsearch/cluster/action/index/MappingUpdatedAction.java @@ -19,7 +19,9 @@ package org.elasticsearch.cluster.action.index; -import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuilder; +import org.elasticsearch.ElasticsearchException; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.support.master.AcknowledgedResponse; import org.elasticsearch.action.support.master.MasterNodeRequest; import org.elasticsearch.client.Client; import org.elasticsearch.client.IndicesAdminClient; @@ -29,6 +31,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.concurrent.UncategorizedExecutionException; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.Index; import org.elasticsearch.index.mapper.MapperService; @@ -57,34 +60,50 @@ private void setDynamicMappingUpdateTimeout(TimeValue dynamicMappingUpdateTimeou this.dynamicMappingUpdateTimeout = dynamicMappingUpdateTimeout; } - public void setClient(Client client) { this.client = client.admin().indices(); } - private PutMappingRequestBuilder updateMappingRequest(Index index, String type, Mapping mappingUpdate, final TimeValue timeout) { - if (type.equals(MapperService.DEFAULT_MAPPING)) { - throw new IllegalArgumentException("_default_ mapping should not be updated"); - } - return client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON) - .setMasterNodeTimeout(timeout).setTimeout(TimeValue.ZERO); - } - - /** - * Same as {@link #updateMappingOnMaster(Index, String, Mapping, TimeValue)} - * using the default timeout. - */ - public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate) { - updateMappingOnMaster(index, type, mappingUpdate, dynamicMappingUpdateTimeout); - } - /** * Update mappings on the master node, waiting for the change to be committed, * but not for the mapping update to be applied on all nodes. The timeout specified by * {@code timeout} is the master node timeout ({@link MasterNodeRequest#masterNodeTimeout()}), * potentially waiting for a master node to be available. */ - public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, TimeValue masterNodeTimeout) { - updateMappingRequest(index, type, mappingUpdate, masterNodeTimeout).get(); + public void updateMappingOnMaster(Index index, String type, Mapping mappingUpdate, ActionListener listener) { + if (type.equals(MapperService.DEFAULT_MAPPING)) { + throw new IllegalArgumentException("_default_ mapping should not be updated"); + } + client.preparePutMapping().setConcreteIndex(index).setType(type).setSource(mappingUpdate.toString(), XContentType.JSON) + .setMasterNodeTimeout(dynamicMappingUpdateTimeout).setTimeout(TimeValue.ZERO) + .execute(new ActionListener() { + @Override + public void onResponse(AcknowledgedResponse acknowledgedResponse) { + listener.onResponse(null); + } + + @Override + public void onFailure(Exception e) { + listener.onFailure(unwrapException(e)); + } + }); + } + + private static Exception unwrapException(Exception e) { + final Throwable cause = e.getCause(); + if (cause instanceof ElasticsearchException) { + ElasticsearchException esEx = (ElasticsearchException) cause; + Throwable root = esEx.unwrapCause(); + if (root instanceof ElasticsearchException) { + return (ElasticsearchException) root; + } else if (root instanceof RuntimeException) { + return (RuntimeException) root; + } + return new UncategorizedExecutionException("Failed execution", root); + } else if (cause instanceof RuntimeException) { + return (RuntimeException) cause; + } else { + return e; + } } } diff --git a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java index 9b55cff8cff9a..a3fb68934134a 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java @@ -117,10 +117,14 @@ protected void sendReplicaRequest( } @Override - protected PrimaryResult shardOperationOnPrimary( - final Request request, final IndexShard indexShard) throws Exception { - maybeSyncTranslog(indexShard); - return new PrimaryResult<>(request, new ReplicationResponse()); + protected void shardOperationOnPrimary(Request request, IndexShard indexShard, + ActionListener> listener) { + try { + maybeSyncTranslog(indexShard); + listener.onResponse(new PrimaryResult<>(request, new ReplicationResponse())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java index d454c2de75b28..4da562779bbf3 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncAction.java @@ -121,14 +121,18 @@ public void backgroundSync( } @Override - protected PrimaryResult shardOperationOnPrimary( + protected void shardOperationOnPrimary( final Request request, - final IndexShard primary) throws WriteStateException { - assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); - Objects.requireNonNull(request); - Objects.requireNonNull(primary); - primary.persistRetentionLeases(); - return new PrimaryResult<>(request, new ReplicationResponse()); + final IndexShard primary, ActionListener> listener) { + try { + assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); + Objects.requireNonNull(request); + Objects.requireNonNull(primary); + primary.persistRetentionLeases(); + listener.onResponse(new PrimaryResult<>(request, new ReplicationResponse())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java index d4845d92a3a6f..3aed373fbbf50 100644 --- a/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java +++ b/server/src/main/java/org/elasticsearch/index/seqno/RetentionLeaseSyncAction.java @@ -123,14 +123,17 @@ public void sync( } @Override - protected WritePrimaryResult shardOperationOnPrimary( - final Request request, - final IndexShard primary) throws WriteStateException { - assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); - Objects.requireNonNull(request); - Objects.requireNonNull(primary); - primary.persistRetentionLeases(); - return new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger()); + protected void shardOperationOnPrimary(Request request, IndexShard primary, + ActionListener> listener) { + try { + assert request.waitForActiveShards().equals(ActiveShardCount.NONE) : request.waitForActiveShards(); + Objects.requireNonNull(request); + Objects.requireNonNull(primary); + primary.persistRetentionLeases(); + listener.onResponse(new WritePrimaryResult<>(request, new Response(), null, null, primary, getLogger())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java index 77bddbe215652..bef05ecda9fd8 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -110,7 +110,7 @@ public List getNamedXContents() { ); } - private Map getMappers(List mapperPlugins) { + public static Map getMappers(List mapperPlugins) { Map mappers = new LinkedHashMap<>(); // builtin mappers @@ -168,7 +168,7 @@ private static Map initBuiltInMetadataMa return Collections.unmodifiableMap(builtInMetadataMappers); } - private static Map getMetadataMappers(List mapperPlugins) { + public static Map getMetadataMappers(List mapperPlugins) { Map metadataMappers = new LinkedHashMap<>(); int i = 0; diff --git a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java index 687b01680704e..050e682b0737f 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/indices/close/TransportVerifyShardBeforeCloseActionTests.java @@ -19,6 +19,7 @@ package org.elasticsearch.action.admin.indices.close; import org.apache.lucene.util.SetOnce; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.indices.flush.FlushRequest; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.PlainActionFuture; @@ -60,6 +61,8 @@ import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.elasticsearch.action.support.replication.ClusterStateCreationUtils.state; @@ -133,18 +136,28 @@ public static void afterClass() { threadPool = null; } - private void executeOnPrimaryOrReplica() throws Exception { + private void executeOnPrimaryOrReplica() throws Throwable { final TaskId taskId = new TaskId("_node_id", randomNonNegativeLong()); final TransportVerifyShardBeforeCloseAction.ShardRequest request = new TransportVerifyShardBeforeCloseAction.ShardRequest(indexShard.shardId(), clusterBlock, taskId); - if (randomBoolean()) { - assertNotNull(action.shardOperationOnPrimary(request, indexShard)); - } else { - assertNotNull(action.shardOperationOnPrimary(request, indexShard)); + final CompletableFuture res = new CompletableFuture<>(); + action.shardOperationOnPrimary(request, indexShard, ActionListener.wrap( + r -> { + assertNotNull(r); + res.complete(null); + }, + res::completeExceptionally + )); + try { + res.get(); + } catch (InterruptedException e) { + throw new AssertionError(e); + } catch (ExecutionException e) { + throw e.getCause(); } } - public void testShardIsFlushed() throws Exception { + public void testShardIsFlushed() throws Throwable { final ArgumentCaptor flushRequest = ArgumentCaptor.forClass(FlushRequest.class); when(indexShard.flush(flushRequest.capture())).thenReturn(new Engine.CommitId(new byte[0])); @@ -171,7 +184,7 @@ public void testOperationFailsWithNoBlock() { verify(indexShard, times(0)).flush(any(FlushRequest.class)); } - public void testVerifyShardBeforeIndexClosing() throws Exception { + public void testVerifyShardBeforeIndexClosing() throws Throwable { executeOnPrimaryOrReplica(); verify(indexShard, times(1)).verifyShardBeforeIndexClosing(); verify(indexShard, times(1)).flush(any(FlushRequest.class)); @@ -271,8 +284,9 @@ public ReplicationGroup getReplicationGroup() { } @Override - public PrimaryResult perform(TransportVerifyShardBeforeCloseAction.ShardRequest request) throws Exception { - return new PrimaryResult(request); + public void perform( + TransportVerifyShardBeforeCloseAction.ShardRequest request, ActionListener listener) { + listener.onResponse(new PrimaryResult(request)); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java index 8fa8060b38536..f82ca6e692e04 100644 --- a/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/bulk/TransportShardBulkActionTests.java @@ -22,13 +22,16 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.Version; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.DocWriteRequest; import org.elasticsearch.action.DocWriteResponse; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.bulk.TransportShardBulkAction.ReplicaItemExecutionMode; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexResponse; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.replication.TransportWriteAction.WritePrimaryResult; import org.elasticsearch.action.update.UpdateHelper; @@ -75,6 +78,8 @@ public class TransportShardBulkActionTests extends IndexShardTestCase { + private static final ActionListener ASSERTING_DONE_LISTENER = ActionTestUtils.assertNoFailureListener(r -> {}); + private final ShardId shardId = new ShardId("index", "_na_", 0); private final Settings idxSettings = Settings.builder() .put("index.number_of_shards", 1) @@ -146,10 +151,9 @@ public void testExecuteBulkIndexRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); - UpdateHelper updateHelper = null; BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog should change, since there were no problems @@ -174,8 +178,9 @@ public void testExecuteBulkIndexRequest() throws Exception { randomlySetIgnoredPrimaryResponse(primaryRequest); BulkPrimaryExecutionContext secondContext = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(secondContext, updateHelper, - threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(secondContext, null, + threadPool::absoluteTimeInMillis, new ThrowingMappingUpdatePerformer(new RuntimeException("fail")), + listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(secondContext.getLocationToSync()); @@ -224,37 +229,44 @@ public void testSkipBulkIndexRequestIfAborted() throws Exception { final ElasticsearchStatusException rejectionCause = new ElasticsearchStatusException("testing rejection", rejectionStatus); rejectItem.abort("index", rejectionCause); - UpdateHelper updateHelper = null; - WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( - bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), - () -> {}); + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( + bulkShardRequest, shard, null, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), + listener -> {}, ActionListener.runAfter( + ActionTestUtils.assertNoFailureListener(result -> { + // since at least 1 item passed, the tran log location should exist, + assertThat(((WritePrimaryResult) result).location, notNullValue()); + // and the response should exist and match the item count + assertThat(result.finalResponseIfSuccessful, notNullValue()); + assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); + + // check each response matches the input item, including the rejection + for (int i = 0; i < items.length; i++) { + BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; + assertThat(response.getItemId(), equalTo(i)); + assertThat(response.getIndex(), equalTo("index")); + assertThat(response.getType(), equalTo("_doc")); + assertThat(response.getId(), equalTo("id_" + i)); + assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); + if (response.getItemId() == rejectItem.id()) { + assertTrue(response.isFailed()); + assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); + assertThat(response.status(), equalTo(rejectionStatus)); + } else { + assertFalse(response.isFailed()); + } + } + + // Check that the non-rejected updates made it to the shard + try { + assertDocCount(shard, items.length - 1); + closeShards(shard); + } catch (IOException e) { + throw new AssertionError(e); + } + }), latch::countDown), threadPool); - // since at least 1 item passed, the tran log location should exist, - assertThat(result.location, notNullValue()); - // and the response should exist and match the item count - assertThat(result.finalResponseIfSuccessful, notNullValue()); - assertThat(result.finalResponseIfSuccessful.getResponses(), arrayWithSize(items.length)); - - // check each response matches the input item, including the rejection - for (int i = 0; i < items.length; i++) { - BulkItemResponse response = result.finalResponseIfSuccessful.getResponses()[i]; - assertThat(response.getItemId(), equalTo(i)); - assertThat(response.getIndex(), equalTo("index")); - assertThat(response.getType(), equalTo("_doc")); - assertThat(response.getId(), equalTo("id_" + i)); - assertThat(response.getOpType(), equalTo(DocWriteRequest.OpType.INDEX)); - if (response.getItemId() == rejectItem.id()) { - assertTrue(response.isFailed()); - assertThat(response.getFailure().getCause(), equalTo(rejectionCause)); - assertThat(response.status(), equalTo(rejectionStatus)); - } else { - assertFalse(response.isFailed()); - } - } - - // Check that the non-rejected updates made it to the shard - assertDocCount(shard, items.length - 1); - closeShards(shard); + latch.await(); } public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { @@ -281,11 +293,12 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); AtomicInteger updateCalled = new AtomicInteger(); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type) -> { + (update, shardId, type, listener) -> { // There should indeed be a mapping update assertNotNull(update); updateCalled.incrementAndGet(); - }, () -> {}); + listener.onResponse(null); + }, listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertTrue(context.isInitial()); assertTrue(context.hasMoreOperationsToExecute()); @@ -298,7 +311,8 @@ public void testExecuteBulkIndexRequestWithMappingUpdates() throws Exception { .thenReturn(success); TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - (update, shardId, type) -> fail("should not have had to update the mappings"), () -> {}); + (update, shardId, type, listener) -> fail("should not have had to update the mappings"), listener -> {}, + ASSERTING_DONE_LISTENER); // Verify that the shard "executed" the operation only once (1 for previous invocations plus @@ -325,8 +339,6 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - UpdateHelper updateHelper = null; - // Return an exception when trying to update the mapping, or when waiting for it to come RuntimeException err = new RuntimeException("some kind of exception"); @@ -335,9 +347,22 @@ public void testExecuteBulkIndexRequestWithErrorWhileUpdatingMapping() throws Ex randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.executeBulkItemRequest( + context, null, threadPool::relativeTimeInMillis, errorOnWait == false ? new ThrowingMappingUpdatePerformer(err) : new NoopMappingUpdatePerformer(), - errorOnWait ? () -> { throw err; } : () -> {}); + errorOnWait ? listener -> listener.onFailure(err) : listener -> listener.onResponse(null), + new LatchedActionListener<>(new ActionListener() { + @Override + public void onResponse(Void aVoid) { + } + + @Override + public void onFailure(final Exception e) { + assertEquals(err, e); + } + }, latch)); + latch.await(); assertFalse(context.hasMoreOperationsToExecute()); // Translog shouldn't be synced, as there were conflicting mappings @@ -371,13 +396,12 @@ public void testExecuteBulkDeleteRequest() throws Exception { new BulkShardRequest(shardId, RefreshPolicy.NONE, items); Translog.Location location = new Translog.Location(0, 0, 0); - UpdateHelper updateHelper = null; randomlySetIgnoredPrimaryResponse(items[0]); BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, even though the document didn't exist @@ -418,8 +442,8 @@ public void testExecuteBulkDeleteRequest() throws Exception { randomlySetIgnoredPrimaryResponse(items[0]); context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); - TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Translog changes, because the document was deleted @@ -475,7 +499,7 @@ public void testNoopUpdateRequest() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); @@ -521,7 +545,7 @@ public void testUpdateRequestWithFailure() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Since this was not a conflict failure, the primary response @@ -571,7 +595,7 @@ public void testUpdateRequestWithConflictFailure() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); @@ -618,7 +642,7 @@ public void testUpdateRequestWithSuccess() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced @@ -664,7 +688,7 @@ public void testUpdateWithDelete() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> listener.onResponse(null), ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); // Check that the translog is successfully advanced @@ -698,7 +722,7 @@ public void testFailureDuringUpdateProcessing() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); TransportShardBulkAction.executeBulkItemRequest(context, updateHelper, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); assertFalse(context.hasMoreOperationsToExecute()); assertNull(context.getLocationToSync()); @@ -731,7 +755,7 @@ public void testTranslogPositionToSync() throws Exception { BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(bulkShardRequest, shard); while (context.hasMoreOperationsToExecute()) { TransportShardBulkAction.executeBulkItemRequest(context, null, threadPool::absoluteTimeInMillis, - new NoopMappingUpdatePerformer(), () -> {}); + new NoopMappingUpdatePerformer(), listener -> {}, ASSERTING_DONE_LISTENER); } assertTrue(shard.isSyncNeeded()); @@ -814,18 +838,23 @@ public void testRetries() throws Exception { BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, RefreshPolicy.NONE, items); - WritePrimaryResult result = TransportShardBulkAction.performOnPrimary( + final CountDownLatch latch = new CountDownLatch(1); + TransportShardBulkAction.performOnPrimary( bulkShardRequest, shard, updateHelper, threadPool::absoluteTimeInMillis, new NoopMappingUpdatePerformer(), - () -> {}); + listener -> listener.onResponse(null), + new LatchedActionListener<>( + ActionTestUtils.assertNoFailureListener(result -> { + assertThat(((WritePrimaryResult) result).location, equalTo(resultLocation)); + BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); + assertThat(primaryResponse.getItemId(), equalTo(0)); + assertThat(primaryResponse.getId(), equalTo("id")); + assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); + DocWriteResponse response = primaryResponse.getResponse(); + assertThat(response.status(), equalTo(RestStatus.CREATED)); + assertThat(response.getSeqNo(), equalTo(13L)); + }), latch), threadPool); - assertThat(result.location, equalTo(resultLocation)); - BulkItemResponse primaryResponse = result.replicaRequest().items()[0].getPrimaryResponse(); - assertThat(primaryResponse.getItemId(), equalTo(0)); - assertThat(primaryResponse.getId(), equalTo("id")); - assertThat(primaryResponse.getOpType(), equalTo(DocWriteRequest.OpType.UPDATE)); - DocWriteResponse response = primaryResponse.getResponse(); - assertThat(response.status(), equalTo(RestStatus.CREATED)); - assertThat(response.getSeqNo(), equalTo(13L)); + latch.await(); } private void randomlySetIgnoredPrimaryResponse(BulkItemRequest primaryRequest) { @@ -875,7 +904,8 @@ public Translog.Location getTranslogLocation() { /** Doesn't perform any mapping updates */ public static class NoopMappingUpdatePerformer implements MappingUpdatePerformer { @Override - public void updateMappings(Mapping update, ShardId shardId, String type) { + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + listener.onResponse(null); } } @@ -888,8 +918,8 @@ private class ThrowingMappingUpdatePerformer implements MappingUpdatePerformer { } @Override - public void updateMappings(Mapping update, ShardId shardId, String type) { - throw e; + public void updateMappings(Mapping update, ShardId shardId, String type, ActionListener listener) { + listener.onFailure(e); } } } diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java index 8adb9c2f26b1a..488efd42063fe 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/ReplicationOperationTests.java @@ -284,11 +284,12 @@ public void testAddedReplicaAfterPrimaryOperation() throws Exception { final ShardRouting primaryShard = updatedReplicationGroup.getRoutingTable().primaryShard(); final TestPrimary primary = new TestPrimary(primaryShard, replicationGroup::get) { @Override - public Result perform(Request request) throws Exception { - Result result = super.perform(request); - replicationGroup.set(updatedReplicationGroup); - logger.debug("--> state after primary operation:\n{}", replicationGroup.get()); - return result; + public void perform(Request request, ActionListener listener) { + super.perform(request, ActionListener.map(listener, result -> { + replicationGroup.set(updatedReplicationGroup); + logger.debug("--> state after primary operation:\n{}", replicationGroup.get()); + return result; + })); } }; @@ -482,11 +483,11 @@ public void failShard(String message, Exception exception) { } @Override - public Result perform(Request request) throws Exception { + public void perform(Request request, ActionListener listener) { if (request.processedOnPrimary.compareAndSet(false, true) == false) { fail("processed [" + request + "] twice"); } - return new Result(request); + listener.onResponse(new Result(request)); } static class Result implements ReplicationOperation.PrimaryResult { diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java index ffc9c2bf70a8a..340579239d29d 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationActionTests.java @@ -27,6 +27,7 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.ActiveShardCount; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.replication.ReplicationOperation.ReplicaResponse; @@ -684,16 +685,16 @@ public void testPrimaryReference() throws Exception { }; TestAction.PrimaryShardReference primary = action.new PrimaryShardReference(shard, releasable); final Request request = new Request(); - Request replicaRequest = (Request) primary.perform(request).replicaRequest; + primary.perform(request, ActionTestUtils.assertNoFailureListener(r -> { + final ElasticsearchException exception = new ElasticsearchException("testing"); + primary.failShard("test", exception); - final ElasticsearchException exception = new ElasticsearchException("testing"); - primary.failShard("test", exception); + verify(shard).failShard("test", exception); - verify(shard).failShard("test", exception); + primary.close(); - primary.close(); - - assertTrue(closed.get()); + assertTrue(closed.get()); + })); } public void testReplicaProxy() throws InterruptedException, ExecutionException { @@ -1221,10 +1222,11 @@ protected TestResponse newResponseInstance() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard primary) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard primary, + ActionListener> listener) { boolean executedBefore = shardRequest.processedOnPrimary.getAndSet(true); assert executedBefore == false : "request has already been executed on the primary"; - return new PrimaryResult(shardRequest, new TestResponse()); + listener.onResponse(new PrimaryResult(shardRequest, new TestResponse())); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java index 1cb1bfde34ea8..1e6a4bf168b6f 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportReplicationAllPermitsAcquisitionTests.java @@ -377,12 +377,13 @@ public String getActionName() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { executedOnPrimary.set(true); // The TransportReplicationAction.getIndexShard() method is overridden for testing purpose but we double check here // that the permit has been acquired on the primary shard assertSame(primary, shard); - return new PrimaryResult<>(shardRequest, new Response()); + listener.onResponse(new PrimaryResult<>(shardRequest, new Response())); } @Override @@ -464,10 +465,11 @@ public ClusterBlockLevel indexBlockLevel() { } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { assertNoBlocks("block must not exist when executing the operation on primary shard: it should have been blocked before"); assertThat(shard.getActiveOperationsCount(), greaterThan(0)); - return super.shardOperationOnPrimary(shardRequest, shard); + super.shardOperationOnPrimary(shardRequest, shard, listener); } @Override @@ -509,9 +511,10 @@ protected void acquireReplicaOperationPermit(IndexShard shard, Request request, } @Override - protected PrimaryResult shardOperationOnPrimary(Request shardRequest, IndexShard shard) throws Exception { + protected void shardOperationOnPrimary(Request shardRequest, IndexShard shard, + ActionListener> listener) { assertEquals("All permits must be acquired", 0, shard.getActiveOperationsCount()); - return super.shardOperationOnPrimary(shardRequest, shard); + super.shardOperationOnPrimary(shardRequest, shard, listener); } @Override diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java index f540374a56c20..dea0d0747cedc 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/TransportWriteActionTests.java @@ -22,6 +22,7 @@ import org.elasticsearch.ElasticsearchException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest.RefreshPolicy; import org.elasticsearch.action.support.WriteResponse; @@ -138,14 +139,15 @@ public void testPrimaryNoRefreshCall() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.NONE); // The default, but we'll set it anyway just to be explicit TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNotNull(listener.response); - assertNull(listener.failure); - verify(indexShard, never()).refresh(any()); - verify(indexShard, never()).addRefreshListener(any(), any()); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + verify(indexShard, never()).refresh(any()); + verify(indexShard, never()).addRefreshListener(any(), any()); + })); } public void testReplicaNoRefreshCall() throws Exception { @@ -166,15 +168,16 @@ public void testPrimaryImmediateRefresh() throws Exception { TestRequest request = new TestRequest(); request.setRefreshPolicy(RefreshPolicy.IMMEDIATE); TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNotNull(listener.response); - assertNull(listener.failure); - assertTrue(listener.response.forcedRefresh); - verify(indexShard).refresh("refresh_flag_index"); - verify(indexShard, never()).addRefreshListener(any(), any()); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNotNull(listener.response); + assertNull(listener.failure); + assertTrue(listener.response.forcedRefresh); + verify(indexShard).refresh("refresh_flag_index"); + verify(indexShard, never()).addRefreshListener(any(), any()); + })); } public void testReplicaImmediateRefresh() throws Exception { @@ -196,23 +199,24 @@ public void testPrimaryWaitForRefresh() throws Exception { request.setRefreshPolicy(RefreshPolicy.WAIT_UNTIL); TestAction testAction = new TestAction(); - TransportWriteAction.WritePrimaryResult result = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - result.respond(listener); - assertNull(listener.response); // Haven't reallresponded yet - - @SuppressWarnings({ "unchecked", "rawtypes" }) - ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); - verify(indexShard, never()).refresh(any()); - verify(indexShard).addRefreshListener(any(), refreshListener.capture()); - - // Now we can fire the listener manually and we'll get a response - boolean forcedRefresh = randomBoolean(); - refreshListener.getValue().accept(forcedRefresh); - assertNotNull(listener.response); - assertNull(listener.failure); - assertEquals(forcedRefresh, listener.response.forcedRefresh); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + CapturingActionListener listener = new CapturingActionListener<>(); + result.respond(listener); + assertNull(listener.response); // Haven't really responded yet + + @SuppressWarnings({"unchecked", "rawtypes"}) + ArgumentCaptor> refreshListener = ArgumentCaptor.forClass((Class) Consumer.class); + verify(indexShard, never()).refresh(any()); + verify(indexShard).addRefreshListener(any(), refreshListener.capture()); + + // Now we can fire the listener manually and we'll get a response + boolean forcedRefresh = randomBoolean(); + refreshListener.getValue().accept(forcedRefresh); + assertNotNull(listener.response); + assertNull(listener.failure); + assertEquals(forcedRefresh, listener.response.forcedRefresh); + })); } public void testReplicaWaitForRefresh() throws Exception { @@ -238,12 +242,13 @@ public void testReplicaWaitForRefresh() throws Exception { public void testDocumentFailureInShardOperationOnPrimary() throws Exception { TestRequest request = new TestRequest(); TestAction testAction = new TestAction(true, true); - TransportWriteAction.WritePrimaryResult writePrimaryResult = - testAction.shardOperationOnPrimary(request, indexShard); - CapturingActionListener listener = new CapturingActionListener<>(); - writePrimaryResult.respond(listener); - assertNull(listener.response); - assertNotNull(listener.failure); + testAction.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(writePrimaryResult -> { + CapturingActionListener listener = new CapturingActionListener<>(); + writePrimaryResult.respond(listener); + assertNull(listener.response); + assertNotNull(listener.failure); + })); } public void testDocumentFailureInShardOperationOnReplica() throws Exception { @@ -424,15 +429,15 @@ protected TestResponse newResponseInstance() { } @Override - protected WritePrimaryResult shardOperationOnPrimary( - TestRequest request, IndexShard primary) throws Exception { + protected void shardOperationOnPrimary( + TestRequest request, IndexShard primary, ActionListener> listener) { final WritePrimaryResult primaryResult; if (withDocumentFailureOnPrimary) { primaryResult = new WritePrimaryResult<>(request, null, null, new RuntimeException("simulated"), primary, logger); } else { primaryResult = new WritePrimaryResult<>(request, new TestResponse(), location, null, primary, logger); } - return primaryResult; + listener.onResponse(primaryResult); } @Override diff --git a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java index 97fc9e9defaa9..cec3c05b28438 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncActionTests.java @@ -18,6 +18,7 @@ package org.elasticsearch.index.seqno; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; @@ -113,7 +114,7 @@ public void testTranslogSyncAfterGlobalCheckpointSync() throws Exception { new IndexNameExpressionResolver()); final GlobalCheckpointSyncAction.Request primaryRequest = new GlobalCheckpointSyncAction.Request(indexShard.shardId()); if (randomBoolean()) { - action.shardOperationOnPrimary(primaryRequest, indexShard); + action.shardOperationOnPrimary(primaryRequest, indexShard, ActionTestUtils.assertNoFailureListener(r -> {})); } else { action.shardOperationOnReplica(new GlobalCheckpointSyncAction.Request(indexShard.shardId()), indexShard); } diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java index 6ad7d5039ae8b..8d527b287eaac 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseBackgroundSyncActionTests.java @@ -21,8 +21,9 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.LatchedActionListener; import org.elasticsearch.action.support.ActionFilters; -import org.elasticsearch.action.support.replication.ReplicationOperation; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.replication.ReplicationResponse; import org.elasticsearch.action.support.replication.TransportReplicationAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; @@ -46,6 +47,7 @@ import org.mockito.ArgumentCaptor; import java.util.Collections; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; import static org.elasticsearch.mock.orig.Mockito.verifyNoMoreInteractions; @@ -66,6 +68,7 @@ public class RetentionLeaseBackgroundSyncActionTests extends ESTestCase { private TransportService transportService; private ShardStateAction shardStateAction; + @Override public void setUp() throws Exception { super.setUp(); threadPool = new TestThreadPool(getClass().getName()); @@ -83,6 +86,7 @@ public void setUp() throws Exception { shardStateAction = new ShardStateAction(clusterService, transportService, null, null, threadPool); } + @Override public void tearDown() throws Exception { try { IOUtils.close(transportService, clusterService, transport); @@ -92,7 +96,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateException { + public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws InterruptedException { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -119,12 +123,19 @@ public void testRetentionLeaseBackgroundSyncActionOnPrimary() throws WriteStateE final RetentionLeaseBackgroundSyncAction.Request request = new RetentionLeaseBackgroundSyncAction.Request(indexShard.shardId(), retentionLeases); - final ReplicationOperation.PrimaryResult result = - action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be persisted - verify(indexShard).persistRetentionLeases(); - // we should forward the request containing the current retention leases to the replica - assertThat(result.replicaRequest(), sameInstance(request)); + final CountDownLatch latch = new CountDownLatch(1); + action.shardOperationOnPrimary(request, indexShard, + new LatchedActionListener<>(ActionTestUtils.assertNoFailureListener(result -> { + // the retention leases on the shard should be persisted + try { + verify(indexShard).persistRetentionLeases(); + } catch (WriteStateException e) { + throw new AssertionError(e); + } + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + }), latch)); + latch.await(); } public void testRetentionLeaseBackgroundSyncActionOnReplica() throws WriteStateException { diff --git a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java index 9b9ad6a0962c1..fa7e50b5fd01e 100644 --- a/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/index/seqno/RetentionLeaseSyncActionTests.java @@ -22,6 +22,7 @@ import org.apache.lucene.store.AlreadyClosedException; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.replication.TransportWriteAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; @@ -90,7 +91,7 @@ public void tearDown() throws Exception { super.tearDown(); } - public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException { + public void testRetentionLeaseSyncActionOnPrimary() { final IndicesService indicesService = mock(IndicesService.class); final Index index = new Index("index", "uuid"); @@ -115,15 +116,20 @@ public void testRetentionLeaseSyncActionOnPrimary() throws WriteStateException { new IndexNameExpressionResolver()); final RetentionLeases retentionLeases = mock(RetentionLeases.class); final RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(indexShard.shardId(), retentionLeases); - - final TransportWriteAction.WritePrimaryResult result = - action.shardOperationOnPrimary(request, indexShard); - // the retention leases on the shard should be persisted - verify(indexShard).persistRetentionLeases(); - // we should forward the request containing the current retention leases to the replica - assertThat(result.replicaRequest(), sameInstance(request)); - // we should start with an empty replication response - assertNull(result.finalResponseIfSuccessful.getShardInfo()); + action.shardOperationOnPrimary(request, indexShard, + ActionTestUtils.assertNoFailureListener(result -> { + // the retention leases on the shard should be persisted + try { + verify(indexShard).persistRetentionLeases(); + } catch (WriteStateException e) { + throw new AssertionError(e); + } + // we should forward the request containing the current retention leases to the replica + assertThat(result.replicaRequest(), sameInstance(request)); + // we should start with an empty replication response + assertNull(result.finalResponseIfSuccessful.getShardInfo()); + } + )); } public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { @@ -156,7 +162,7 @@ public void testRetentionLeaseSyncActionOnReplica() throws WriteStateException { action.shardOperationOnReplica(request, indexShard); // the retention leases on the shard should be updated verify(indexShard).updateRetentionLeasesOnReplica(retentionLeases); - // the retention leases on the shard should be persisteed + // the retention leases on the shard should be persisted verify(indexShard).persistRetentionLeases(); // the result should indicate success final AtomicBoolean success = new AtomicBoolean(); diff --git a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java index cdc76292be06a..6da7b4474ddee 100644 --- a/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java +++ b/test/framework/src/main/java/org/elasticsearch/action/support/ActionTestUtils.java @@ -19,9 +19,12 @@ package org.elasticsearch.action.support; +import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; +import java.util.function.Consumer; + import static org.elasticsearch.action.support.PlainActionFuture.newFuture; public class ActionTestUtils { @@ -34,4 +37,18 @@ Response executeBlocking(TransportAction action, Request requ action.execute(request, future); return future.actionGet(); } + + public static ActionListener assertNoFailureListener(Consumer consumer) { + return new ActionListener() { + @Override + public void onResponse(T t) { + consumer.accept(t); + } + + @Override + public void onFailure(Exception e) { + throw new AssertionError(e); + } + }; + } } diff --git a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java index 8b984c22bfd0e..d75629a47cd31 100644 --- a/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/index/replication/ESIndexLevelReplicationTestCase.java @@ -36,6 +36,7 @@ import org.elasticsearch.action.resync.ResyncReplicationRequest; import org.elasticsearch.action.resync.ResyncReplicationResponse; import org.elasticsearch.action.resync.TransportResyncReplicationAction; +import org.elasticsearch.action.support.ActionTestUtils; import org.elasticsearch.action.support.PlainActionFuture; import org.elasticsearch.action.support.WriteRequest; import org.elasticsearch.action.support.replication.ReplicatedWriteRequest; @@ -90,6 +91,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; @@ -191,14 +193,15 @@ public void backgroundSync(ShardId shardId, RetentionLeases retentionLeases) { sync(shardId, retentionLeases, ActionListener.wrap( r -> { }, e -> { - throw new AssertionError("failed to backgroun sync retention lease", e); + throw new AssertionError("failed to background sync retention lease", e); })); } }; protected ReplicationGroup(final IndexMetaData indexMetaData) throws IOException { final ShardRouting primaryRouting = this.createShardRouting("s0", true); - primary = newShard(primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); + primary = newShard( + primaryRouting, indexMetaData, null, getEngineFactory(primaryRouting), () -> {}, retentionLeaseSyncer); replicas = new CopyOnWriteArrayList<>(); this.indexMetaData = indexMetaData; updateAllocationIDsOnPrimary(); @@ -254,9 +257,8 @@ public BulkItemResponse delete(DeleteRequest deleteRequest) throws Exception { private BulkItemResponse executeWriteRequest( DocWriteRequest writeRequest, WriteRequest.RefreshPolicy refreshPolicy) throws Exception { PlainActionFuture listener = new PlainActionFuture<>(); - final ActionListener wrapBulkListener = ActionListener.wrap( - bulkShardResponse -> listener.onResponse(bulkShardResponse.getResponses()[0]), - listener::onFailure); + final ActionListener wrapBulkListener = + ActionListener.map(listener, bulkShardResponse -> bulkShardResponse.getResponses()[0]); BulkItemRequest[] items = new BulkItemRequest[1]; items[0] = new BulkItemRequest(0, writeRequest); BulkShardRequest request = new BulkShardRequest(shardId, refreshPolicy, items); @@ -309,8 +311,7 @@ public IndexShard addReplica() throws IOException { } public synchronized void addReplica(IndexShard replica) throws IOException { - assert shardRoutings().stream() - .filter(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())).findFirst().isPresent() == false : + assert shardRoutings().stream().anyMatch(shardRouting -> shardRouting.isSameAllocation(replica.routingEntry())) == false : "replica with aId [" + replica.routingEntry().allocationId() + "] already exists"; replicas.add(replica); if (replicationTargets != null) { @@ -533,10 +534,8 @@ private synchronized ReplicationTargets getReplicationTargets() { } protected void syncRetentionLeases(ShardId shardId, RetentionLeases leases, ActionListener listener) { - RetentionLeaseSyncAction.Request request = new RetentionLeaseSyncAction.Request(shardId, leases); - ActionListener wrappedListener = ActionListener.wrap( - r -> listener.onResponse(new ReplicationResponse()), listener::onFailure); - new SyncRetentionLeases(request, ReplicationGroup.this, wrappedListener).execute(); + new SyncRetentionLeases(new RetentionLeaseSyncAction.Request(shardId, leases), this, + ActionListener.map(listener, r -> new ReplicationResponse())).execute(); } public synchronized RetentionLease addRetentionLease(String id, long retainingSequenceNumber, String source, @@ -611,17 +610,8 @@ protected ReplicationAction(Request request, ActionListener listener, public void execute() { try { new ReplicationOperation<>(request, new PrimaryRef(), - new ActionListener() { - @Override - public void onResponse(PrimaryResult result) { - result.respond(listener); - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, new ReplicasRef(), logger, opType).execute(); + ActionListener.wrap(result -> result.respond(listener), listener::onFailure), new ReplicasRef(), logger, opType + ).execute(); } catch (Exception e) { listener.onFailure(e); } @@ -631,7 +621,7 @@ IndexShard getPrimaryShard() { return replicationTargets.primary; } - protected abstract PrimaryResult performOnPrimary(IndexShard primary, Request request) throws Exception; + protected abstract void performOnPrimary(IndexShard primary, Request request, ActionListener listener); protected abstract void performOnReplica(ReplicaRequest request, IndexShard replica) throws Exception; @@ -648,8 +638,8 @@ public void failShard(String message, Exception exception) { } @Override - public PrimaryResult perform(Request request) throws Exception { - return performOnPrimary(getPrimaryShard(), request); + public void perform(Request request, ActionListener listener) { + performOnPrimary(getPrimaryShard(), request, listener); } @Override @@ -763,10 +753,9 @@ class WriteReplicationAction extends ReplicationAction - result = executeShardBulkOnPrimary(primary, request); - return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + protected void performOnPrimary(IndexShard primary, BulkShardRequest request, ActionListener listener) { + executeShardBulkOnPrimary(primary, request, + ActionListener.map(listener, result -> new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful))); } @Override @@ -776,8 +765,8 @@ protected void performOnReplica(BulkShardRequest request, IndexShard replica) th } } - private TransportWriteAction.WritePrimaryResult executeShardBulkOnPrimary( - IndexShard primary, BulkShardRequest request) throws Exception { + private void executeShardBulkOnPrimary(IndexShard primary, BulkShardRequest request, + ActionListener> listener) { for (BulkItemRequest itemRequest : request.items()) { if (itemRequest.request() instanceof IndexRequest) { ((IndexRequest) itemRequest.request()).process(Version.CURRENT, null, index.getName()); @@ -785,21 +774,37 @@ private TransportWriteAction.WritePrimaryResult permitAcquiredFuture = new PlainActionFuture<>(); primary.acquirePrimaryOperationPermit(permitAcquiredFuture, ThreadPool.Names.SAME, request); - final TransportWriteAction.WritePrimaryResult result; try (Releasable ignored = permitAcquiredFuture.actionGet()) { - MappingUpdatePerformer noopMappingUpdater = (update, shardId, type) -> { }; - result = TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, - null); + MappingUpdatePerformer noopMappingUpdater = (update, shardId, type, listener1) -> {}; + TransportShardBulkAction.performOnPrimary(request, primary, null, System::currentTimeMillis, noopMappingUpdater, + null, ActionTestUtils.assertNoFailureListener(result -> { + TransportWriteActionTestHelper.performPostWriteActions(primary, request, + ((TransportWriteAction.WritePrimaryResult) result).location, logger); + listener.onResponse((TransportWriteAction.WritePrimaryResult) result); + }), threadPool); + } catch (Exception e) { + listener.onFailure(e); } - TransportWriteActionTestHelper.performPostWriteActions(primary, request, result.location, logger); - return result; } - private - BulkShardRequest executeReplicationRequestOnPrimary(IndexShard primary, Request request) throws Exception { + private BulkShardRequest executeReplicationRequestOnPrimary( + IndexShard primary, Request request) throws Exception { final BulkShardRequest bulkShardRequest = new BulkShardRequest(shardId, request.getRefreshPolicy(), new BulkItemRequest[]{new BulkItemRequest(0, request)}); - return executeShardBulkOnPrimary(primary, bulkShardRequest).replicaRequest(); + final CompletableFuture res = new CompletableFuture<>(); + executeShardBulkOnPrimary( + primary, bulkShardRequest, new ActionListener>() { + @Override + public void onResponse(TransportWriteAction.WritePrimaryResult result) { + res.complete(result.replicaRequest()); + } + + @Override + public void onFailure(Exception e) { + res.completeExceptionally(e); + } + }); + return res.get(); } private void executeShardBulkOnReplica(BulkShardRequest request, IndexShard replica, long operationPrimaryTerm, @@ -862,10 +867,14 @@ class GlobalCheckpointSync extends ReplicationAction< } @Override - protected PrimaryResult performOnPrimary( - final IndexShard primary, final GlobalCheckpointSyncAction.Request request) throws Exception { - primary.sync(); - return new PrimaryResult(request, new ReplicationResponse()); + protected void performOnPrimary(IndexShard primary, GlobalCheckpointSyncAction.Request request, + ActionListener listener) { + try { + primary.sync(); + listener.onResponse(new PrimaryResult(request, new ReplicationResponse())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override @@ -881,10 +890,14 @@ class ResyncAction extends ReplicationAction result = - executeResyncOnPrimary(primary, request); - return new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful); + protected void performOnPrimary(IndexShard primary, ResyncReplicationRequest request, ActionListener listener) { + try { + final TransportWriteAction.WritePrimaryResult result = + executeResyncOnPrimary(primary, request); + listener.onResponse(new PrimaryResult(result.replicaRequest(), result.finalResponseIfSuccessful)); + } catch (Exception e) { + listener.onFailure(e); + } } @Override @@ -895,7 +908,7 @@ protected void performOnReplica(ResyncReplicationRequest request, IndexShard rep } private TransportWriteAction.WritePrimaryResult executeResyncOnPrimary( - IndexShard primary, ResyncReplicationRequest request) throws Exception { + IndexShard primary, ResyncReplicationRequest request) { final TransportWriteAction.WritePrimaryResult result = new TransportWriteAction.WritePrimaryResult<>(TransportResyncReplicationAction.performOnPrimary(request, primary), new ResyncReplicationResponse(), null, null, primary, logger); @@ -924,9 +937,14 @@ class SyncRetentionLeases extends ReplicationAction< } @Override - protected PrimaryResult performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request) throws Exception { - primary.persistRetentionLeases(); - return new PrimaryResult(request, new RetentionLeaseSyncAction.Response()); + protected void performOnPrimary(IndexShard primary, RetentionLeaseSyncAction.Request request, + ActionListener listener) { + try { + primary.persistRetentionLeases(); + listener.onResponse(new PrimaryResult(request, new RetentionLeaseSyncAction.Response())); + } catch (Exception e) { + listener.onFailure(e); + } } @Override diff --git a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java index 7c3b2da32e88f..2e4ae81fff326 100644 --- a/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java +++ b/x-pack/plugin/ccr/src/main/java/org/elasticsearch/xpack/ccr/action/bulk/TransportBulkShardOperationsAction.java @@ -61,13 +61,17 @@ public TransportBulkShardOperationsAction( } @Override - protected WritePrimaryResult shardOperationOnPrimary( - final BulkShardOperationsRequest request, final IndexShard primary) throws Exception { + protected void shardOperationOnPrimary(BulkShardOperationsRequest request, IndexShard primary, + ActionListener> listener) { if (logger.isTraceEnabled()) { logger.trace("index [{}] on the following primary shard {}", request.getOperations(), primary.routingEntry()); } - return shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(), - request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); + try { + listener.onResponse(shardOperationOnPrimary(request.shardId(), request.getHistoryUUID(), request.getOperations(), + request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger)); + } catch (Exception e) { + listener.onFailure(e); + } } public static Translog.Operation rewriteOperationWithPrimaryTerm(Translog.Operation operation, long primaryTerm) { diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index dd495ee5ca587..505077e19c165 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -641,20 +641,25 @@ class CcrAction extends ReplicationAction listener) { final PlainActionFuture permitFuture = new PlainActionFuture<>(); primary.acquirePrimaryOperationPermit(permitFuture, ThreadPool.Names.SAME, request); final TransportWriteAction.WritePrimaryResult ccrResult; - try (Releasable ignored = permitFuture.get()) { - ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), - request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); - } - return new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) { - @Override - public void respond(ActionListener listener) { - ccrResult.respond(listener); + try { + try (Releasable ignored = permitFuture.get()) { + ccrResult = TransportBulkShardOperationsAction.shardOperationOnPrimary(primary.shardId(), request.getHistoryUUID(), + request.getOperations(), request.getMaxSeqNoOfUpdatesOrDeletes(), primary, logger); } - }; + listener.onResponse(new PrimaryResult(ccrResult.replicaRequest(), ccrResult.finalResponseIfSuccessful) { + @Override + public void respond(ActionListener listener) { + ccrResult.respond(listener); + } + }); + } catch (Exception e) { + listener.onFailure(e); + } } @Override