Skip to content

Commit

Permalink
Routing: Allow to specify on the _routing mapping that its required…
Browse files Browse the repository at this point in the history
…, and fail index operations that do not provide one, closes elastic#520.
  • Loading branch information
kimchy committed Nov 16, 2010
1 parent 8a8a6d5 commit 02981f6
Show file tree
Hide file tree
Showing 19 changed files with 264 additions and 20 deletions.
Expand Up @@ -25,7 +25,7 @@
import java.util.List;

/**
* @author kimchy (Shay Banon)
* @author kimchy (shay.banon)
*/
public class ActionRequestValidationException extends ElasticSearchException {

Expand Down
@@ -0,0 +1,53 @@
/*
* Licensed to Elastic Search and Shay Banon under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Elastic Search licenses this
* file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.action;

import org.elasticsearch.ElasticSearchException;

/**
* @author kimchy (shay.banon)
*/
public class RoutingMissingException extends ElasticSearchException {

private final String index;

private final String type;

private final String id;

public RoutingMissingException(String index, String type, String id) {
super("routing is required for [" + index + "]/[" + type + "]/[" + id + "]");
this.index = index;
this.type = type;
this.id = id;
}

public String index() {
return index;
}

public String type() {
return type;
}

public String id() {
return id;
}
}
Expand Up @@ -57,7 +57,7 @@ public class TransportShardReplicationPingAction extends TransportShardReplicati
return "ping/replication/shard";
}

@Override protected ShardReplicationPingResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
@Override protected ShardReplicationPingResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
return new ShardReplicationPingResponse();
}

Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.collect.Sets;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -95,7 +97,7 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
return clusterState.routingTable().index(request.index()).shard(request.shardId()).shardsIt();
}

@Override protected BulkShardResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
@Override protected BulkShardResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
final BulkShardRequest request = shardRequest.request;
BulkItemResponse[] responses = new BulkItemResponse[request.items().length];
Expand All @@ -105,6 +107,15 @@ public class TransportShardBulkAction extends TransportShardReplicationOperation
if (item.request() instanceof IndexRequest) {
IndexRequest indexRequest = (IndexRequest) item.request();
try {

// validate, if routing is required, that we got routing
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(indexRequest.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (indexRequest.routing() == null) {
throw new RoutingMissingException(indexRequest.index(), indexRequest.type(), indexRequest.id());
}
}

SourceToParse sourceToParse = SourceToParse.source(indexRequest.source()).type(indexRequest.type()).id(indexRequest.id()).routing(indexRequest.routing());
if (indexRequest.opType() == IndexRequest.OpType.INDEX) {
ops[i] = indexShard.prepareIndex(sourceToParse);
Expand Down
Expand Up @@ -100,7 +100,7 @@ public class TransportDeleteAction extends TransportShardReplicationOperationAct
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}

@Override protected DeleteResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
@Override protected DeleteResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
DeleteRequest request = shardRequest.request;
IndexShard indexShard = indexShard(shardRequest);
Engine.Delete delete = indexShard.prepareDelete(request.type(), request.id());
Expand Down
Expand Up @@ -64,7 +64,7 @@ public class TransportShardDeleteByQueryAction extends TransportShardReplication
state.blocks().indexBlockedRaiseException(ClusterBlockLevel.WRITE, request.index());
}

@Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
@Override protected ShardDeleteByQueryResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
ShardDeleteByQueryRequest request = shardRequest.request;
indexShard(shardRequest).deleteByQuery(request.querySource(), request.queryParserName(), request.types());
return new ShardDeleteByQueryResponse();
Expand Down
Expand Up @@ -21,6 +21,7 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.action.TransportActions;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
Expand All @@ -31,6 +32,7 @@
import org.elasticsearch.cluster.action.index.MappingUpdatedAction;
import org.elasticsearch.cluster.action.shard.ShardStateAction;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.routing.ShardsIterator;
import org.elasticsearch.common.UUID;
import org.elasticsearch.common.inject.Inject;
Expand Down Expand Up @@ -133,9 +135,18 @@ public class TransportIndexAction extends TransportShardReplicationOperationActi
.indexShards(clusterService.state(), request.index(), request.type(), request.id(), request.routing());
}

@Override protected IndexResponse shardOperationOnPrimary(ShardOperationRequest shardRequest) {
IndexShard indexShard = indexShard(shardRequest);
@Override protected IndexResponse shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest) {
final IndexRequest request = shardRequest.request;

// validate, if routing is required, that we got routing
MappingMetaData mappingMd = clusterState.metaData().index(request.index()).mapping(request.type());
if (mappingMd != null && mappingMd.routing().required()) {
if (request.routing() == null) {
throw new RoutingMissingException(request.index(), request.type(), request.id());
}
}

IndexShard indexShard = indexShard(shardRequest);
SourceToParse sourceToParse = SourceToParse.source(request.source()).type(request.type()).id(request.id()).routing(request.routing());
ParsedDocument doc;
if (request.opType() == IndexRequest.OpType.INDEX) {
Expand Down
Expand Up @@ -103,7 +103,7 @@ protected TransportShardReplicationOperationAction(Settings settings, TransportS

protected abstract String transportAction();

protected abstract Response shardOperationOnPrimary(ShardOperationRequest shardRequest);
protected abstract Response shardOperationOnPrimary(ClusterState clusterState, ShardOperationRequest shardRequest);

protected abstract void shardOperationOnReplica(ShardOperationRequest shardRequest);

Expand Down Expand Up @@ -254,7 +254,7 @@ public void start() {
* Returns <tt>true</tt> if the action starting to be performed on the primary (or is done).
*/
public boolean start(final boolean fromClusterEvent) throws ElasticSearchException {
ClusterState clusterState = clusterService.state();
final ClusterState clusterState = clusterService.state();
nodes = clusterState.nodes();
if (!clusterState.routingTable().hasIndex(request.index())) {
retry(fromClusterEvent, null);
Expand Down Expand Up @@ -313,11 +313,11 @@ public boolean start(final boolean fromClusterEvent) throws ElasticSearchExcepti
request.beforeLocalFork();
threadPool.execute(new Runnable() {
@Override public void run() {
performOnPrimary(shard.id(), fromClusterEvent, true, shard);
performOnPrimary(shard.id(), fromClusterEvent, true, shard, clusterState);
}
});
} else {
performOnPrimary(shard.id(), fromClusterEvent, false, shard);
performOnPrimary(shard.id(), fromClusterEvent, false, shard, clusterState);
}
} else {
DiscoveryNode node = nodes.get(shard.currentNodeId());
Expand Down Expand Up @@ -413,9 +413,9 @@ private void retry(boolean fromClusterEvent, final ShardId shardId) {
}
}

private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard) {
private void performOnPrimary(int primaryShardId, boolean fromDiscoveryListener, boolean alreadyThreaded, final ShardRouting shard, ClusterState clusterState) {
try {
Response response = shardOperationOnPrimary(new ShardOperationRequest(primaryShardId, request));
Response response = shardOperationOnPrimary(clusterState, new ShardOperationRequest(primaryShardId, request));
performReplicas(response, alreadyThreaded);
} catch (Exception e) {
// shard has not been allocated yet, retry it here
Expand Down
Expand Up @@ -31,18 +31,43 @@
*/
public class MappingMetaData {

public static class Routing {

public static final Routing EMPTY = new Routing(false);

private final boolean required;

public Routing(boolean required) {
this.required = required;
}

public boolean required() {
return required;
}
}

private final String type;

private final CompressedString source;

private final Routing routing;

public MappingMetaData(DocumentMapper docMapper) {
this.type = docMapper.type();
this.source = docMapper.mappingSource();
this.routing = new Routing(docMapper.routingFieldMapper().required());
}

public MappingMetaData(String type, CompressedString source) {
this.type = type;
this.source = source;
this.routing = Routing.EMPTY;
}

MappingMetaData(String type, CompressedString source, Routing routing) {
this.type = type;
this.source = source;
this.routing = routing;
}

public String type() {
Expand All @@ -53,12 +78,22 @@ public CompressedString source() {
return this.source;
}

public Routing routing() {
return this.routing;
}

public static void writeTo(MappingMetaData mappingMd, StreamOutput out) throws IOException {
out.writeUTF(mappingMd.type());
mappingMd.source().writeTo(out);
// routing
out.writeBoolean(mappingMd.routing().required());
}

public static MappingMetaData readFrom(StreamInput in) throws IOException {
return new MappingMetaData(in.readUTF(), CompressedString.readCompressedString(in));
String type = in.readUTF();
CompressedString source = CompressedString.readCompressedString(in);
// routing
Routing routing = new Routing(in.readBoolean());
return new MappingMetaData(type, source, routing);
}
}
Expand Up @@ -66,6 +66,8 @@ public interface DocumentMapper {

AllFieldMapper allFieldMapper();

RoutingFieldMapper routingFieldMapper();

DocumentFieldMappers mappers();

/**
Expand Down
Expand Up @@ -26,5 +26,7 @@
*/
public interface RoutingFieldMapper extends FieldMapper<String>, InternalMapper {

boolean required();

String value(Document document);
}
Expand Up @@ -41,28 +41,43 @@ public static class Defaults extends AbstractFieldMapper.Defaults {
public static final Field.Store STORE = Field.Store.YES;
public static final boolean OMIT_NORMS = true;
public static final boolean OMIT_TERM_FREQ_AND_POSITIONS = true;
public static final boolean REQUIRED = false;
}

public static class Builder extends AbstractFieldMapper.Builder<Builder, RoutingFieldMapper> {

private boolean required = Defaults.REQUIRED;

public Builder() {
super(Defaults.NAME);
store = Defaults.STORE;
index = Defaults.INDEX;
}

public Builder required(boolean required) {
this.required = required;
return builder;
}

@Override public RoutingFieldMapper build(BuilderContext context) {
return new RoutingFieldMapper(store, index);
return new RoutingFieldMapper(store, index, required);
}
}

private final boolean required;

protected RoutingFieldMapper() {
this(Defaults.STORE, Defaults.INDEX);
this(Defaults.STORE, Defaults.INDEX, Defaults.REQUIRED);
}

protected RoutingFieldMapper(Field.Store store, Field.Index index) {
protected RoutingFieldMapper(Field.Store store, Field.Index index, boolean required) {
super(new Names(Defaults.NAME, Defaults.NAME, Defaults.NAME, Defaults.NAME), index, store, Defaults.TERM_VECTOR, 1.0f, Defaults.OMIT_NORMS, Defaults.OMIT_TERM_FREQ_AND_POSITIONS,
Lucene.KEYWORD_ANALYZER, Lucene.KEYWORD_ANALYZER);
this.required = required;
}

@Override public boolean required() {
return this.required;
}

@Override public String value(Document document) {
Expand Down Expand Up @@ -107,7 +122,7 @@ protected RoutingFieldMapper(Field.Store store, Field.Index index) {

@Override public void toXContent(XContentBuilder builder, Params params) throws IOException {
// if all are defaults, no sense to write it at all
if (index == Defaults.INDEX && store == Defaults.STORE) {
if (index == Defaults.INDEX && store == Defaults.STORE && required == Defaults.REQUIRED) {
return;
}
builder.startObject(CONTENT_TYPE);
Expand All @@ -117,6 +132,9 @@ protected RoutingFieldMapper(Field.Store store, Field.Index index) {
if (store != Defaults.STORE) {
builder.field("store", store.name().toLowerCase());
}
if (required != Defaults.REQUIRED) {
builder.field("required", required);
}
builder.endObject();
}

Expand Down
Expand Up @@ -312,6 +312,10 @@ public RootObjectMapper root() {
return this.allFieldMapper;
}

@Override public org.elasticsearch.index.mapper.RoutingFieldMapper routingFieldMapper() {
return this.routingFieldMapper;
}

@Override public Analyzer indexAnalyzer() {
return this.indexAnalyzer;
}
Expand Down
Expand Up @@ -218,6 +218,13 @@ private IdFieldMapper.Builder parseIdField(Map<String, Object> idNode, XContentM
private RoutingFieldMapper.Builder parseRoutingField(Map<String, Object> routingNode, XContentMapper.TypeParser.ParserContext parserContext) {
RoutingFieldMapper.Builder builder = routing();
parseField(builder, builder.name, routingNode, parserContext);
for (Map.Entry<String, Object> entry : routingNode.entrySet()) {
String fieldName = Strings.toUnderscoreCase(entry.getKey());
Object fieldNode = entry.getValue();
if (fieldName.equals("required")) {
builder.required(nodeBooleanValue(fieldNode));
}
}
return builder;
}

Expand Down
Expand Up @@ -56,6 +56,10 @@ public Node buildNode(String id) {
return buildNode(id, EMPTY_SETTINGS);
}

public Node buildNode(String id, Settings.Builder settings) {
return buildNode(id, settings.build());
}

public Node buildNode(String id, Settings settings) {
String settingsSource = getClass().getName().replace('.', '/') + ".yml";
Settings finalSettings = settingsBuilder()
Expand Down

0 comments on commit 02981f6

Please sign in to comment.