Skip to content

Commit

Permalink
Internal: make sure that all delete mapping internal requests share t…
Browse files Browse the repository at this point in the history
…he same original headers and context

Delete mapping executes flush, delete by query and refresh operations internally. Those internal requests are now initialized by passing in the original delete mapping request so that its headers and request context are kept around.

Closes elastic#7736
  • Loading branch information
javanna committed Sep 18, 2014
1 parent 32ffb2a commit 1422688
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 5 deletions.
Expand Up @@ -20,6 +20,7 @@
package org.elasticsearch.action.admin.indices.flush;

import org.elasticsearch.Version;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -47,6 +48,14 @@ public class FlushRequest extends BroadcastOperationRequest<FlushRequest> {

}

/**
* Copy constructor that creates a new flush request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public FlushRequest(ActionRequest originalRequest) {
super(originalRequest);
}

/**
* Constructs a new flush request against one or more indices. If nothing is provided, all indices will
* be flushed.
Expand Down
Expand Up @@ -23,10 +23,13 @@
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ShardOperationFailedException;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
import org.elasticsearch.action.admin.indices.flush.FlushResponse;
import org.elasticsearch.action.admin.indices.flush.TransportFlushAction;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.admin.indices.refresh.TransportRefreshAction;
import org.elasticsearch.action.deletebyquery.DeleteByQueryRequest;
import org.elasticsearch.action.deletebyquery.DeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.IndexDeleteByQueryResponse;
import org.elasticsearch.action.deletebyquery.TransportDeleteByQueryAction;
Expand All @@ -35,7 +38,6 @@
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.broadcast.BroadcastOperationResponse;
import org.elasticsearch.action.support.master.TransportMasterNodeOperationAction;
import org.elasticsearch.client.Requests;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ack.ClusterStateUpdateResponse;
Expand Down Expand Up @@ -112,7 +114,7 @@ protected ClusterBlockException checkBlock(DeleteMappingRequest request, Cluster
@Override
protected void masterOperation(final DeleteMappingRequest request, final ClusterState state, final ActionListener<DeleteMappingResponse> listener) throws ElasticsearchException {
final String[] concreteIndices = state.metaData().concreteIndices(request.indicesOptions(), request.indices());
flushAction.execute(Requests.flushRequest(concreteIndices), new ActionListener<FlushResponse>() {
flushAction.execute(new FlushRequest(request).indices(concreteIndices), new ActionListener<FlushResponse>() {
@Override
public void onResponse(FlushResponse flushResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -138,7 +140,9 @@ public void onResponse(FlushResponse flushResponse) {
request.types(types.toArray(new String[types.size()]));
QuerySourceBuilder querySourceBuilder = new QuerySourceBuilder()
.setQuery(QueryBuilders.filteredQuery(QueryBuilders.matchAllQuery(), filterBuilder));
deleteByQueryAction.execute(Requests.deleteByQueryRequest(concreteIndices).types(request.types()).source(querySourceBuilder), new ActionListener<DeleteByQueryResponse>() {

DeleteByQueryRequest deleteByQueryRequest = new DeleteByQueryRequest(request).indices(concreteIndices).types(request.types()).source(querySourceBuilder);
deleteByQueryAction.execute(deleteByQueryRequest, new ActionListener<DeleteByQueryResponse>() {
@Override
public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
if (logger.isTraceEnabled()) {
Expand All @@ -151,7 +155,7 @@ public void onResponse(DeleteByQueryResponse deleteByQueryResponse) {
}
}
}
refreshAction.execute(Requests.refreshRequest(concreteIndices), new ActionListener<RefreshResponse>() {
refreshAction.execute(new RefreshRequest(request).indices(concreteIndices), new ActionListener<RefreshResponse>() {
@Override
public void onResponse(RefreshResponse refreshResponse) {
if (logger.isTraceEnabled()) {
Expand Down
Expand Up @@ -19,6 +19,7 @@

package org.elasticsearch.action.admin.indices.refresh;

import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.support.broadcast.BroadcastOperationRequest;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
Expand All @@ -41,6 +42,14 @@ public class RefreshRequest extends BroadcastOperationRequest<RefreshRequest> {
RefreshRequest() {
}

/**
* Copy constructor that creates a new refresh request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public RefreshRequest(ActionRequest originalRequest) {
super(originalRequest);
}

public RefreshRequest(String... indices) {
super(indices);
}
Expand Down
Expand Up @@ -21,6 +21,7 @@

import com.google.common.base.Charsets;
import org.elasticsearch.ElasticsearchGenerationException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.QuerySourceBuilder;
import org.elasticsearch.action.support.replication.IndicesReplicationOperationRequest;
Expand Down Expand Up @@ -72,6 +73,14 @@ public DeleteByQueryRequest(String... indices) {
public DeleteByQueryRequest() {
}

/**
* Copy constructor that creates a new delete by query request that is a copy of the one provided as an argument.
* The new request will inherit though headers and context from the original request that caused it.
*/
public DeleteByQueryRequest(ActionRequest originalRequest) {
super(originalRequest);
}

@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = super.validate();
Expand Down Expand Up @@ -113,6 +122,7 @@ public DeleteByQueryRequest source(String query) {
/**
* The source to execute in the form of a map.
*/
@SuppressWarnings("unchecked")
public DeleteByQueryRequest source(Map source) {
try {
XContentBuilder builder = XContentFactory.contentBuilder(Requests.CONTENT_TYPE);
Expand Down
Expand Up @@ -33,7 +33,7 @@
/**
*
*/
public class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {
public abstract class IndicesReplicationOperationRequest<T extends IndicesReplicationOperationRequest> extends ActionRequest<T> implements IndicesRequest {

protected TimeValue timeout = ShardReplicationOperationRequest.DEFAULT_TIMEOUT;
protected String[] indices;
Expand All @@ -46,6 +46,13 @@ public TimeValue timeout() {
return timeout;
}

protected IndicesReplicationOperationRequest() {
}

protected IndicesReplicationOperationRequest(ActionRequest actionRequest) {
super(actionRequest);
}

/**
* A timeout to wait if the delete by query operation can't be performed immediately. Defaults to <tt>1m</tt>.
*/
Expand Down Expand Up @@ -74,6 +81,7 @@ public IndicesOptions indicesOptions() {
return indicesOptions;
}

@SuppressWarnings("unchecked")
public T indicesOptions(IndicesOptions indicesOptions) {
if (indicesOptions == null) {
throw new IllegalArgumentException("IndicesOptions must not be null");
Expand Down

0 comments on commit 1422688

Please sign in to comment.