diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/ActionRequestValidationException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/ActionRequestValidationException.java index 6f9f314da8d23..d1b2a1c77b4b4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/ActionRequestValidationException.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/ActionRequestValidationException.java @@ -25,7 +25,7 @@ import java.util.List; /** - * @author kimchy (Shay Banon) + * @author kimchy (shay.banon) */ public class ActionRequestValidationException extends ElasticSearchException { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/RoutingMissingException.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/RoutingMissingException.java new file mode 100644 index 0000000000000..1e6678dfe2bd9 --- /dev/null +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/RoutingMissingException.java @@ -0,0 +1,53 @@ +/* + * Licensed to Elastic Search and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Elastic Search licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.action; + +import org.elasticsearch.ElasticSearchException; + +/** + * @author kimchy (shay.banon) + */ +public class RoutingMissingException extends ElasticSearchException { + + private final String index; + + private final String type; + + private final String id; + + public RoutingMissingException(String index, String type, String id) { + super("routing is required for [" + index + "]/[" + type + "]/[" + id + "]"); + this.index = index; + this.type = type; + this.id = id; + } + + public String index() { + return index; + } + + public String type() { + return type; + } + + public String id() { + return id; + } +} diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java index 0b6853a227b25..5fc3639309222 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/admin/cluster/ping/replication/TransportShardReplicationPingAction.java @@ -57,7 +57,7 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati return "ping/replication/shard"; } - @Override protected ShardReplicationPingResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { + @Override protected ShardReplicationPingResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { return new ShardReplicationPingResponse(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java index 15a1b67501300..0c11e0e7f2c5c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.delete.DeleteRequest; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.index.IndexRequest; @@ -31,6 +32,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.collect.Sets; import org.elasticsearch.common.inject.Inject; @@ -95,7 +97,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt(); } - @Override protected BulkShardResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { + @Override protected BulkShardResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { IndexShard indexShard = indexShard(shardRequest); final BulkShardRequest request = shardRequest.request; BulkItemResponse[] responses = new BulkItemResponse[request.items().length]; @@ -105,6 +107,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation if (item.request() instanceof IndexRequest) { IndexRequest indexRequest = (IndexRequest) item.request(); try { + + // validate, if routing is required, that we got routing + MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(indexRequest.type()); + if (mappingMd != null && mappingMd.routing().required()) { + if (indexRequest.routing() == null) { + throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id()); + } + } + SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()).routing(indexRequest.routing()); if (indexRequest.opType() == IndexRequest.OpType.INDEX) { ops[i] = indexShard.prepareIndex(sourceToParse); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 60a681d4450e2..4717344a7fcc4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -100,7 +100,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); } - @Override protected DeleteResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { + @Override protected DeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { DeleteRequest request = shardRequest.request; IndexShard indexShard = indexShard(shardRequest); Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id()); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java index 60b6ab49a5ed7..f1f31960b2488 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/deletebyquery/TransportShardDeleteByQueryAction.java @@ -64,7 +64,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index()); } - @Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { + @Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { ShardDeleteByQueryRequest request = shardRequest.request; indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types()); return new ShardDeleteByQueryResponse(); diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java index cbdfe136a95bb..b8b603f1d3981 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/index/TransportIndexAction.java @@ -21,6 +21,7 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.TransportActions; import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; @@ -31,6 +32,7 @@ import org.elasticsearch.cluster.action.index.MappingUpdatedAction; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.block.ClusterBlockLevel; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardsIterator; import org.elasticsearch.common.UUID; import org.elasticsearch.common.inject.Inject; @@ -133,9 +135,18 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi .indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing()); } - @Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) { - IndexShard indexShard = indexShard(shardRequest); + @Override protected IndexResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) { final IndexRequest request = shardRequest.request; + + // validate, if routing is required, that we got routing + MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type()); + if (mappingMd != null && mappingMd.routing().required()) { + if (request.routing() == null) { + throw new RoutingMissingException(request.index(), request.type(), request.id()); + } + } + + IndexShard indexShard = indexShard(shardRequest); SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()).routing(request.routing()); ParsedDocument doc; if (request.opType() == IndexRequest.OpType.INDEX) { diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java index e56bd3c6a2139..4c98fb8bac84c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/action/support/replication/TransportShardReplicationOperationAction.java @@ -103,7 +103,7 @@ protected TransportShardReplicationOperationAction(Settings settings, TransportS protected abstract String transportAction(); - protected abstract Response shardOperationOnPrimary(ShardOperationRequest shardRequest); + protected abstract Response shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest); protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest); @@ -254,7 +254,7 @@ public void start() { * Returns true if the action starting to be performed on the primary (or is done). */ public boolean start(final boolean fromClusterEvent) throws ElasticSearchException { - ClusterState clusterState = clusterService.state(); + final ClusterState clusterState = clusterService.state(); nodes = clusterState.nodes(); if (!clusterState.routingTable().hasIndex(request.index())) { retry(fromClusterEvent, null); @@ -313,11 +313,11 @@ public boolean start(final boolean fromClusterEvent) throws ElasticSearchExcepti request.beforeLocalFork(); threadPool.execute(new Runnable() { @Override public void run() { - performOnPrimary(shard.id(), fromClusterEvent, true, shard); + performOnPrimary(shard.id(), fromClusterEvent, true, shard, clusterState); } }); } else { - performOnPrimary(shard.id(), fromClusterEvent, false, shard); + performOnPrimary(shard.id(), fromClusterEvent, false, shard, clusterState); } } else { DiscoveryNode node = nodes.get(shard.currentNodeId()); @@ -413,9 +413,9 @@ private void retry(boolean fromClusterEvent, final ShardId shardId) { } } - private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard) { + private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard, ClusterState clusterState) { try { - Response response = shardOperationOnPrimary(new ShardOperationRequest(primaryShardId, request)); + Response response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request)); performReplicas(response, alreadyThreaded); } catch (Exception e) { // shard has not been allocated yet, retry it here diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java index 92f29810e4679..ac669c89115cf 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/cluster/metadata/MappingMetaData.java @@ -31,18 +31,43 @@ */ public class MappingMetaData { + public static class Routing { + + public static final Routing EMPTY = new Routing(false); + + private final boolean required; + + public Routing(boolean required) { + this.required = required; + } + + public boolean required() { + return required; + } + } + private final String type; private final CompressedString source; + private final Routing routing; + public MappingMetaData(DocumentMapper docMapper) { this.type = docMapper.type(); this.source = docMapper.mappingSource(); + this.routing = new Routing(docMapper.routingFieldMapper().required()); } public MappingMetaData(String type, CompressedString source) { this.type = type; this.source = source; + this.routing = Routing.EMPTY; + } + + MappingMetaData(String type, CompressedString source, Routing routing) { + this.type = type; + this.source = source; + this.routing = routing; } public String type() { @@ -53,12 +78,22 @@ public CompressedString source() { return this.source; } + public Routing routing() { + return this.routing; + } + public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException { out.writeUTF(mappingMd.type()); mappingMd.source().writeTo(out); + // routing + out.writeBoolean(mappingMd.routing().required()); } public static MappingMetaData readFrom(StreamInput in) throws IOException { - return new MappingMetaData(in.readUTF(), CompressedString.readCompressedString(in)); + String type = in.readUTF(); + CompressedString source = CompressedString.readCompressedString(in); + // routing + Routing routing = new Routing(in.readBoolean()); + return new MappingMetaData(type, source, routing); } } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java index 5f690cf992900..3e07af02c6d19 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java @@ -66,6 +66,8 @@ public interface DocumentMapper { AllFieldMapper allFieldMapper(); + RoutingFieldMapper routingFieldMapper(); + DocumentFieldMappers mappers(); /** diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/RoutingFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/RoutingFieldMapper.java index 597b6b13dca37..c2a5116524ded 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/RoutingFieldMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/RoutingFieldMapper.java @@ -26,5 +26,7 @@ */ public interface RoutingFieldMapper extends FieldMapper, InternalMapper { + boolean required(); + String value(Document document); } \ No newline at end of file diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/RoutingFieldMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/RoutingFieldMapper.java index 81716118224b8..db82c667d7e63 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/RoutingFieldMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/RoutingFieldMapper.java @@ -41,28 +41,43 @@ public static class Defaults extends AbstractFieldMapper.Defaults { public static final Field.Store STORE = Field.Store.YES; public static final boolean OMIT_NORMS = true; public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = true; + public static final boolean REQUIRED = false; } public static class Builder extends AbstractFieldMapper.Builder { + private boolean required = Defaults.REQUIRED; + public Builder() { super(Defaults.NAME); store = Defaults.STORE; index = Defaults.INDEX; } + public Builder required(boolean required) { + this.required = required; + return builder; + } + @Override public RoutingFieldMapper build(BuilderContext context) { - return new RoutingFieldMapper(store, index); + return new RoutingFieldMapper(store, index, required); } } + private final boolean required; + protected RoutingFieldMapper() { - this(Defaults.STORE, Defaults.INDEX); + this(Defaults.STORE, Defaults.INDEX, Defaults.REQUIRED); } - protected RoutingFieldMapper(Field.Store store, Field.Index index) { + protected RoutingFieldMapper(Field.Store store, Field.Index index, boolean required) { super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), index, store, Defaults.TERM_VECTOR, 1.0f, Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS, Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER); + this.required = required; + } + + @Override public boolean required() { + return this.required; } @Override public String value(Document document) { @@ -107,7 +122,7 @@ protected RoutingFieldMapper(Field.Store store, Field.Index index) { @Override public void toXContent(XContentBuilder builder, Params params) throws IOException { // if all are defaults, no sense to write it at all - if (index == Defaults.INDEX && store == Defaults.STORE) { + if (index == Defaults.INDEX && store == Defaults.STORE && required == Defaults.REQUIRED) { return; } builder.startObject(CONTENT_TYPE); @@ -117,6 +132,9 @@ protected RoutingFieldMapper(Field.Store store, Field.Index index) { if (store != Defaults.STORE) { builder.field("store", store.name().toLowerCase()); } + if (required != Defaults.REQUIRED) { + builder.field("required", required); + } builder.endObject(); } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java index c386f6561fed4..906a37cc3669c 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapper.java @@ -312,6 +312,10 @@ public RootObjectMapper root() { return this.allFieldMapper; } + @Override public org.elasticsearch.index.mapper.RoutingFieldMapper routingFieldMapper() { + return this.routingFieldMapper; + } + @Override public Analyzer indexAnalyzer() { return this.indexAnalyzer; } diff --git a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java index a80f96ac11b54..d946c04181cd4 100644 --- a/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java +++ b/modules/elasticsearch/src/main/java/org/elasticsearch/index/mapper/xcontent/XContentDocumentMapperParser.java @@ -218,6 +218,13 @@ private IdFieldMapper.Builder parseIdField(Map idNode, XContentM private RoutingFieldMapper.Builder parseRoutingField(Map routingNode, XContentMapper.TypeParser.ParserContext parserContext) { RoutingFieldMapper.Builder builder = routing(); parseField(builder, builder.name, routingNode, parserContext); + for (Map.Entry entry : routingNode.entrySet()) { + String fieldName = Strings.toUnderscoreCase(entry.getKey()); + Object fieldNode = entry.getValue(); + if (fieldName.equals("required")) { + builder.required(nodeBooleanValue(fieldNode)); + } + } return builder; } diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/AbstractNodesTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/AbstractNodesTests.java index 88ec3926cc4c2..1e12501f1162b 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/AbstractNodesTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/AbstractNodesTests.java @@ -56,6 +56,10 @@ public Node buildNode(String id) { return buildNode(id, EMPTY_SETTINGS); } + public Node buildNode(String id, Settings.Builder settings) { + return buildNode(id, settings.build()); + } + public Node buildNode(String id, Settings settings) { String settingsSource = getClass().getName().replace('.', '/') + ".yml"; Settings finalSettings = settingsBuilder() diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java index f9469e2323ed8..87c194ec4e544 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/AbstractSimpleIndexGatewayTests.java @@ -68,6 +68,13 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests // get the environment, so we can clear the work dir when needed Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class); + + logger.info("Running Cluster Health (waiting for node to startup properly)"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + // Translog tests logger.info("Creating index [{}]", "test"); @@ -107,7 +114,7 @@ public abstract class AbstractSimpleIndexGatewayTests extends AbstractNodesTests startNode("server1"); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); @@ -227,6 +234,12 @@ protected boolean isPersistentStorage() { private void testLoad(boolean fullRecovery) { startNode("server1"); + logger.info("Running Cluster Health (waiting for node to startup properly)"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + // get the environment, so we can clear the work dir when needed Environment environment = ((InternalNode) node("server1")).injector().getInstance(Environment.class); @@ -234,7 +247,7 @@ private void testLoad(boolean fullRecovery) { client("server1").admin().indices().prepareCreate("test").execute().actionGet(); logger.info("Running Cluster Health (wait for the shards to startup)"); - ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); + clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForYellowStatus().waitForActiveShards(1)).actionGet(); logger.info("Done Cluster Health, status " + clusterHealth.status()); assertThat(clusterHealth.timedOut(), equalTo(false)); assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.YELLOW)); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java index b27f9b74fb2f6..2a6c8ee68d9a3 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/fs/FsMetaDataGatewayTests.java @@ -19,6 +19,8 @@ package org.elasticsearch.test.integration.gateway.fs; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.cluster.health.ClusterHealthStatus; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.indices.IndexAlreadyExistsException; import org.elasticsearch.node.internal.InternalNode; @@ -28,6 +30,8 @@ import org.testng.annotations.Test; import static org.elasticsearch.client.Requests.*; +import static org.hamcrest.MatcherAssert.*; +import static org.hamcrest.Matchers.*; /** * @author kimchy (shay.banon) @@ -51,6 +55,12 @@ public class FsMetaDataGatewayTests extends AbstractNodesTests { @Test public void testIndexActions() throws Exception { startNode("server1"); + logger.info("Running Cluster Health (waiting for node to startup properly)"); + ClusterHealthResponse clusterHealth = client("server1").admin().cluster().health(clusterHealthRequest().waitForGreenStatus()).actionGet(); + logger.info("Done Cluster Health, status " + clusterHealth.status()); + assertThat(clusterHealth.timedOut(), equalTo(false)); + assertThat(clusterHealth.status(), equalTo(ClusterHealthStatus.GREEN)); + client("server1").admin().indices().create(createIndexRequest("test")).actionGet(); closeNode("server1"); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java index b6d46399f3630..0f147caa8a241 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/gateway/local/LocalGatewayIndexStateTests.java @@ -25,9 +25,11 @@ import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.cluster.block.ClusterBlockException; import org.elasticsearch.cluster.metadata.IndexMetaData; +import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.common.logging.ESLogger; import org.elasticsearch.common.logging.Loggers; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.gateway.Gateway; import org.elasticsearch.node.internal.InternalNode; import org.elasticsearch.test.integration.AbstractNodesTests; @@ -56,6 +58,51 @@ public class LocalGatewayIndexStateTests extends AbstractNodesTests { closeAllNodes(); } + @Test public void testMappingMetaDataParsed() throws Exception { + logger.info("--> cleaning nodes"); + buildNode("node1", settingsBuilder().put("gateway.type", "local")); + buildNode("node2", settingsBuilder().put("gateway.type", "local")); + cleanAndCloseNodes(); + + logger.info("--> starting 1 nodes"); + startNode("node1", settingsBuilder().put("gateway.type", "local")); + + logger.info("--> creating test index, with meta routing"); + client("node1").admin().indices().prepareCreate("test") + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject()) + .execute().actionGet(); + + logger.info("--> waiting for yellow status"); + ClusterHealthResponse health = client("node1").admin().cluster().prepareHealth().setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet(); + if (health.timedOut()) { + ClusterStateResponse response = client("node1").admin().cluster().prepareState().execute().actionGet(); + System.out.println("" + response); + } + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify meta _routing required exists"); + MappingMetaData mappingMd = client("node1").admin().cluster().prepareState().execute().actionGet().state().metaData().index("test").mapping("type1"); + assertThat(mappingMd.routing().required(), equalTo(true)); + + logger.info("--> close node"); + closeNode("node1"); + + logger.info("--> starting node again..."); + startNode("node1", settingsBuilder().put("gateway.type", "local")); + + logger.info("--> waiting for yellow status"); + health = client("node1").admin().cluster().prepareHealth().setWaitForActiveShards(5).setWaitForYellowStatus().execute().actionGet(); + if (health.timedOut()) { + ClusterStateResponse response = client("node1").admin().cluster().prepareState().execute().actionGet(); + System.out.println("" + response); + } + assertThat(health.timedOut(), equalTo(false)); + + logger.info("--> verify meta _routing required exists"); + mappingMd = client("node1").admin().cluster().prepareState().execute().actionGet().state().metaData().index("test").mapping("type1"); + assertThat(mappingMd.routing().required(), equalTo(true)); + } + @Test public void testSimpleOpenClose() throws Exception { logger.info("--> cleaning nodes"); buildNode("node1", settingsBuilder().put("gateway.type", "local").build()); diff --git a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java index eab2f60993cc5..d5fc701681e38 100644 --- a/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java +++ b/modules/test/integration/src/test/java/org/elasticsearch/test/integration/routing/SimpleRoutingTests.java @@ -19,7 +19,10 @@ package org.elasticsearch.test.integration.routing; +import org.elasticsearch.ElasticSearchException; +import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.client.Client; +import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.query.xcontent.QueryBuilders; import org.elasticsearch.test.integration.AbstractNodesTests; import org.testng.annotations.AfterClass; @@ -184,4 +187,28 @@ protected Client getClient() { assertThat(client.prepareCount().setRouting("0", "1", "0").setQuery(QueryBuilders.matchAllQuery()).execute().actionGet().count(), equalTo(2l)); } } + + @Test public void testRequiredRoutingMapping() throws Exception { + try { + client.admin().indices().prepareDelete("test").execute().actionGet(); + } catch (Exception e) { + // ignore + } + client.admin().indices().prepareCreate("test") + .addMapping("type1", XContentFactory.jsonBuilder().startObject().startObject("type1").startObject("_routing").field("required", true).endObject().endObject().endObject()) + .execute().actionGet(); + client.admin().cluster().prepareHealth().setWaitForGreenStatus().execute().actionGet(); + + logger.info("--> indexing with id [1], and routing [0]"); + client.prepareIndex("test", "type1", "1").setRouting("0").setSource("field", "value1").setRefresh(true).execute().actionGet(); + logger.info("--> verifying get with no routing, should not find anything"); + + logger.info("--> indexing with id [1], with no routing, should fail"); + try { + client.prepareIndex("test", "type1", "1").setSource("field", "value1").setRefresh(true).execute().actionGet(); + assert false; + } catch (ElasticSearchException e) { + assertThat(e.unwrapCause(), instanceOf(RoutingMissingException.class)); + } + } }