Fix Exception Handling for TransportShardBulkAction
* Prior to elastic#39793, exceptions from the primary write and delete actions
were bubbled up to the caller so that closed shards would be handled accordingly upstream.
elastic#39793 accidentally changed that behaviour: such exceptions were simply marked as bulk item failures on the request, and bulk request items kept being processed on closed shards.
* This fix restores the original behaviour and adjusts the listeners passed in `TransportReplicationAction`
so that they behave like the previous synchronous `catch` (a minimal sketch of the behavioural difference follows the change summary below).
  * Dried up the exception handling slightly for that and inlined all the listeners to make the logic a little
easier to follow.
* Re-enable SplitIndexIT now that closed shards are properly handled again
* Closes elastic#40944
original-brownbear committed Apr 9, 2019 · 1 parent d31207a · commit c71e36b
Showing 3 changed files with 84 additions and 101 deletions.
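To make the behavioural difference described in the commit message concrete, here is a minimal, self-contained sketch. All names in it (`ShardClosedException`, `runMarkingItemFailures`, `runPropagatingToCaller`) are hypothetical stand-ins for illustration, not the actual Elasticsearch types or methods: the first variant mirrors the buggy post-elastic#39793 behaviour of recording a shard-level exception as a per-item failure and carrying on, while the second mirrors the restored behaviour of letting the exception reach the caller's failure handler, analogous to the listeners adjusted in `TransportReplicationAction`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a shard-level failure such as a closed shard.
class ShardClosedException extends RuntimeException {
    ShardClosedException(String message) {
        super(message);
    }
}

public class BulkExceptionRoutingSketch {

    // Simulates executing one bulk item on a primary shard.
    static void executeItem(String item, boolean shardClosed) {
        if (shardClosed) {
            // A shard-level failure: no item on this shard can make progress.
            throw new ShardClosedException("shard is closed");
        }
        // Per-document work would happen here.
    }

    // Buggy behaviour after elastic#39793: every exception is recorded as an
    // item failure and processing continues on the closed shard.
    static List<String> runMarkingItemFailures(List<String> items, boolean shardClosed) {
        List<String> itemFailures = new ArrayList<>();
        for (String item : items) {
            try {
                executeItem(item, shardClosed);
            } catch (Exception e) {
                itemFailures.add(item + ": " + e.getMessage());
            }
        }
        return itemFailures;
    }

    // Restored behaviour: a shard-level exception bubbles up to the caller's
    // failure handler, behaving like the previous synchronous catch, so
    // upstream code can deal with the closed shard.
    static void runPropagatingToCaller(List<String> items, boolean shardClosed,
                                       Consumer<Exception> onCallerFailure) {
        try {
            for (String item : items) {
                executeItem(item, shardClosed);
            }
        } catch (Exception e) {
            onCallerFailure.accept(e);
        }
    }

    public static void main(String[] args) {
        List<String> items = List.of("doc-1", "doc-2", "doc-3");
        System.out.println("buggy: item failures = " + runMarkingItemFailures(items, true));
        runPropagatingToCaller(items, true,
            e -> System.out.println("fixed: caller sees failure = " + e.getMessage()));
    }
}
```

Running the sketch, the buggy variant reports three per-item failures while the fixed variant reports a single caller-level failure; the latter is what lets upstream code stop feeding further bulk items to a shard that is already closed.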
@@ -154,7 +154,7 @@ public static void performOnPrimary(
private final BulkPrimaryExecutionContext context = new BulkPrimaryExecutionContext(request, primary);

@Override
protected void doRun() {
protected void doRun() throws Exception {
while (context.hasMoreOperationsToExecute()) {
if (executeBulkItemRequest(context, updateHelper, nowInMillisSupplier, mappingUpdater, waitForMappingUpdate,
ActionListener.wrap(v -> executor.execute(this), this::onRejection)) == false) {
@@ -168,12 +168,6 @@ protected void doRun() {
finishRequest();
}

@Override
public void onFailure(Exception e) {
assert false : "All exceptions should be handled by #executeBulkItemRequest";
onRejection(e);
}

@Override
public void onRejection(Exception e) {
// Fail all operations after a bulk rejection hit an action that waited for a mapping update and finish the request
@@ -204,7 +198,7 @@ private void finishRequest() {
*/
static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, UpdateHelper updateHelper, LongSupplier nowInMillisSupplier,
MappingUpdatePerformer mappingUpdater, Consumer<ActionListener<Void>> waitForMappingUpdate,
ActionListener<Void> itemDoneListener) {
ActionListener<Void> itemDoneListener) throws Exception {
final DocWriteRequest.OpType opType = context.getCurrent().opType();

final UpdateHelper.Result updateResult;
Expand Down Expand Up @@ -252,55 +246,51 @@ static boolean executeBulkItemRequest(BulkPrimaryExecutionContext context, Updat
final IndexShard primary = context.getPrimary();
final long version = context.getRequestToExecute().version();
final boolean isDelete = context.getRequestToExecute().opType() == DocWriteRequest.OpType.DELETE;
try {
final Engine.Result result;
if (isDelete) {
final DeleteRequest request = context.getRequestToExecute();
result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
request.ifSeqNo(), request.ifPrimaryTerm());
} else {
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
context.getRequestToExecute().type(),
new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
waitForMappingUpdate.accept(
ActionListener.runAfter(new ActionListener<Void>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
context.resetForExecutionForRetry();
}

@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null))
);
}

@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
return false;
} else {
onComplete(result, context, updateResult);
}
} catch (Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
final Engine.Result result;
if (isDelete) {
final DeleteRequest request = context.getRequestToExecute();
result = primary.applyDeleteOperationOnPrimary(version, request.type(), request.id(), request.versionType(),
request.ifSeqNo(), request.ifPrimaryTerm());
} else {
final IndexRequest request = context.getRequestToExecute();
result = primary.applyIndexOperationOnPrimary(version, request.versionType(), new SourceToParse(
request.index(), request.type(), request.id(), request.source(), request.getContentType(), request.routing()),
request.ifSeqNo(), request.ifPrimaryTerm(), request.getAutoGeneratedTimestamp(), request.isRetry());
}
if (result.getResultType() == Engine.Result.Type.MAPPING_UPDATE_REQUIRED) {
mappingUpdater.updateMappings(result.getRequiredMappingUpdate(), primary.shardId(),
context.getRequestToExecute().type(),
new ActionListener<>() {
@Override
public void onResponse(Void v) {
context.markAsRequiringMappingUpdate();
waitForMappingUpdate.accept(
ActionListener.runAfter(new ActionListener<>() {
@Override
public void onResponse(Void v) {
assert context.requiresWaitingForMappingUpdate();
context.resetForExecutionForRetry();
}

@Override
public void onFailure(Exception e) {
context.failOnMappingUpdate(e);
}
}, () -> itemDoneListener.onResponse(null))
);
}

@Override
public void onFailure(Exception e) {
onComplete(exceptionToResult(e, primary, isDelete, version), context, updateResult);
// Requesting mapping update failed, so we don't have to wait for a cluster state update
assert context.isInitial();
itemDoneListener.onResponse(null);
}
});
return false;
} else {
onComplete(result, context, updateResult);
}
return true;
}
@@ -342,7 +342,7 @@ void runWithPrimaryShardReference(final PrimaryShardReference primaryShardRefere
new ConcreteShardRequest<>(primaryRequest.getRequest(), primary.allocationId().getRelocationId(),
primaryRequest.getPrimaryTerm()),
transportOptions,
new ActionListenerResponseHandler<Response>(onCompletionListener, reader) {
new ActionListenerResponseHandler<>(onCompletionListener, reader) {
@Override
public void handleResponse(Response response) {
setPhase(replicationTask, "finished");
@@ -357,58 +357,54 @@ public void handleException(TransportException exp) {
});
} else {
setPhase(replicationTask, "primary");
final ActionListener<Response> listener = createResponseListener(primaryShardReference);
createReplicatedOperation(primaryRequest.getRequest(),
ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
primaryShardReference)
.execute();
ActionListener.wrap(result -> result.respond(
new ActionListener<>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(
e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
// intentionally swallow, a missed global checkpoint sync should not fail this operation
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync", shard.shardId()), e);
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
handleException(primaryShardReference, e);
}
}), e -> handleException(primaryShardReference, e)
), primaryShardReference).execute();
}
} catch (Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
onFailure(e);
handleException(primaryShardReference, e);
}
}

private void handleException(PrimaryShardReference primaryShardReference, Exception e) {
Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
onFailure(e);
}

@Override
public void onFailure(Exception e) {
setPhase(replicationTask, "finished");
onCompletionListener.onFailure(e);
}

private ActionListener<Response> createResponseListener(final PrimaryShardReference primaryShardReference) {
return new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
if (syncGlobalCheckpointAfterOperation) {
final IndexShard shard = primaryShardReference.indexShard;
try {
shard.maybeSyncGlobalCheckpoint("post-operation");
} catch (final Exception e) {
// only log non-closed exceptions
if (ExceptionsHelper.unwrap(e, AlreadyClosedException.class, IndexShardClosedException.class) == null) {
logger.info(
new ParameterizedMessage(
"{} failed to execute post-operation global checkpoint sync",
shard.shardId()),
e);
// intentionally swallow, a missed global checkpoint sync should not fail this operation
}
}
}
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
primaryShardReference.close(); // release shard operation lock before responding to caller
setPhase(replicationTask, "finished");
onCompletionListener.onFailure(e);
}
};
}

protected ReplicationOperation<Request, ReplicaRequest, PrimaryResult<ReplicaRequest, Response>> createReplicatedOperation(
Request request, ActionListener<PrimaryResult<ReplicaRequest, Response>> listener,
PrimaryShardReference primaryShardReference) {
@@ -25,7 +25,6 @@
import org.apache.lucene.search.SortedSetSortField;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.util.Constants;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
@@ -78,8 +77,6 @@
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;


@LuceneTestCase.AwaitsFix( bugUrl = "https://github.com/elastic/elasticsearch/issues/40944")
public class SplitIndexIT extends ESIntegTestCase {

@Override
