Skip to content

Commit

Permalink
Fix fielddata handling for the _parent field.
Browse files Browse the repository at this point in the history
Change elastic#12371 broke fielddata on the `_parent` child for indices created before
2.0. This pull request adds back caching of the `_parent` fielddata for indices
created before 2.0 and cleans some related stuff. For instance
DocumentTypeListener doesn't need to take care of removed mappings anymore since
mappings can't be removed in 2.0.
  • Loading branch information
jpountz committed Jul 23, 2015
1 parent 948da82 commit 164d8c1
Show file tree
Hide file tree
Showing 6 changed files with 93 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@

import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
* This class manages locks. Locks can be accessed with an identifier and are
Expand Down Expand Up @@ -115,58 +112,4 @@ public boolean hasLockedKeys() {
return !map.isEmpty();
}

/**
* A {@link KeyedLock} that allows to acquire a global lock that guarantees
* exclusive access to the resource the KeyedLock is guarding.
*/
public final static class GlobalLockable<T> extends KeyedLock<T> {


private final ReadWriteLock lock;

public GlobalLockable(boolean fair){
super(fair);
lock = new ReentrantReadWriteLock(fair);
}

public GlobalLockable() {
this(false);
}

@Override
public void acquire(T key) {
boolean success = false;
lock.readLock().lock();
try {
super.acquire(key);
success = true;
} finally {
if (!success) {
lock.readLock().unlock();
}
}
}

@Override
public void release(T key) {
KeyLock keyLock = threadLocal.get();
if (keyLock == null) {
throw new IllegalStateException("Lock not acquired");
}
try {
release(key, keyLock);
} finally {
lock.readLock().unlock();
}
}

/**
* Returns a global lock guaranteeing exclusive access to the resource
* this KeyedLock is guarding.
*/
public Lock globalLock() {
return lock.writeLock();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.index.AbstractIndexComponent;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.fielddata.plain.*;
Expand Down Expand Up @@ -136,11 +137,16 @@ public class IndexFieldDataService extends AbstractIndexComponent {
}

private final IndicesFieldDataCache indicesFieldDataCache;
private final KeyedLock.GlobalLockable<String> fieldLoadingLock = new KeyedLock.GlobalLockable<>();
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap(); // no need for concurrency support, always used under lock
// the below map needs to be modified under a lock
private final Map<String, IndexFieldDataCache> fieldDataCaches = Maps.newHashMap();

IndexService indexService;

// We need to cache fielddata on the _parent field because of 1.x indices.
// When we don't support 1.x anymore (3.0) then remove this caching
// This variable needs to be read/written under lock
private IndexFieldData<?> parentIndexFieldData;

@Inject
public IndexFieldDataService(Index index, @IndexSettings Settings indexSettings, IndicesFieldDataCache indicesFieldDataCache,
CircuitBreakerService circuitBreakerService) {
Expand All @@ -154,41 +160,35 @@ public void setIndexService(IndexService indexService) {
this.indexService = indexService;
}

public void clear() {
fieldLoadingLock.globalLock().lock();
try {
List<Throwable> exceptions = new ArrayList<>(0);
final Collection<IndexFieldDataCache> fieldDataCacheValues = fieldDataCaches.values();
for (IndexFieldDataCache cache : fieldDataCacheValues) {
try {
cache.clear();
} catch (Throwable t) {
exceptions.add(t);
}
public synchronized void clear() {
parentIndexFieldData = null;
List<Throwable> exceptions = new ArrayList<>(0);
final Collection<IndexFieldDataCache> fieldDataCacheValues = fieldDataCaches.values();
for (IndexFieldDataCache cache : fieldDataCacheValues) {
try {
cache.clear();
} catch (Throwable t) {
exceptions.add(t);
}
fieldDataCacheValues.clear();
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
} finally {
fieldLoadingLock.globalLock().unlock();
}
fieldDataCacheValues.clear();
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
}

public void clearField(final String fieldName) {
fieldLoadingLock.acquire(fieldName);
try {
List<Throwable> exceptions = new ArrayList<>(0);
final IndexFieldDataCache cache = fieldDataCaches.remove(fieldName);
if (cache != null) {
try {
cache.clear();
} catch (Throwable t) {
exceptions.add(t);
}
public synchronized void clearField(final String fieldName) {
if (ParentFieldMapper.NAME.equals(fieldName)) {
parentIndexFieldData = null;
}
List<Throwable> exceptions = new ArrayList<>(0);
final IndexFieldDataCache cache = fieldDataCaches.remove(fieldName);
if (cache != null) {
try {
cache.clear();
} catch (Throwable t) {
exceptions.add(t);
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
} finally {
fieldLoadingLock.release(fieldName);
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
}

@SuppressWarnings("unchecked")
Expand All @@ -199,32 +199,31 @@ public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType
throw new IllegalArgumentException("found no fielddata type for field [" + fieldNames.fullName() + "]");
}
final boolean docValues = fieldType.hasDocValues();
final String key = fieldNames.indexName();
fieldLoadingLock.acquire(key);
try {
IndexFieldData.Builder builder = null;
String format = type.getFormat(indexSettings);
if (format != null && FieldDataType.DOC_VALUES_FORMAT_VALUE.equals(format) && !docValues) {
logger.warn("field [" + fieldNames.fullName() + "] has no doc values, will use default field data format");
format = null;
}
if (format != null) {
builder = buildersByTypeAndFormat.get(Tuple.tuple(type.getType(), format));
if (builder == null) {
logger.warn("failed to find format [" + format + "] for field [" + fieldNames.fullName() + "], will use default");
}
}
if (builder == null && docValues) {
builder = docValuesBuildersByType.get(type.getType());
}
if (builder == null) {
builder = buildersByType.get(type.getType());
}
IndexFieldData.Builder builder = null;
String format = type.getFormat(indexSettings);
if (format != null && FieldDataType.DOC_VALUES_FORMAT_VALUE.equals(format) && !docValues) {
logger.warn("field [" + fieldNames.fullName() + "] has no doc values, will use default field data format");
format = null;
}
if (format != null) {
builder = buildersByTypeAndFormat.get(Tuple.tuple(type.getType(), format));
if (builder == null) {
throw new IllegalArgumentException("failed to find field data builder for field " + fieldNames.fullName() + ", and type " + type.getType());
logger.warn("failed to find format [" + format + "] for field [" + fieldNames.fullName() + "], will use default");
}
}
if (builder == null && docValues) {
builder = docValuesBuildersByType.get(type.getType());
}
if (builder == null) {
builder = buildersByType.get(type.getType());
}
if (builder == null) {
throw new IllegalArgumentException("failed to find field data builder for field " + fieldNames.fullName() + ", and type " + type.getType());
}

IndexFieldDataCache cache = fieldDataCaches.get(fieldNames.indexName());
IndexFieldDataCache cache;
synchronized (this) {
cache = fieldDataCaches.get(fieldNames.indexName());
if (cache == null) {
// we default to node level cache, which in turn defaults to be unbounded
// this means changing the node level settings is simple, just set the bounds there
Expand All @@ -239,10 +238,18 @@ public <IFD extends IndexFieldData<?>> IFD getForField(MappedFieldType fieldType
fieldDataCaches.put(fieldNames.indexName(), cache);
}

return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
} finally {
fieldLoadingLock.release(key);
// Remove this in 3.0
final boolean isOldParentField = ParentFieldMapper.NAME.equals(fieldNames.indexName())
&& Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1);
if (isOldParentField) {
if (parentIndexFieldData == null) {
parentIndexFieldData = builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
}
return (IFD) parentIndexFieldData;
}
}

return (IFD) builder.build(index, indexSettings, fieldType, cache, circuitBreakerService, indexService.mapperService());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSortedSet;

import org.apache.lucene.index.*;
import org.apache.lucene.index.MultiDocValues.OrdinalMap;
import org.apache.lucene.search.DocIdSetIterator;
Expand Down Expand Up @@ -79,12 +80,23 @@ public ParentChildIndexFieldData(Index index, @IndexSettings Settings indexSetti
FieldDataType fieldDataType, IndexFieldDataCache cache, MapperService mapperService,
CircuitBreakerService breakerService) {
super(index, indexSettings, fieldNames, fieldDataType, cache);
parentTypes = new TreeSet<>();
this.breakerService = breakerService;
for (DocumentMapper documentMapper : mapperService.docMappers(false)) {
beforeCreate(documentMapper);
if (Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1)) {
parentTypes = new TreeSet<>();
for (DocumentMapper documentMapper : mapperService.docMappers(false)) {
beforeCreate(documentMapper);
}
mapperService.addTypeListener(this);
} else {
ImmutableSortedSet.Builder<String> builder = ImmutableSortedSet.naturalOrder();
for (DocumentMapper mapper : mapperService.docMappers(false)) {
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
builder.add(parentFieldMapper.type());
}
}
parentTypes = builder.build();
}
mapperService.addTypeListener(this);
}

@Override
Expand All @@ -96,10 +108,6 @@ public XFieldComparatorSource comparatorSource(@Nullable Object missingValue, Mu
public AtomicParentChildFieldData load(LeafReaderContext context) {
if (Version.indexCreated(indexSettings).onOrAfter(Version.V_2_0_0_beta1)) {
final LeafReader reader = context.reader();
final NavigableSet<String> parentTypes;
synchronized (lock) {
parentTypes = ImmutableSortedSet.copyOf(this.parentTypes);
}
return new AbstractAtomicParentChildFieldData() {

public Set<String> types() {
Expand Down Expand Up @@ -145,6 +153,8 @@ public void close() throws ElasticsearchException {

@Override
public AbstractAtomicParentChildFieldData loadDirect(LeafReaderContext context) throws Exception {
// Make this method throw an UnsupportedOperationException in 3.0, only
// needed for indices created BEFORE 2.0
LeafReader reader = context.reader();
final float acceptableTransientOverheadRatio = fieldDataType.getSettings().getAsFloat(
"acceptable_transient_overhead_ratio", OrdinalsBuilder.DEFAULT_ACCEPTABLE_OVERHEAD_RATIO
Expand Down Expand Up @@ -219,6 +229,7 @@ public AbstractAtomicParentChildFieldData loadDirect(LeafReaderContext context)

@Override
public void beforeCreate(DocumentMapper mapper) {
// Remove in 3.0
synchronized (lock) {
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
Expand All @@ -231,16 +242,6 @@ public void beforeCreate(DocumentMapper mapper) {
}
}

@Override
public void afterRemove(DocumentMapper mapper) {
synchronized (lock) {
ParentFieldMapper parentFieldMapper = mapper.parentFieldMapper();
if (parentFieldMapper.active()) {
parentTypes.remove(new BytesRef(parentFieldMapper.type()));
}
}
}

@Override
protected AtomicParentChildFieldData empty(int maxDoc) {
return new ParentChildAtomicFieldData(ImmutableOpenMap.<String, AtomicOrdinalsFieldData>of());
Expand Down Expand Up @@ -358,8 +359,12 @@ public OrdinalMapAndAtomicFieldData(OrdinalMap ordMap, AtomicParentChildFieldDat
public IndexParentChildFieldData localGlobalDirect(IndexReader indexReader) throws Exception {
final long startTime = System.nanoTime();
final Set<String> parentTypes;
synchronized (lock) {
parentTypes = ImmutableSet.copyOf(this.parentTypes);
if (Version.indexCreated(indexSettings).before(Version.V_2_0_0_beta1)) {
synchronized (lock) {
parentTypes = ImmutableSet.copyOf(this.parentTypes);
}
} else {
parentTypes = this.parentTypes;
}

long ramBytesUsed = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,4 @@ public interface DocumentTypeListener {
*/
void beforeCreate(DocumentMapper mapper);

/**
* Invoked just after an existing document type has been removed.
*
* @param mapper The existing document mapper of the type being removed
*/
void afterRemove(DocumentMapper mapper);

}
Original file line number Diff line number Diff line change
Expand Up @@ -226,14 +226,6 @@ public void beforeCreate(DocumentMapper mapper) {
}
}

@Override
public void afterRemove(DocumentMapper mapper) {
if (PercolatorService.TYPE_NAME.equals(mapper.type())) {
disableRealTimePercolator();
clear();
}
}

}

private class ShardLifecycleListener extends IndicesLifecycle.Listener {
Expand Down
Loading

0 comments on commit 164d8c1

Please sign in to comment.