Skip to content

Commit

Permalink
Maxing out retries on conflict in bulk update cause null pointer exce…
Browse files Browse the repository at this point in the history
…ptions

Also:
Bulk update one less retry then requested
Document for retries on conflict says it default to 1 (but default is 0)
TransportShardReplicationOperationAction methods now catches Throwables instead of exceptions
Added a little extra check to UpdateTests.concurrentUpdateWithRetryOnConflict

Closes elastic#3447 & elastic#3448
  • Loading branch information
bleskes committed Aug 6, 2013
1 parent d5058d1 commit 00a0967
Show file tree
Hide file tree
Showing 6 changed files with 107 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,16 +147,16 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP

BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
long[] preVersions = new long[request.items().length];
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
for (int requestIndex = 0; requestIndex < request.items().length; requestIndex++) {
BulkItemRequest item = request.items()[requestIndex];
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {
WriteResult result = shardIndexOperation(request, indexRequest, clusterState, indexShard, true);
// add the response
IndexResponse indexResponse = result.response();
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
preVersions[i] = result.preVersion;
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(), indexResponse);
preVersions[requestIndex] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
Expand All @@ -167,13 +167,13 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
if (ops == null) {
ops = new Engine.IndexingOperation[request.items().length];
}
ops[i] = result.op;
ops[requestIndex] = result.op;
}
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < i; j++) {
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j]);
}
throw (ElasticSearchException) e;
Expand All @@ -183,22 +183,22 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
} else {
logger.debug("[{}][{}] failed to execute bulk item (index) {}", e, shardRequest.request.index(), shardRequest.shardId, indexRequest);
}
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(e)));
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
request.items()[requestIndex] = null;
}
} else if (item.request() instanceof DeleteRequest) {
DeleteRequest deleteRequest = (DeleteRequest) item.request();
try {
// add the response
DeleteResponse deleteResponse = shardDeleteOperation(deleteRequest, indexShard).response();
responses[i] = new BulkItemResponse(item.id(), "delete", deleteResponse);
responses[requestIndex] = new BulkItemResponse(item.id(), "delete", deleteResponse);
} catch (Exception e) {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(e)) {
// restore updated versions...
for (int j = 0; j < i; j++) {
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j]);
}
throw (ElasticSearchException) e;
Expand All @@ -208,22 +208,23 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
} else {
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", e, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[i] = new BulkItemResponse(item.id(), "delete",
responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(e)));
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
request.items()[requestIndex] = null;
}
} else if (item.request() instanceof UpdateRequest) {
UpdateRequest updateRequest = (UpdateRequest) item.request();
int retryCount = 0;
do {
// We need to do the requested retries plus the initial attempt. We don't do < 1+retry_on_conflict because retry_on_conflict may be Integer.MAX_VALUE
for (int updateAttemptsCount = 0; updateAttemptsCount <= updateRequest.retryOnConflict(); updateAttemptsCount++) {
UpdateResult updateResult;
try {
updateResult = shardUpdateOperation(clusterState, request, updateRequest, indexShard);
} catch (Throwable t) {
updateResult = new UpdateResult(null, null, false, t, null);
}
if (updateResult.success()) {

switch (updateResult.result.operation()) {
case UPSERT:
case INDEX:
Expand All @@ -238,8 +239,8 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
Tuple<XContentType, Map<String, Object>> sourceAndContent = XContentHelper.convertToMap(indexSourceAsBytes, true);
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, indexResponse.getVersion(), sourceAndContent.v2(), sourceAndContent.v1(), indexSourceAsBytes));
}
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
preVersions[i] = result.preVersion;
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
preVersions[requestIndex] = result.preVersion;
if (result.mappingToUpdate != null) {
if (mappingsToUpdate == null) {
mappingsToUpdate = Sets.newHashSet();
Expand All @@ -250,40 +251,50 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
if (ops == null) {
ops = new Engine.IndexingOperation[request.items().length];
}
ops[i] = result.op;
ops[requestIndex] = result.op;
}
// Replace the update request to the translated index request to execute on the replica.
request.items()[i] = new BulkItemRequest(request.items()[i].id(), indexRequest);
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), indexRequest);
break;
case DELETE:
DeleteResponse response = updateResult.writeResult.response();
DeleteRequest deleteRequest = updateResult.request();
updateResponse = new UpdateResponse(response.getIndex(), response.getType(), response.getId(), response.getVersion());
updateResponse.setGetResult(updateHelper.extractGetResult(updateRequest, response.getVersion(), updateResult.result.updatedSourceAsMap(), updateResult.result.updateSourceContentType(), null));
responses[i] = new BulkItemResponse(item.id(), "update", updateResponse);
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResponse);
// Replace the update request to the translated delete request to execute on the replica.
request.items()[i] = new BulkItemRequest(request.items()[i].id(), deleteRequest);
request.items()[requestIndex] = new BulkItemRequest(request.items()[requestIndex].id(), deleteRequest);
break;
case NONE:
responses[i] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
request.items()[i] = null; // No need to go to the replica
responses[requestIndex] = new BulkItemResponse(item.id(), "update", updateResult.noopResult);
request.items()[requestIndex] = null; // No need to go to the replica
break;
}
// NOTE: Breaking out of the retry_on_conflict loop!
break;
} else if (updateResult.failure()) {
Throwable t = updateResult.error;
if (!updateResult.retry) {
if (updateResult.retry) {
// updateAttemptCount is 0 based and marks current attempt, if it's equal to retryOnConflict we are going out of the iteration
if (updateAttemptsCount >= updateRequest.retryOnConflict()) {
// we can't try any more
responses[requestIndex] = new BulkItemResponse(item.id(), "update",
new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));;

request.items()[requestIndex] = null; // do not send to replicas
}
}
else {
// rethrow the failure if we are going to retry on primary and let parent failure to handle it
if (retryPrimaryException(t)) {
// restore updated versions...
for (int j = 0; j < i; j++) {
for (int j = 0; j < requestIndex; j++) {
applyVersion(request.items()[j], preVersions[j]);
}
throw (ElasticSearchException) t;
}
if (updateResult.result == null) {
responses[i] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
responses[requestIndex] = new BulkItemResponse(item.id(), "update", new BulkItemResponse.Failure(updateRequest.index(), updateRequest.type(), updateRequest.id(), ExceptionsHelper.detailedMessage(t)));
} else {
switch (updateResult.result.operation()) {
case UPSERT:
Expand All @@ -294,7 +305,7 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
} else {
logger.debug("[{}][{}] failed to execute bulk item (index) {}", t, shardRequest.request.index(), shardRequest.shardId, indexRequest);
}
responses[i] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
responses[requestIndex] = new BulkItemResponse(item.id(), indexRequest.opType().lowercase(),
new BulkItemResponse.Failure(indexRequest.index(), indexRequest.type(), indexRequest.id(), ExceptionsHelper.detailedMessage(t)));
break;
case DELETE:
Expand All @@ -304,19 +315,23 @@ protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnP
} else {
logger.debug("[{}][{}] failed to execute bulk item (delete) {}", t, shardRequest.request.index(), shardRequest.shardId, deleteRequest);
}
responses[i] = new BulkItemResponse(item.id(), "delete",
responses[requestIndex] = new BulkItemResponse(item.id(), "delete",
new BulkItemResponse.Failure(deleteRequest.index(), deleteRequest.type(), deleteRequest.id(), ExceptionsHelper.detailedMessage(t)));
break;
}
}
// nullify the request so it won't execute on the replicas
request.items()[i] = null;
request.items()[requestIndex] = null;
// NOTE: Breaking out of the retry_on_conflict loop!
break;
}

}
} while (++retryCount < updateRequest.retryOnConflict());
}
}

assert responses[requestIndex] != null; // we must have set a response somewhere.

}

if (mappingsToUpdate != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public void onResponse(Response result) {
public void onFailure(Throwable e) {
try {
channel.sendResponse(e);
} catch (Exception e1) {
} catch (Throwable e1) {
logger.warn("Failed to send response for " + transportAction, e1);
}
}
Expand Down Expand Up @@ -520,7 +520,7 @@ void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final S
try {
PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request));
performReplicas(response);
} catch (Exception e) {
} catch (Throwable e) {
// shard has not been allocated yet, retry it here
if (retryPrimaryException(e)) {
primaryOperationStarted.set(false);
Expand Down Expand Up @@ -691,7 +691,7 @@ private void finishIfPossible() {
public void run() {
try {
shardOperationOnReplica(shardRequest);
} catch (Exception e) {
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica " + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
Expand All @@ -705,7 +705,7 @@ public void run() {
} else {
try {
shardOperationOnReplica(shardRequest);
} catch (Exception e) {
} catch (Throwable e) {
if (!ignoreReplicaException(e)) {
logger.warn("Failed to perform " + transportAction + " on replica" + shardIt.shardId(), e);
shardStateAction.shardFailed(shard, "Failed to perform [" + transportAction + "] on replica, message [" + detailedMessage(e) + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ public String[] fields() {

/**
* Sets the number of retries of a version conflict occurs because the document was updated between
* getting it and updating it. Defaults to 1.
* getting it and updating it. Defaults to 0.
*/
public UpdateRequest retryOnConflict(int retryOnConflict) {
this.retryOnConflict = retryOnConflict;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public UpdateRequestBuilder setFields(String... fields) {

/**
* Sets the number of retries of a version conflict occurs because the document was updated between
* getting it and updating it. Defaults to 1.
* getting it and updating it. Defaults to 0.
*/
public UpdateRequestBuilder setRetryOnConflict(int retryOnConflict) {
request.retryOnConflict(retryOnConflict);
Expand Down
Loading

0 comments on commit 00a0967

Please sign in to comment.