
Commit

Merge branch 'master' of github.com:elastic/elasticsearch into primary-context

* 'master' of github.com:elastic/elasticsearch: (21 commits)
  [DOCS] Clarify expected availability of HDFS for the HDFS Repository (elastic#25220)
  Remove some redundant 140 character checkstyle suppressions
  [Docs] more fixes for the parent-join docs
  [Docs] Fix cross reference for parent-join field
  More advice around search speed and disk usage. (elastic#25252)
  Add documentation for the new parent-join field (elastic#25227)
  [analysis-icu] Allow setting unicodeSetFilter (elastic#20814)
  Introduce translog size and age based retention policies (elastic#25147)
  Add needs methods for specific variables to Painless script context factories. (elastic#25267)
  Improves snapshot logging and snapshot deletion error handling (elastic#25264)
  Add unit test for PathHierarchyTokenizerFactory (elastic#24984)
  Deprecate tribe service
  Moved more token filters to analysis-common module.
  [Test] Make sure that SearchAfterSortedDocQueryTests uses a single threaded searcher
  [DOCS] Defined es-test-dir and plugins-examples-dir in index.asciidoc.  (elastic#25232)
  Test fix - removed superfluous assertion (elastic#25247)
  [Test] restore BWC for parent-join now that the new mapping format is in 5.x
  Add a section named "relations" in the ParentJoinFieldMapper (elastic#25248)
  test: Ported more OldIndexBackwardsCompatibilityIT tests to full cluster restart qa tests. (elastic#25173)
  fix: Sort Processor does not have proper behavior with targetField (elastic#25237)
  ...
jasontedor committed Jun 16, 2017
2 parents 9683757 + 9c65073 commit d9dda28
Showing 88 changed files with 2,452 additions and 724 deletions.
18 changes: 1 addition & 17 deletions buildSrc/src/main/resources/checkstyle_suppressions.xml

Large diffs are not rendered by default.

core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java
@@ -52,12 +52,6 @@ final class PerThreadIDVersionAndSeqNoLookup {
// TODO: do we really need to store all this stuff? some of it might not speed up anything.
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff

/** The {@link LeafReaderContext} that needs to be looked up. */
private final LeafReaderContext context;
/** Live docs of the context, cached to avoid the cost of ensureOpen() on every
* segment for every index operation. */
private final Bits liveDocs;

/** terms enum for uid field */
final String uidField;
private final TermsEnum termsEnum;
@@ -71,10 +71,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
/**
* Initialize lookup for the provided segment
*/
PerThreadIDVersionAndSeqNoLookup(LeafReaderContext context, String uidField) throws IOException {
this.context = context;
final LeafReader reader = context.reader();
this.liveDocs = reader.getLiveDocs();
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
this.uidField = uidField;
Fields fields = reader.fields();
Terms terms = fields.terms(uidField);
@@ -91,12 +82,17 @@ final class PerThreadIDVersionAndSeqNoLookup {
this.readerKey = readerKey;
}

/** Return null if id is not found. */
public DocIdAndVersion lookupVersion(BytesRef id)
/** Return null if id is not found.
* We pass the {@link LeafReaderContext} as an argument so that things
* still work with reader wrappers that hide some documents while still
* using the same cache key. Otherwise we'd have to disable caching
* entirely for these readers.
*/
public DocIdAndVersion lookupVersion(BytesRef id, LeafReaderContext context)
throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id);
int docID = getDocID(id, context.reader().getLiveDocs());

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
final NumericDocValues versions = context.reader().getNumericDocValues(VersionFieldMapper.NAME);
@@ -116,7 +112,7 @@ public DocIdAndVersion lookupVersion(BytesRef id)
* returns the internal lucene doc id for the given id bytes.
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
* */
private int getDocID(BytesRef id) throws IOException {
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
if (termsEnum.seekExact(id)) {
int docID = DocIdSetIterator.NO_MORE_DOCS;
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
@@ -134,8 +130,10 @@ private int getDocID(BytesRef id) throws IOException {
}

/** Return null if id is not found. */
DocIdAndSeqNo lookupSeqNo(BytesRef id) throws IOException {
int docID = getDocID(id);
DocIdAndSeqNo lookupSeqNo(BytesRef id, LeafReaderContext context) throws IOException {
assert context.reader().getCoreCacheHelper().getKey().equals(readerKey) :
"context's reader is not the same as the reader class was initialized on.";
int docID = getDocID(id, context.reader().getLiveDocs());
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
NumericDocValues seqNos = context.reader().getNumericDocValues(SeqNoFieldMapper.NAME);
long seqNo;
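
The javadoc added to lookupVersion explains the motivation for the new signature: live docs must be read from the LeafReaderContext passed at call time, not cached at construction time. As a rough illustration (an editorial sketch, not code from this commit), a Lucene FilterLeafReader that hides some documents while sharing the wrapped reader's core cache key would defeat a cached liveDocs field, but works with the per-call lookup:

import org.apache.lucene.index.FilterLeafReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.util.Bits;

// Sketch only: a wrapper that hides documents but keeps the core cache key.
// A complete wrapper would also override numDocs() to match the hidden view.
final class HideSomeDocsReader extends FilterLeafReader {
    private final Bits visibleDocs; // a subset of the wrapped reader's live docs

    HideSomeDocsReader(LeafReader in, Bits visibleDocs) {
        super(in);
        this.visibleDocs = visibleDocs;
    }

    @Override
    public Bits getLiveDocs() {
        return visibleDocs; // lookups that ask the context each call see this view
    }

    @Override
    public CacheHelper getCoreCacheHelper() {
        return in.getCoreCacheHelper(); // same cache key, so lookup state is shared
    }

    @Override
    public CacheHelper getReaderCacheHelper() {
        return null; // the filtered view itself must not serve as a cache key
    }
}

Since PerThreadIDVersionAndSeqNoLookup instances are cached per core reader key, a liveDocs field captured at construction time could expose documents that a wrapper like this intends to hide.
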
core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java
@@ -73,7 +73,7 @@ private static PerThreadIDVersionAndSeqNoLookup[] getLookupState(IndexReader rea
if (lookupState == null) {
lookupState = new PerThreadIDVersionAndSeqNoLookup[reader.leaves().size()];
for (LeafReaderContext leaf : reader.leaves()) {
lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf, uidField);
lookupState[leaf.ord] = new PerThreadIDVersionAndSeqNoLookup(leaf.reader(), uidField);
}
ctl.set(lookupState);
}
@@ -132,8 +132,9 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
DocIdAndVersion result = lookup.lookupVersion(term.bytes());
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf);
if (result != null) {
return result;
}
@@ -153,8 +154,9 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaves.get(i).ord];
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes());
final LeafReaderContext leaf = leaves.get(i);
PerThreadIDVersionAndSeqNoLookup lookup = lookups[leaf.ord];
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf);
if (result != null) {
return result;
}
core/src/main/java/org/elasticsearch/common/settings/IndexScopedSettings.java
@@ -127,6 +127,8 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
EnableAllocationDecider.INDEX_ROUTING_ALLOCATION_ENABLE_SETTING,
IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_AGE_SETTING,
IndexSettings.INDEX_TRANSLOG_RETENTION_SIZE_SETTING,
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
FieldMapper.IGNORE_MALFORMED_SETTING,
FieldMapper.COERCE_SETTING,
core/src/main/java/org/elasticsearch/common/xcontent/support/XContentMapValues.java
@@ -97,7 +97,7 @@ private static void extractRawValues(List values, List<Object> part, String[] pa
}
}

public static Object extractValue(String path, Map<String, Object> map) {
public static Object extractValue(String path, Map<?, ?> map) {
String[] pathElements = path.split("\\.");
if (pathElements.length == 0) {
return null;
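
The parameter type is loosened from Map<String, Object> to Map<?, ?>, so callers no longer need an exactly-typed map. A small usage sketch (assuming, as the method body suggests, that this hunk is XContentMapValues.extractValue; the class name is not shown in the hunk):

import java.util.HashMap;
import java.util.Map;
import org.elasticsearch.common.xcontent.support.XContentMapValues; // assumed location

public class ExtractValueExample {
    public static void main(String[] args) {
        Map<Object, Object> inner = new HashMap<>();
        inner.put("bar", 42);
        Map<Object, Object> source = new HashMap<>();
        source.put("foo", inner);
        // With Map<?, ?>, any map type is accepted, not only Map<String, Object>:
        Object value = XContentMapValues.extractValue("foo.bar", source);
        System.out.println(value); // prints 42
    }
}
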
43 changes: 43 additions & 0 deletions core/src/main/java/org/elasticsearch/index/IndexSettings.java
@@ -37,6 +37,7 @@

import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;

/**
@@ -111,6 +112,24 @@ public final class IndexSettings {
Setting.byteSizeSetting("index.translog.flush_threshold_size", new ByteSizeValue(512, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* Controls how long translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. A longer retention policy is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<TimeValue> INDEX_TRANSLOG_RETENTION_AGE_SETTING =
Setting.timeSetting("index.translog.retention.age", TimeValue.timeValueMillis(-1), TimeValue.timeValueMillis(-1), Property.Dynamic,
Property.IndexScope);

/**
* Controls how many translog files that are no longer needed for persistence reasons
* will be kept around before being deleted. Keeping more files is useful to increase
* the chance of ops based recoveries.
**/
public static final Setting<ByteSizeValue> INDEX_TRANSLOG_RETENTION_SIZE_SETTING =
Setting.byteSizeSetting("index.translog.retention.size", new ByteSizeValue(-1, ByteSizeUnit.MB), Property.Dynamic,
Property.IndexScope);

/**
* The maximum size of a translog generation. This is independent of the maximum size of
* translog operations that have not been flushed.
@@ -168,6 +187,8 @@ public final class IndexSettings {
private final TimeValue syncInterval;
private volatile TimeValue refreshInterval;
private volatile ByteSizeValue flushThresholdSize;
private volatile TimeValue translogRetentionAge;
private volatile ByteSizeValue translogRetentionSize;
private volatile ByteSizeValue generationThresholdSize;
private final MergeSchedulerConfig mergeSchedulerConfig;
private final MergePolicyConfig mergePolicyConfig;
@@ -265,6 +286,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
syncInterval = INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.get(settings);
refreshInterval = scopedSettings.get(INDEX_REFRESH_INTERVAL_SETTING);
flushThresholdSize = scopedSettings.get(INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING);
translogRetentionAge = scopedSettings.get(INDEX_TRANSLOG_RETENTION_AGE_SETTING);
translogRetentionSize = scopedSettings.get(INDEX_TRANSLOG_RETENTION_SIZE_SETTING);
generationThresholdSize = scopedSettings.get(INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING);
mergeSchedulerConfig = new MergeSchedulerConfig(this);
gcDeletesInMillis = scopedSettings.get(INDEX_GC_DELETES_SETTING).getMillis();
@@ -302,6 +325,8 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
scopedSettings.addSettingsUpdateConsumer(
INDEX_TRANSLOG_GENERATION_THRESHOLD_SIZE_SETTING,
this::setGenerationThresholdSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_AGE_SETTING, this::setTranslogRetentionAge);
scopedSettings.addSettingsUpdateConsumer(INDEX_TRANSLOG_RETENTION_SIZE_SETTING, this::setTranslogRetentionSize);
scopedSettings.addSettingsUpdateConsumer(INDEX_REFRESH_INTERVAL_SETTING, this::setRefreshInterval);
scopedSettings.addSettingsUpdateConsumer(MAX_REFRESH_LISTENERS_PER_SHARD, this::setMaxRefreshListeners);
scopedSettings.addSettingsUpdateConsumer(MAX_SLICES_PER_SCROLL, this::setMaxSlicesPerScroll);
@@ -311,6 +336,14 @@ private void setTranslogFlushThresholdSize(ByteSizeValue byteSizeValue) {
this.flushThresholdSize = byteSizeValue;
}

private void setTranslogRetentionSize(ByteSizeValue byteSizeValue) {
this.translogRetentionSize = byteSizeValue;
}

private void setTranslogRetentionAge(TimeValue age) {
this.translogRetentionAge = age;
}

private void setGenerationThresholdSize(final ByteSizeValue generationThresholdSize) {
this.generationThresholdSize = generationThresholdSize;
}
@@ -469,6 +502,16 @@ public TimeValue getRefreshInterval() {
*/
public ByteSizeValue getFlushThresholdSize() { return flushThresholdSize; }

/**
* Returns the transaction log retention size which controls how much of the translog is kept around to allow for ops based recoveries
*/
public ByteSizeValue getTranslogRetentionSize() { return translogRetentionSize; }

/**
* Returns the transaction log retention age which controls the maximum age (time from creation) that translog files will be kept around
*/
public TimeValue getTranslogRetentionAge() { return translogRetentionAge; }

/**
* Returns the generation threshold size. As sequence numbers can cause multiple generations to
* be preserved for rollback purposes, we want to keep the size of individual generations from
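
Both retention settings are registered with Property.Dynamic and default to -1, i.e. disabled. Because they are dynamic, they can be changed on a live index; a usage sketch (assuming a connected Client named client, with method names from the Java admin client of this era):

import org.elasticsearch.client.Client;
import org.elasticsearch.common.settings.Settings;

public final class TranslogRetentionExample {
    // Sketch: keep up to ~512 MB and up to 12 hours of translog around
    // to improve the odds of an operations-based recovery.
    static void enableTranslogRetention(Client client, String index) {
        client.admin().indices().prepareUpdateSettings(index)
            .setSettings(Settings.builder()
                .put("index.translog.retention.size", "512mb")
                .put("index.translog.retention.age", "12h")
                .build())
            .get();
    }
}
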
core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java
@@ -150,7 +150,10 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
}
this.uidField = engineConfig.getIndexSettings().isSingleType() ? IdFieldMapper.NAME : UidFieldMapper.NAME;
this.versionMap = new LiveVersionMap();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy();
final TranslogDeletionPolicy translogDeletionPolicy = new TranslogDeletionPolicy(
engineConfig.getIndexSettings().getTranslogRetentionSize().getBytes(),
engineConfig.getIndexSettings().getTranslogRetentionAge().getMillis()
);
this.deletionPolicy = new CombinedDeletionPolicy(
new SnapshotDeletionPolicy(new KeepOnlyLastCommitDeletionPolicy()), translogDeletionPolicy, openMode);
store.incRef();
@@ -1854,6 +1857,10 @@ public void onSettingsChanged() {
// the setting will be re-interpreted if it's set to true
this.maxUnsafeAutoIdTimestamp.set(Long.MAX_VALUE);
}
final TranslogDeletionPolicy translogDeletionPolicy = translog.getDeletionPolicy();
final IndexSettings indexSettings = engineConfig.getIndexSettings();
translogDeletionPolicy.setRetentionAgeInMillis(indexSettings.getTranslogRetentionAge().getMillis());
translogDeletionPolicy.setRetentionSizeInBytes(indexSettings.getTranslogRetentionSize().getBytes());
}

public MergeStats getMergeStats() {
core/src/main/java/org/elasticsearch/index/translog/BaseTranslogReader.java
@@ -24,6 +24,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;

/**
@@ -121,4 +122,8 @@ public int compareTo(BaseTranslogReader o) {
public Path path() {
return path;
}

public long getLastModifiedTime() throws IOException {
return Files.getLastModifiedTime(path).toMillis();
}
}
core/src/main/java/org/elasticsearch/index/translog/Translog.java
@@ -1522,7 +1522,7 @@ public void trimUnreferencedReaders() throws IOException {
// we're shutdown potentially on some tragic event, don't delete anything
return;
}
long minReferencedGen = deletionPolicy.minTranslogGenRequired();
long minReferencedGen = deletionPolicy.minTranslogGenRequired(readers, current);
assert minReferencedGen >= getMinFileGeneration() :
"deletion policy requires a minReferenceGen of [" + minReferencedGen + "] but the lowest gen available is ["
+ getMinFileGeneration() + "]";
core/src/main/java/org/elasticsearch/index/translog/TranslogDeletionPolicy.java
@@ -21,13 +21,17 @@

import org.apache.lucene.util.Counter;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TranslogDeletionPolicy {

/** Records how many views are held against each
* translog generation */
/**
* Records how many views are held against each
* translog generation
*/
private final Map<Long, Counter> translogRefCounts = new HashMap<>();

/**
@@ -36,14 +40,31 @@ public class TranslogDeletionPolicy {
*/
private long minTranslogGenerationForRecovery = 1;

private long retentionSizeInBytes;

private long retentionAgeInMillis;

public TranslogDeletionPolicy(long retentionSizeInBytes, long retentionAgeInMillis) {
this.retentionSizeInBytes = retentionSizeInBytes;
this.retentionAgeInMillis = retentionAgeInMillis;
}

public synchronized void setMinTranslogGenerationForRecovery(long newGen) {
if (newGen < minTranslogGenerationForRecovery) {
throw new IllegalArgumentException("minTranslogGenerationForRecovery can't go backwards. new [" + newGen + "] current [" +
minTranslogGenerationForRecovery+ "]");
minTranslogGenerationForRecovery + "]");
}
minTranslogGenerationForRecovery = newGen;
}

public synchronized void setRetentionSizeInBytes(long bytes) {
retentionSizeInBytes = bytes;
}

public synchronized void setRetentionAgeInMillis(long ageInMillis) {
retentionAgeInMillis = ageInMillis;
}

/**
* acquires the basis generation for a new view. Any translog generation above, and including, the returned generation
* will not be deleted until a corresponding call to {@link #releaseTranslogGenView(long)} is called.
@@ -74,10 +95,59 @@ synchronized void releaseTranslogGenView(long translogGen) {
/**
* returns the minimum translog generation that is still required by the system. Any generation below
* the returned value may be safely deleted
*
* @param readers current translog readers
* @param writer current translog writer
*/
synchronized long minTranslogGenRequired() {
long viewRefs = translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
return Math.min(viewRefs, minTranslogGenerationForRecovery);
synchronized long minTranslogGenRequired(List<TranslogReader> readers, TranslogWriter writer) throws IOException {
long minByView = getMinTranslogGenRequiredByViews();
long minByAge = getMinTranslogGenByAge(readers, writer, retentionAgeInMillis, currentTime());
long minBySize = getMinTranslogGenBySize(readers, writer, retentionSizeInBytes);
final long minByAgeAndSize;
if (minBySize == Long.MIN_VALUE && minByAge == Long.MIN_VALUE) {
// both size and age are disabled;
minByAgeAndSize = Long.MAX_VALUE;
} else {
minByAgeAndSize = Math.max(minByAge, minBySize);
}
return Math.min(minByAgeAndSize, Math.min(minByView, minTranslogGenerationForRecovery));
}

static long getMinTranslogGenBySize(List<TranslogReader> readers, TranslogWriter writer, long retentionSizeInBytes) {
if (retentionSizeInBytes >= 0) {
long totalSize = writer.sizeInBytes();
long minGen = writer.getGeneration();
for (int i = readers.size() - 1; i >= 0 && totalSize < retentionSizeInBytes; i--) {
final TranslogReader reader = readers.get(i);
totalSize += reader.sizeInBytes();
minGen = reader.getGeneration();
}
return minGen;
} else {
return Long.MIN_VALUE;
}
}

static long getMinTranslogGenByAge(List<TranslogReader> readers, TranslogWriter writer, long maxRetentionAgeInMillis, long now)
throws IOException {
if (maxRetentionAgeInMillis >= 0) {
for (TranslogReader reader: readers) {
if (now - reader.getLastModifiedTime() <= maxRetentionAgeInMillis) {
return reader.getGeneration();
}
}
return writer.getGeneration();
} else {
return Long.MIN_VALUE;
}
}

protected long currentTime() {
return System.currentTimeMillis();
}

private long getMinTranslogGenRequiredByViews() {
return translogRefCounts.keySet().stream().reduce(Math::min).orElse(Long.MAX_VALUE);
}

/** returns the translog generation that will be used as a basis of a future store/peer recovery */
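
To make the interplay of the three constraints concrete, the following standalone sketch (an editorial simplification, not the class above) mirrors minTranslogGenRequired: age and size act as caps on how much extra translog retention keeps, while recovery needs and open views always impose their own minimum.

import java.util.List;

// Editorial simplification of the policy above; generations are ordered oldest
// first and the last element stands in for the current writer.
final class RetentionSketch {
    static final class Gen {
        final long gen, sizeInBytes, lastModifiedMillis;
        Gen(long gen, long sizeInBytes, long lastModifiedMillis) {
            this.gen = gen; this.sizeInBytes = sizeInBytes; this.lastModifiedMillis = lastModifiedMillis;
        }
    }

    // A negative retention size/age disables that constraint, as in the new settings.
    static long minGenRequired(List<Gen> gens, long minGenForRecovery, long minGenHeldByViews,
                               long retentionSizeInBytes, long retentionAgeInMillis, long now) {
        long minByAge = Long.MIN_VALUE;
        if (retentionAgeInMillis >= 0) {
            minByAge = gens.get(gens.size() - 1).gen; // fall back to the newest generation
            for (Gen g : gens) {
                if (now - g.lastModifiedMillis <= retentionAgeInMillis) {
                    minByAge = g.gen; // oldest generation still young enough to retain
                    break;
                }
            }
        }
        long minBySize = Long.MIN_VALUE;
        if (retentionSizeInBytes >= 0) {
            long total = 0;
            minBySize = gens.get(gens.size() - 1).gen;
            for (int i = gens.size() - 1; i >= 0 && total < retentionSizeInBytes; i--) {
                total += gens.get(i).sizeInBytes;
                minBySize = gens.get(i).gen; // walk backwards until the budget is spent
            }
        }
        final long minByAgeAndSize = (minByAge == Long.MIN_VALUE && minBySize == Long.MIN_VALUE)
            ? Long.MAX_VALUE                  // both disabled: retention demands nothing extra
            : Math.max(minByAge, minBySize);  // the larger min wins: both caps must hold

        // Recovery and open views can only lower the result, never raise it.
        return Math.min(minByAgeAndSize, Math.min(minGenHeldByViews, minGenForRecovery));
    }
}

For example, with four generations of 100 MB each, a retention size of 250 MB keeps the newest three (about 300 MB, since the generation that crosses the budget is still kept), and a retention age of 12h additionally drops any of those whose file is older.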
