Skip to content

Commit

Permalink
Mappings: Same code path for dynamic mappings updates and updates com…
Browse files Browse the repository at this point in the history
…ing from the API.

We have two completely different code paths for mappings updates, depending on
whether they come from the API or are guessed based on the parsed documents.
This commit makes dynamic mappings updates execute like updates from the API.

The only change in behaviour is that a document that fails parsing can no longer
modify the mappings (useful to prevent issues such as elastic#9851). Other than
that, this change should be fairly transparent to users but working this way
opens doors to other changes such as validating dynamic mappings updates on the
master node (elastic#8688).

The way it works internally is that Mapper.parse now returns a Mapper instead
of being void. The returned Mapper represents a mapping update that has been
performed in order to parse the document. Mappings updates are propagated
recursively back to the root mapper, and once parsing is finished, we check
that the mappings update can be applied, and either fail the parsing if the
update cannot be merged (eg. because of a concurrent mapping update from the
API) or merge the update into the mappings.

However not all mappings updates can be applied recursively, `copy_to` for
instance can add mappings at totally different places in the tree. Because of
it I added ParseContext.rootMapperUpdates which `copy_to` fills when the
field to copy data to does not exist in the mappings yet. These mappings
updates are merged with the ones generated by regular parsing.

One particular mapping update was the `auto_boost` setting on the `all` root
mapper. Because it was tricky to support, I removed it in favour of search-time checks
that payloads have been indexed.

One interesting side-effect of the change is that concurrency on ObjectMapper
is greatly simplified since we do not have to care anymore about having
concurrent dynamic mappings and API updates.
  • Loading branch information
jpountz committed Apr 16, 2015
1 parent df11821 commit 38bec83
Show file tree
Hide file tree
Showing 38 changed files with 804 additions and 534 deletions.
5 changes: 5 additions & 0 deletions dev-tools/create-bwc-index.py
Expand Up @@ -226,6 +226,11 @@ def generate_index(client, version, index_name):
}
}
}
mappings['auto_boost'] = {
'_all': {
'auto_boost': True
}
}

client.indices.create(index=index_name, body={
'settings': {
Expand Down
Expand Up @@ -19,12 +19,16 @@

package org.elasticsearch.common.lucene.all;

import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.search.ComplexExplanation;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.similarities.Similarity;
import org.apache.lucene.search.similarities.Similarity.SimScorer;
Expand Down Expand Up @@ -197,4 +201,22 @@ public boolean equals(Object obj) {
return true;
}

@Override
public Query rewrite(IndexReader reader) throws IOException {
    // If no segment has payloads for this field, there are no per-term boosts
    // to apply, so a plain (cheaper) TermQuery is an equivalent rewrite.
    boolean hasPayloads = false;
    for (LeafReaderContext context : reader.leaves()) {
        final Terms terms = context.reader().terms(term.field());
        // terms is null when this segment has no postings for the field;
        // the original code dereferenced it unconditionally and could NPE.
        if (terms != null && terms.hasPayloads()) {
            hasPayloads = true;
            break;
        }
    }
    if (hasPayloads == false) {
        TermQuery rewritten = new TermQuery(term);
        rewritten.setBoost(getBoost());
        return rewritten;
    }
    return this;
}

}
77 changes: 69 additions & 8 deletions src/main/java/org/elasticsearch/index/mapper/DocumentMapper.java
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import org.apache.lucene.document.Field;
import org.apache.lucene.index.IndexOptions;
import org.apache.lucene.index.IndexableField;
Expand All @@ -33,6 +34,7 @@
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedString;
Expand Down Expand Up @@ -70,6 +72,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -438,10 +441,11 @@ public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listen
ParseContext.InternalParseContext context = cache.get();

if (source.type() != null && !source.type().equals(this.type)) {
throw new MapperParsingException("Type mismatch, provide type [" + source.type() + "] but mapper is of type [" + this.type + "]", context.mappingsModified());
throw new MapperParsingException("Type mismatch, provide type [" + source.type() + "] but mapper is of type [" + this.type + "]");
}
source.type(this.type);

boolean mappingsModified = false;
XContentParser parser = source.parser();
try {
if (parser == null) {
Expand All @@ -456,23 +460,47 @@ public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listen
int countDownTokens = 0;
XContentParser.Token token = parser.nextToken();
if (token != XContentParser.Token.START_OBJECT) {
throw new MapperParsingException("Malformed content, must start with an object", context.mappingsModified());
throw new MapperParsingException("Malformed content, must start with an object");
}
boolean emptyDoc = false;
token = parser.nextToken();
if (token == XContentParser.Token.END_OBJECT) {
// empty doc, we can handle it...
emptyDoc = true;
} else if (token != XContentParser.Token.FIELD_NAME) {
throw new MapperParsingException("Malformed content, after first object, either the type field or the actual properties should exist", context.mappingsModified());
throw new MapperParsingException("Malformed content, after first object, either the type field or the actual properties should exist");
}

for (RootMapper rootMapper : rootMappersOrdered) {
rootMapper.preParse(context);
}

if (!emptyDoc) {
rootObjectMapper.parse(context);
Mapper update = rootObjectMapper.parse(context);
for (RootObjectMapper mapper : context.updates()) {
if (update == null) {
update = mapper;
} else {
MapperUtils.merge(update, mapper);
}
}
if (update != null) {
// TODO: validate the mapping update on the master node
// lock to avoid concurrency issues with mapping updates coming from the API
synchronized(this) {
// simulate on the first time to check if the mapping update is applicable
MergeContext mergeContext = newMmergeContext(new MergeFlags().simulate(true));
rootObjectMapper.merge(update, mergeContext);
if (mergeContext.hasConflicts()) {
throw new MapperParsingException("Could not apply generated dynamic mappings: " + Arrays.toString(mergeContext.buildConflicts()));
} else {
// then apply it for real
mappingsModified = true;
mergeContext = newMmergeContext(new MergeFlags().simulate(false));
rootObjectMapper.merge(update, mergeContext);
}
}
}
}

for (int i = 0; i < countDownTokens; i++) {
Expand All @@ -490,10 +518,10 @@ public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listen

// Throw a more meaningful message if the document is empty.
if (source.source() != null && source.source().length() == 0) {
throw new MapperParsingException("failed to parse, document is empty", context.mappingsModified());
throw new MapperParsingException("failed to parse, document is empty");
}

throw new MapperParsingException("failed to parse", e, context.mappingsModified());
throw new MapperParsingException("failed to parse", e);
} finally {
// only close the parser when its not provided externally
if (source.parser() == null && parser != null) {
Expand Down Expand Up @@ -521,7 +549,7 @@ public ParsedDocument parse(SourceToParse source, @Nullable ParseListener listen
}

ParsedDocument doc = new ParsedDocument(context.uid(), context.version(), context.id(), context.type(), source.routing(), source.timestamp(), source.ttl(), context.docs(),
context.source(), context.mappingsModified()).parent(source.parent());
context.source(), mappingsModified).parent(source.parent());
// reset the context to free up memory
context.reset(null, null, null, null);
return doc;
Expand Down Expand Up @@ -637,8 +665,41 @@ public void traverse(ObjectMapperListener listener) {
rootObjectMapper.traverse(listener);
}

// NOTE(review): method name contains a typo ("Mmerge") but is referenced by
// callers in this file, so it is kept for compatibility.
private MergeContext newMmergeContext(MergeFlags mergeFlags) {
    // Merge context that applies field/object mapper additions directly to
    // this DocumentMapper's lookup structures while collecting conflicts.
    return new MergeContext(mergeFlags) {

        private final List<String> mergeFailures = new ArrayList<>();

        @Override
        public void addFieldMappers(List<FieldMapper<?>> fieldMappers) {
            DocumentMapper.this.addFieldMappers(fieldMappers);
        }

        @Override
        public void addObjectMappers(Collection<ObjectMapper> objectMappers) {
            DocumentMapper.this.addObjectMappers(objectMappers);
        }

        @Override
        public void addConflict(String mergeFailure) {
            mergeFailures.add(mergeFailure);
        }

        @Override
        public boolean hasConflicts() {
            return !mergeFailures.isEmpty();
        }

        @Override
        public String[] buildConflicts() {
            return mergeFailures.toArray(new String[mergeFailures.size()]);
        }

    };
}

public synchronized MergeResult merge(DocumentMapper mergeWith, MergeFlags mergeFlags) {
MergeContext mergeContext = new MergeContext(this, mergeFlags);
final MergeContext mergeContext = newMmergeContext(mergeFlags);
assert rootMappers.size() == mergeWith.rootMappers.size();

rootObjectMapper.merge(mergeWith.rootObjectMapper, mergeContext);
Expand Down
7 changes: 6 additions & 1 deletion src/main/java/org/elasticsearch/index/mapper/Mapper.java
Expand Up @@ -125,7 +125,12 @@ public Version indexVersionCreated() {

String name();

void parse(ParseContext context) throws IOException;
/**
* Parse using the provided {@link ParseContext} and return a mapping
* update if dynamic mappings modified the mappings, or {@code null} if
* mappings were not modified.
*/
Mapper parse(ParseContext context) throws IOException;

void merge(Mapper mergeWith, MergeContext mergeContext) throws MergeMappingException;

Expand Down
Expand Up @@ -28,28 +28,10 @@ public class MapperParsingException extends MapperException {

public MapperParsingException(String message) {
super(message);
mappingsModified = false;
}

public boolean isMappingsModified() {
return mappingsModified;
}

private boolean mappingsModified = false;

public MapperParsingException(String message, boolean mappingsModified) {
super(message);
this.mappingsModified = mappingsModified;
}

public MapperParsingException(String message, Throwable cause, boolean mappingsModified) {
super(message, cause);
this.mappingsModified = mappingsModified;
}

public MapperParsingException(String message, Throwable cause) {
super(message, cause);
this.mappingsModified = false;
}

@Override
Expand Down
82 changes: 82 additions & 0 deletions src/main/java/org/elasticsearch/index/mapper/MapperUtils.java
@@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.elasticsearch.index.mapper;

import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.Strings;
import org.elasticsearch.index.mapper.object.ObjectMapper;
import org.elasticsearch.index.mapper.object.RootObjectMapper;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

public enum MapperUtils {
    ;

    /**
     * Parse {@code context} with {@code mapper}, and if parsing produced a
     * dynamic mapping update, merge that update back into {@code mapper}
     * in-place. Useful when composing mapping updates.
     */
    public static <M extends Mapper> M parseAndMergeUpdate(M mapper, ParseContext context) throws IOException {
        Mapper update = mapper.parse(context);
        if (update != null) {
            merge(mapper, update);
        }
        return mapper;
    }

    /**
     * Merge {@code mergeWith} into {@code mergeInto}. Note: only mappings are
     * merged, not lookup structures. A conflict is surfaced by throwing an
     * {@link ElasticsearchIllegalStateException}.
     */
    public static void merge(Mapper mergeInto, Mapper mergeWith) {
        final MergeContext mergeContext = new MergeContext(new DocumentMapper.MergeFlags().simulate(false)) {

            @Override
            public void addFieldMappers(List<FieldMapper<?>> fieldMappers) {
                // lookup structures are intentionally left untouched
            }

            @Override
            public void addObjectMappers(Collection<ObjectMapper> objectMappers) {
                // lookup structures are intentionally left untouched
            }

            @Override
            public void addConflict(String mergeFailure) {
                throw new ElasticsearchIllegalStateException("Merging dynamic updates triggered a conflict: " + mergeFailure);
            }

            @Override
            public boolean hasConflicts() {
                return false;
            }

            @Override
            public String[] buildConflicts() {
                return Strings.EMPTY_ARRAY;
            }
        };
        mergeInto.merge(mergeWith, mergeContext);
    }

}
28 changes: 10 additions & 18 deletions src/main/java/org/elasticsearch/index/mapper/MergeContext.java
Expand Up @@ -19,41 +19,33 @@

package org.elasticsearch.index.mapper;

import com.google.common.collect.Lists;
import org.elasticsearch.index.mapper.object.ObjectMapper;

import java.util.Collection;
import java.util.List;

/**
*
*/
public class MergeContext {
public abstract class MergeContext {

private final DocumentMapper documentMapper;
private final DocumentMapper.MergeFlags mergeFlags;
private final List<String> mergeConflicts = Lists.newArrayList();

public MergeContext(DocumentMapper documentMapper, DocumentMapper.MergeFlags mergeFlags) {
this.documentMapper = documentMapper;
public MergeContext(DocumentMapper.MergeFlags mergeFlags) {
this.mergeFlags = mergeFlags;
}

public DocumentMapper docMapper() {
return documentMapper;
}
public abstract void addFieldMappers(List<FieldMapper<?>> fieldMappers);

public abstract void addObjectMappers(Collection<ObjectMapper> objectMappers);

public DocumentMapper.MergeFlags mergeFlags() {
return mergeFlags;
}

public void addConflict(String mergeFailure) {
mergeConflicts.add(mergeFailure);
}
public abstract void addConflict(String mergeFailure);

public boolean hasConflicts() {
return !mergeConflicts.isEmpty();
}
public abstract boolean hasConflicts();

public String[] buildConflicts() {
return mergeConflicts.toArray(new String[mergeConflicts.size()]);
}
public abstract String[] buildConflicts();
}

0 comments on commit 38bec83

Please sign in to comment.