Skip to content
Browse files

replication base classes to allow for different implementation of the…

… primary request and the replica request
  • Loading branch information...
1 parent 72ad722 commit 1047cebabee2a5ad9f22897ed2abe2685385c069 @kimchy kimchy committed Oct 16, 2011
Showing with 129 additions and 76 deletions.
  1. +1 −2 .../org/elasticsearch/action/admin/cluster/ping/replication/TransportIndexReplicationPingAction.java
  2. +1 −2 .../java/org/elasticsearch/action/admin/cluster/ping/replication/TransportReplicationPingAction.java
  3. +8 −4 .../org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java
  4. +11 −7 modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
  5. +10 −6 modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java
  6. +1 −1 ...elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java
  7. +10 −6 ...elasticsearch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java
  8. +1 −2 ...sticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java
  9. +1 −1 ...earch/src/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java
  10. +11 −6 ...earch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java
  11. +11 −7 modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
  12. +4 −3 ...n/java/org/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java
  13. +5 −4 ...java/org/elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java
  14. +54 −25 ...n/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
View
3 ...sticsearch/action/admin/cluster/ping/replication/TransportIndexReplicationPingAction.java
@@ -33,9 +33,8 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
- * @author kimchy (shay.banon)
*/
-public class TransportIndexReplicationPingAction extends TransportIndexReplicationOperationAction<IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingResponse> {
+public class TransportIndexReplicationPingAction extends TransportIndexReplicationOperationAction<IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingRequest, ShardReplicationPingResponse> {
@Inject public TransportIndexReplicationPingAction(Settings settings, ClusterService clusterService,
TransportService transportService, ThreadPool threadPool,
View
3 ...g/elasticsearch/action/admin/cluster/ping/replication/TransportReplicationPingAction.java
@@ -31,9 +31,8 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
- * @author kimchy (Shay Banon)
*/
-public class TransportReplicationPingAction extends TransportIndicesReplicationOperationAction<ReplicationPingRequest, ReplicationPingResponse, IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingResponse> {
+public class TransportReplicationPingAction extends TransportIndicesReplicationOperationAction<ReplicationPingRequest, ReplicationPingResponse, IndexReplicationPingRequest, IndexReplicationPingResponse, ShardReplicationPingRequest, ShardReplicationPingRequest, ShardReplicationPingResponse> {
@Inject public TransportReplicationPingAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool, TransportIndexReplicationPingAction indexAction) {
super(settings, transportService, clusterService, threadPool, indexAction);
View
12 ...sticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java
@@ -33,7 +33,7 @@
/**
* @author kimchy (shay.banon)
*/
-public class TransportShardReplicationPingAction extends TransportShardReplicationOperationAction<ShardReplicationPingRequest, ShardReplicationPingResponse> {
+public class TransportShardReplicationPingAction extends TransportShardReplicationOperationAction<ShardReplicationPingRequest, ShardReplicationPingRequest, ShardReplicationPingResponse> {
@Inject public TransportShardReplicationPingAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
@@ -53,6 +53,10 @@
return new ShardReplicationPingRequest();
}
+ @Override protected ShardReplicationPingRequest newReplicaRequestInstance() {
+ return new ShardReplicationPingRequest();
+ }
+
@Override protected ShardReplicationPingResponse newResponseInstance() {
return new ShardReplicationPingResponse();
}
@@ -61,11 +65,11 @@
return "ping/replication/shard";
}
- @Override protected PrimaryResponse<ShardReplicationPingResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
- return new PrimaryResponse<ShardReplicationPingResponse>(new ShardReplicationPingResponse(), null);
+ @Override protected PrimaryResponse<ShardReplicationPingResponse, ShardReplicationPingRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
+ return new PrimaryResponse<ShardReplicationPingResponse, ShardReplicationPingRequest>(shardRequest.request, new ShardReplicationPingResponse(), null);
}
- @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
+ @Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
}
@Override protected ShardIterator shards(ClusterState clusterState, ShardReplicationPingRequest request) {
View
18 ...s/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java
@@ -58,7 +58,7 @@
*
* @author kimchy (shay.banon)
*/
-public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardResponse> {
+public class TransportShardBulkAction extends TransportShardReplicationOperationAction<BulkShardRequest, BulkShardRequest, BulkShardResponse> {
private final MappingUpdatedAction mappingUpdatedAction;
@@ -86,6 +86,10 @@
return new BulkShardRequest();
}
+ @Override protected BulkShardRequest newReplicaRequestInstance() {
+ return new BulkShardRequest();
+ }
+
@Override protected BulkShardResponse newResponseInstance() {
return new BulkShardResponse();
}
@@ -102,9 +106,9 @@
return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
}
- @Override protected PrimaryResponse<BulkShardResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+ @Override protected PrimaryResponse<BulkShardResponse, BulkShardRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final BulkShardRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.IndexingOperation[] ops = null;
@@ -198,10 +202,10 @@
}
}
BulkShardResponse response = new BulkShardResponse(new ShardId(request.index(), request.shardId()), responses);
- return new PrimaryResponse<BulkShardResponse>(response, ops);
+ return new PrimaryResponse<BulkShardResponse, BulkShardRequest>(shardRequest.request, response, ops);
}
- @Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse> response) {
+ @Override protected void postPrimaryOperation(BulkShardRequest request, PrimaryResponse<BulkShardResponse, BulkShardRequest> response) {
IndexService indexService = indicesService.indexServiceSafe(request.index());
Engine.IndexingOperation[] ops = (Engine.IndexingOperation[]) response.payload();
if (ops == null) {
@@ -233,8 +237,8 @@
}
}
- @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
- IndexShard indexShard = indexShard(shardRequest);
+ @Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
final BulkShardRequest request = shardRequest.request;
for (int i = 0; i < request.items().length; i++) {
BulkItemRequest item = request.items()[i];
View
16 ...es/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java
@@ -50,7 +50,7 @@
*
* @author kimchy (shay.banon)
*/
-public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteResponse> {
+public class TransportDeleteAction extends TransportShardReplicationOperationAction<DeleteRequest, DeleteRequest, DeleteResponse> {
private final boolean autoCreateIndex;
@@ -136,6 +136,10 @@ private void innerExecute(final DeleteRequest request, final ActionListener<Dele
return new DeleteRequest();
}
+ @Override protected DeleteRequest newReplicaRequestInstance() {
+ return new DeleteRequest();
+ }
+
@Override protected DeleteResponse newResponseInstance() {
return new DeleteResponse();
}
@@ -148,9 +152,9 @@ private void innerExecute(final DeleteRequest request, final ActionListener<Dele
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
- @Override protected PrimaryResponse<DeleteResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+ @Override protected PrimaryResponse<DeleteResponse, DeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.versionType(request.versionType())
.origin(Engine.Operation.Origin.PRIMARY);
@@ -167,12 +171,12 @@ private void innerExecute(final DeleteRequest request, final ActionListener<Dele
}
DeleteResponse response = new DeleteResponse(request.index(), request.type(), request.id(), delete.version(), delete.notFound());
- return new PrimaryResponse<DeleteResponse>(response, null);
+ return new PrimaryResponse<DeleteResponse, DeleteRequest>(shardRequest.request, response, null);
}
- @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
+ @Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
View
2 ...earch/src/main/java/org/elasticsearch/action/delete/index/TransportIndexDeleteAction.java
@@ -35,7 +35,7 @@
/**
* @author kimchy (shay.banon)
*/
-public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteResponse> {
+public class TransportIndexDeleteAction extends TransportIndexReplicationOperationAction<IndexDeleteRequest, IndexDeleteResponse, ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> {
@Inject public TransportIndexDeleteAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportShardDeleteAction deleteAction) {
View
16 ...earch/src/main/java/org/elasticsearch/action/delete/index/TransportShardDeleteAction.java
@@ -38,7 +38,7 @@
/**
* @author kimchy (Shay Banon)
*/
-public class TransportShardDeleteAction extends TransportShardReplicationOperationAction<ShardDeleteRequest, ShardDeleteResponse> {
+public class TransportShardDeleteAction extends TransportShardReplicationOperationAction<ShardDeleteRequest, ShardDeleteRequest, ShardDeleteResponse> {
@Inject public TransportShardDeleteAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
@@ -54,6 +54,10 @@
return new ShardDeleteRequest();
}
+ @Override protected ShardDeleteRequest newReplicaRequestInstance() {
+ return new ShardDeleteRequest();
+ }
+
@Override protected ShardDeleteResponse newResponseInstance() {
return new ShardDeleteResponse();
}
@@ -70,9 +74,9 @@
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
- @Override protected PrimaryResponse<ShardDeleteResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+ @Override protected PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.PRIMARY);
indexShard.delete(delete);
@@ -89,12 +93,12 @@
ShardDeleteResponse response = new ShardDeleteResponse(delete.version(), delete.notFound());
- return new PrimaryResponse<ShardDeleteResponse>(response, null);
+ return new PrimaryResponse<ShardDeleteResponse, ShardDeleteRequest>(shardRequest.request, response, null);
}
- @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
+ @Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id(), request.version())
.origin(Engine.Operation.Origin.REPLICA);
indexShard.delete(delete);
View
3 ...ch/src/main/java/org/elasticsearch/action/deletebyquery/TransportDeleteByQueryAction.java
@@ -33,9 +33,8 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
- * @author kimchy (shay.banon)
*/
-public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
+public class TransportDeleteByQueryAction extends TransportIndicesReplicationOperationAction<DeleteByQueryRequest, DeleteByQueryResponse, IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
@Inject public TransportDeleteByQueryAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportIndexDeleteByQueryAction indexDeleteByQueryAction) {
View
2 ...c/main/java/org/elasticsearch/action/deletebyquery/TransportIndexDeleteByQueryAction.java
@@ -34,7 +34,7 @@
/**
* @author kimchy (shay.banon)
*/
-public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
+public class TransportIndexDeleteByQueryAction extends TransportIndexReplicationOperationAction<IndexDeleteByQueryRequest, IndexDeleteByQueryResponse, ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
@Inject public TransportIndexDeleteByQueryAction(Settings settings, ClusterService clusterService, TransportService transportService,
ThreadPool threadPool, TransportShardDeleteByQueryAction shardDeleteByQueryAction) {
View
17 ...c/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java
@@ -38,7 +38,7 @@
/**
* @author kimchy (Shay Banon)
*/
-public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
+public class TransportShardDeleteByQueryAction extends TransportShardReplicationOperationAction<ShardDeleteByQueryRequest, ShardDeleteByQueryRequest, ShardDeleteByQueryResponse> {
@Inject public TransportShardDeleteByQueryAction(Settings settings, TransportService transportService,
ClusterService clusterService, IndicesService indicesService, ThreadPool threadPool,
@@ -58,6 +58,10 @@
return new ShardDeleteByQueryRequest();
}
+ @Override protected ShardDeleteByQueryRequest newReplicaRequestInstance() {
+ return new ShardDeleteByQueryRequest();
+ }
+
@Override protected ShardDeleteByQueryResponse newResponseInstance() {
return new ShardDeleteByQueryResponse();
}
@@ -70,17 +74,18 @@
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}
- @Override protected PrimaryResponse<ShardDeleteByQueryResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+ @Override protected PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types());
indexShard.deleteByQuery(deleteByQuery);
- return new PrimaryResponse<ShardDeleteByQueryResponse>(new ShardDeleteByQueryResponse(), null);
+ return new PrimaryResponse<ShardDeleteByQueryResponse, ShardDeleteByQueryRequest>(shardRequest.request, new ShardDeleteByQueryResponse(), null);
}
- @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
+
+ @Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
Engine.DeleteByQuery deleteByQuery = indexShard.prepareDeleteByQuery(request.querySource(), request.filteringAliases(), request.types());
indexShard.deleteByQuery(deleteByQuery);
}
View
18 modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java
@@ -66,7 +66,7 @@
*
* @author kimchy (shay.banon)
*/
-public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexResponse> {
+public class TransportIndexAction extends TransportShardReplicationOperationAction<IndexRequest, IndexRequest, IndexResponse> {
private final boolean autoCreateIndex;
@@ -136,6 +136,10 @@ private void innerExecute(final IndexRequest request, final ActionListener<Index
return new IndexRequest();
}
+ @Override protected IndexRequest newReplicaRequestInstance() {
+ return new IndexRequest();
+ }
+
@Override protected IndexResponse newResponseInstance() {
return new IndexResponse();
}
@@ -157,7 +161,7 @@ private void innerExecute(final IndexRequest request, final ActionListener<Index
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}
- @Override protected PrimaryResponse<IndexResponse> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
+ @Override protected PrimaryResponse<IndexResponse, IndexRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest) {
final IndexRequest request = shardRequest.request;
// validate, if routing is required, that we got routing
@@ -168,7 +172,7 @@ private void innerExecute(final IndexRequest request, final ActionListener<Index
}
}
- IndexShard indexShard = indexShard(shardRequest);
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
long version;
@@ -204,10 +208,10 @@ private void innerExecute(final IndexRequest request, final ActionListener<Index
request.version(version);
IndexResponse response = new IndexResponse(request.index(), request.type(), request.id(), version);
- return new PrimaryResponse<IndexResponse>(response, op);
+ return new PrimaryResponse<IndexResponse, IndexRequest>(shardRequest.request, response, op);
}
- @Override protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse> response) {
+ @Override protected void postPrimaryOperation(IndexRequest request, PrimaryResponse<IndexResponse, IndexRequest> response) {
Engine.IndexingOperation op = (Engine.IndexingOperation) response.payload();
if (!Strings.hasLength(request.percolate())) {
return;
@@ -221,8 +225,8 @@ private void innerExecute(final IndexRequest request, final ActionListener<Index
}
}
- @Override protected void shardOperationOnReplica(ShardOperationRequest shardRequest) {
- IndexShard indexShard = indexShard(shardRequest);
+ @Override protected void shardOperationOnReplica(ReplicaOperationRequest shardRequest) {
+ IndexShard indexShard = indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
IndexRequest request = shardRequest.request;
SourceToParse sourceToParse = SourceToParse.source(request.underlyingSource(), request.underlyingSourceOffset(), request.underlyingSourceLength()).type(request.type()).id(request.id())
.routing(request.routing()).parent(request.parent()).timestamp(request.timestamp()).ttl(request.ttl());
View
7 ...rg/elasticsearch/action/support/replication/TransportIndexReplicationOperationAction.java
@@ -21,6 +21,7 @@
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
@@ -40,15 +41,15 @@
/**
* @author kimchy (shay.banon)
*/
-public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
+public abstract class TransportIndexReplicationOperationAction<Request extends IndexReplicationOperationRequest, Response extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
protected final ClusterService clusterService;
- protected final TransportShardReplicationOperationAction<ShardRequest, ShardResponse> shardAction;
+ protected final TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction;
@Inject public TransportIndexReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
- TransportShardReplicationOperationAction<ShardRequest, ShardResponse> shardAction) {
+ TransportShardReplicationOperationAction<ShardRequest, ShardReplicaRequest, ShardResponse> shardAction) {
super(settings, threadPool);
this.clusterService = clusterService;
this.shardAction = shardAction;
View
9 .../elasticsearch/action/support/replication/TransportIndicesReplicationOperationAction.java
@@ -20,6 +20,7 @@
package org.elasticsearch.action.support.replication;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.BaseAction;
import org.elasticsearch.cluster.ClusterService;
@@ -37,20 +38,20 @@
import java.util.concurrent.atomic.AtomicReferenceArray;
/**
- * @author kimchy (shay.banon)
*/
-public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse, ShardRequest extends ShardReplicationOperationRequest, ShardResponse extends ActionResponse>
+public abstract class TransportIndicesReplicationOperationAction<Request extends IndicesReplicationOperationRequest, Response extends ActionResponse, IndexRequest extends IndexReplicationOperationRequest, IndexResponse extends ActionResponse,
+ ShardRequest extends ShardReplicationOperationRequest, ShardReplicaRequest extends ActionRequest, ShardResponse extends ActionResponse>
extends BaseAction<Request, Response> {
protected final ClusterService clusterService;
- protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction;
+ protected final TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardReplicaRequest, ShardResponse> indexAction;
final String transportAction;
@Inject public TransportIndicesReplicationOperationAction(Settings settings, TransportService transportService, ClusterService clusterService, ThreadPool threadPool,
- TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardResponse> indexAction) {
+ TransportIndexReplicationOperationAction<IndexRequest, IndexResponse, ShardRequest, ShardReplicaRequest, ShardResponse> indexAction) {
super(settings, threadPool);
this.clusterService = clusterService;
this.indexAction = indexAction;
View
79 ...rg/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java
@@ -22,6 +22,7 @@
import org.elasticsearch.ElasticSearchException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
+import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.UnavailableShardsException;
import org.elasticsearch.action.WriteConsistencyLevel;
@@ -46,7 +47,6 @@
import org.elasticsearch.index.engine.VersionConflictEngineException;
import org.elasticsearch.index.shard.IllegalIndexShardStateException;
import org.elasticsearch.index.shard.ShardId;
-import org.elasticsearch.index.shard.service.IndexShard;
import org.elasticsearch.indices.IndexMissingException;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.node.NodeClosedException;
@@ -67,9 +67,8 @@
import static org.elasticsearch.ExceptionsHelper.*;
/**
- * @author kimchy (shay.banon)
*/
-public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
+public abstract class TransportShardReplicationOperationAction<Request extends ShardReplicationOperationRequest, ReplicaRequest extends ActionRequest, Response extends ActionResponse> extends BaseAction<Request, Response> {
protected final TransportService transportService;
@@ -115,20 +114,22 @@ protected TransportShardReplicationOperationAction(Settings settings, TransportS
protected abstract Request newRequestInstance();
+ protected abstract ReplicaRequest newReplicaRequestInstance();
+
protected abstract Response newResponseInstance();
protected abstract String transportAction();
protected abstract String executor();
- protected abstract PrimaryResponse<Response> shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);
+ protected abstract PrimaryResponse<Response, ReplicaRequest> shardOperationOnPrimary(ClusterState clusterState, PrimaryOperationRequest shardRequest);
- protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);
+ protected abstract void shardOperationOnReplica(ReplicaOperationRequest shardRequest);
/**
* Called once replica operations have been dispatched on the
*/
- protected void postPrimaryOperation(Request request, PrimaryResponse<Response> response) {
+ protected void postPrimaryOperation(Request request, PrimaryResponse<Response, ReplicaRequest> response) {
}
@@ -156,10 +157,6 @@ private String transportReplicaAction() {
return transportAction() + "/replica";
}
- protected IndexShard indexShard(ShardOperationRequest shardRequest) {
- return indicesService.indexServiceSafe(shardRequest.request.index()).shardSafe(shardRequest.shardId);
- }
-
protected boolean retryPrimaryException(Throwable e) {
Throwable cause = ExceptionsHelper.unwrapCause(e);
return cause instanceof IndexShardMissingException ||
@@ -231,32 +228,32 @@ boolean ignoreReplicaException(Throwable e) {
}
}
- class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ShardOperationRequest> {
+ class ReplicaOperationTransportHandler extends BaseTransportRequestHandler<ReplicaOperationRequest> {
- @Override public ShardOperationRequest newInstance() {
- return new ShardOperationRequest();
+ @Override public ReplicaOperationRequest newInstance() {
+ return new ReplicaOperationRequest();
}
@Override public String executor() {
return executor;
}
- @Override public void messageReceived(final ShardOperationRequest request, final TransportChannel channel) throws Exception {
+ @Override public void messageReceived(final ReplicaOperationRequest request, final TransportChannel channel) throws Exception {
shardOperationOnReplica(request);
channel.sendResponse(VoidStreamable.INSTANCE);
}
}
- protected class ShardOperationRequest implements Streamable {
+ protected class PrimaryOperationRequest implements Streamable {
public int shardId;
public Request request;
- public ShardOperationRequest() {
+ public PrimaryOperationRequest() {
}
- public ShardOperationRequest(int shardId, Request request) {
+ public PrimaryOperationRequest(int shardId, Request request) {
this.shardId = shardId;
this.request = request;
}
@@ -273,6 +270,32 @@ public ShardOperationRequest(int shardId, Request request) {
}
}
+ protected class ReplicaOperationRequest implements Streamable {
+
+ public int shardId;
+
+ public ReplicaRequest request;
+
+ public ReplicaOperationRequest() {
+ }
+
+ public ReplicaOperationRequest(int shardId, ReplicaRequest request) {
+ this.shardId = shardId;
+ this.request = request;
+ }
+
+ @Override public void readFrom(StreamInput in) throws IOException {
+ shardId = in.readVInt();
+ request = newReplicaRequestInstance();
+ request.readFrom(in);
+ }
+
+ @Override public void writeTo(StreamOutput out) throws IOException {
+ out.writeVInt(shardId);
+ request.writeTo(out);
+ }
+ }
+
protected class AsyncShardOperationAction {
private final ActionListener<Response> listener;
@@ -461,7 +484,7 @@ void retry(boolean fromClusterEvent, final ShardId shardId) {
void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final ShardRouting shard, ClusterState clusterState) {
try {
- PrimaryResponse<Response> response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
+ PrimaryResponse<Response, ReplicaRequest> response = shardOperationOnPrimary(clusterState, new PrimaryOperationRequest(primaryShardId, request));
performReplicas(response);
} catch (Exception e) {
// shard has not been allocated yet, retry it here
@@ -478,7 +501,7 @@ void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, final S
}
}
- void performReplicas(final PrimaryResponse<Response> response) {
+ void performReplicas(final PrimaryResponse<Response, ReplicaRequest> response) {
if (ignoreReplicas() || shardIt.size() == 1 /* no replicas */) {
postPrimaryOperation(request, response);
listener.onResponse(response.response());
@@ -543,7 +566,7 @@ void performReplicas(final PrimaryResponse<Response> response) {
}
}
- void performOnReplica(final PrimaryResponse<Response> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
+ void performOnReplica(final PrimaryResponse<Response, ReplicaRequest> response, final AtomicInteger counter, final ShardRouting shard, String nodeId) {
// if we don't have that node, it means that it might have failed and will be created again, in
// this case, we don't have to do the operation, and just let it failover
if (!nodes.nodeExists(nodeId)) {
@@ -553,7 +576,7 @@ void performOnReplica(final PrimaryResponse<Response> response, final AtomicInte
return;
}
- final ShardOperationRequest shardRequest = new ShardOperationRequest(shardIt.shardId().id(), request);
+ final ReplicaOperationRequest shardRequest = new ReplicaOperationRequest(shardIt.shardId().id(), response.replicaRequest());
if (!nodeId.equals(nodes.localNodeId())) {
DiscoveryNode node = nodes.get(nodeId);
transportService.sendRequest(node, transportReplicaAction, shardRequest, transportOptions(), new VoidTransportResponseHandler(ThreadPool.Names.SAME) {
@@ -610,16 +633,22 @@ private void finishIfPossible() {
}
}
- public static class PrimaryResponse<T> {
- private final T response;
+ public static class PrimaryResponse<Response, ReplicaRequest> {
+ private final ReplicaRequest replicaRequest;
+ private final Response response;
private final Object payload;
- public PrimaryResponse(T response, Object payload) {
+ public PrimaryResponse(ReplicaRequest replicaRequest, Response response, Object payload) {
+ this.replicaRequest = replicaRequest;
this.response = response;
this.payload = payload;
}
- public T response() {
+ public ReplicaRequest replicaRequest() {
+ return this.replicaRequest;
+ }
+
+ public Response response() {
return response;
}

0 comments on commit 1047ceb

Please sign in to comment.
Something went wrong with that request. Please try again.