Skip to content

Commit

Permalink
[Tiered Caching] Cache tier policies (opensearch-project#12542)
Browse files Browse the repository at this point in the history
* Adds policy interface and took time policy impl

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Changes IndicesService to write a CachePolicyInfoWrapper before the QSR

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Moved took time logic from QSR to IndicesService

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* spotlessApply

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Addressed ansjcy's comments

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Partial rebase on most recent changes

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Integrated policies with new TSC changes

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Reverted unintended change to idea/vcs.xml

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* javadocs

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* github actions

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Set default threshold value to 10 ms

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Addressed Sorabh's comments

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Addressed Sorabh's second round of comments

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Set cachedQueryParser in IRC

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Addressed Sorabh's comments besides dynamic setting

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Removed dynamic setting, misc comments

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Added changelog entry

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Added missing javadoc

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Fixed failed gradle run

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* Added setting validation test

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* rerun gradle for flaky IT

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

* javadocs

Signed-off-by: Peter Alfonsi <petealft@amazon.com>

---------

Signed-off-by: Peter Alfonsi <petealft@amazon.com>
Co-authored-by: Peter Alfonsi <petealft@amazon.com>
  • Loading branch information
peteralfonsi and Peter Alfonsi committed Aug 30, 2024
1 parent 45fce15 commit 15c4afa
Show file tree
Hide file tree
Showing 12 changed files with 644 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.cache.common.policy;

import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.unit.TimeValue;

import java.util.function.Function;
import java.util.function.Predicate;

/**
* A cache tier policy which accepts queries whose took time is greater than some threshold.
* The threshold should be set to approximately the time it takes to get a result from the cache tier.
* The policy accepts values of type V and decodes them into CachedQueryResult.PolicyValues, which has the data needed
* to decide whether to admit the value.
* @param <V> The type of data consumed by test().
*/
public class TookTimePolicy<V> implements Predicate<V> {
/**
* The minimum took time to allow a query. Set to TimeValue.ZERO to let all data through.
*/
private final TimeValue threshold;

/**
* Function which extracts the relevant PolicyValues from a serialized CachedQueryResult
*/
private final Function<V, CachedQueryResult.PolicyValues> cachedResultParser;

/**
* Constructs a took time policy.
* @param threshold the threshold
* @param cachedResultParser the function providing policy values
*/
public TookTimePolicy(TimeValue threshold, Function<V, CachedQueryResult.PolicyValues> cachedResultParser) {
if (threshold.compareTo(TimeValue.ZERO) < 0) {
throw new IllegalArgumentException("Threshold for TookTimePolicy must be >= 0ms but was " + threshold.getStringRep());
}
this.threshold = threshold;
this.cachedResultParser = cachedResultParser;
}

/**
* Check whether to admit data.
* @param data the input argument
* @return whether to admit the data
*/
public boolean test(V data) {
long tookTimeNanos;
try {
tookTimeNanos = cachedResultParser.apply(data).getTookTimeNanos();
} catch (Exception e) {
// If we can't read a CachedQueryResult.PolicyValues from the BytesReference, reject the data
return false;
}

TimeValue tookTime = TimeValue.timeValueNanos(tookTimeNanos);
return tookTime.compareTo(threshold) >= 0;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** A package for policies controlling what can enter caches. */
package org.opensearch.cache.common.policy;
Original file line number Diff line number Diff line change
Expand Up @@ -8,26 +8,31 @@

package org.opensearch.cache.common.tier;

import org.opensearch.cache.common.policy.TookTimePolicy;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.function.Predicate;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
Expand All @@ -52,6 +57,7 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;
private final List<Predicate<V>> policies;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Expand All @@ -63,21 +69,27 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
if (evaluatePolicies(notification.getValue())) {
diskCache.put(notification.getKey(), notification.getValue());
}
}
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.setMaxSizeInBytes(builder.cacheConfig.getMaxSizeInBytes()) // TODO: Part of a workaround for an issue in TSC. Overall fix
// coming soon
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);

this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
}

// Package private for testing
Expand Down Expand Up @@ -192,6 +204,15 @@ private Function<K, V> getValueFromTieredCache() {
};
}

boolean evaluatePolicies(V value) {
for (Predicate<V> policy : policies) {
if (!policy.test(value)) {
return false;
}
}
return true;
}

/**
* Factory to create TieredSpilloverCache objects.
*/
Expand Down Expand Up @@ -231,11 +252,21 @@ public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType,
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);

TimeValue diskPolicyThreshold = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD
.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
.get(settings);
Function<V, CachedQueryResult.PolicyValues> cachedResultParser = Objects.requireNonNull(
config.getCachedResultParser(),
"Cached result parser fn can't be null"
);

return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.addPolicy(new TookTimePolicy<V>(diskPolicyThreshold, cachedResultParser))
.build();
}

Expand All @@ -257,6 +288,7 @@ public static class Builder<K, V> {
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;
private final ArrayList<Predicate<V>> policies = new ArrayList<>();

/**
* Default constructor
Expand Down Expand Up @@ -323,6 +355,26 @@ public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactorie
return this;
}

/**
* Set a cache policy to be used to limit access to this cache's disk tier.
* @param policy the policy
* @return builder
*/
public Builder<K, V> addPolicy(Predicate<V> policy) {
this.policies.add(policy);
return this;
}

/**
* Set multiple policies to be used to limit access to this cache's disk tier.
* @param policies the policies
* @return builder
*/
public Builder<K, V> addPolicies(List<Predicate<V>> policies) {
this.policies.addAll(policies);
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ public List<Setting<?>> getSettings() {
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
);
settingList.add(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
)
);
}
return settingList;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
package org.opensearch.cache.common.tier;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;

import java.util.concurrent.TimeUnit;

import static org.opensearch.common.settings.Setting.Property.NodeScope;

Expand Down Expand Up @@ -36,6 +39,21 @@ public class TieredSpilloverCacheSettings {
(key) -> Setting.simpleString(key, "", NodeScope)
);

/**
* Setting defining the minimum took time for a query to be allowed into the disk cache.
*/
public static final Setting.AffixSetting<TimeValue> TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD = Setting.suffixKeySetting(
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.policies.took_time.threshold",
(key) -> Setting.timeSetting(
key,
new TimeValue(10, TimeUnit.MILLISECONDS), // Default value for this setting
TimeValue.ZERO, // Minimum value for this setting
NodeScope
)
);
// 10 ms was chosen as a safe value based on proof of concept, where we saw disk latencies in this range.
// Will be tuned further with future benchmarks.

/**
* Default constructor
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.policy;

import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.TotalHits;
import org.opensearch.common.Randomness;
import org.opensearch.common.cache.policy.CachedQueryResult;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lucene.search.TopDocsAndMaxScore;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.search.DocValueFormat;
import org.opensearch.search.query.QuerySearchResult;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Random;
import java.util.function.Function;

public class TookTimePolicyTests extends OpenSearchTestCase {
private final Function<BytesReference, CachedQueryResult.PolicyValues> transformationFunction = (data) -> {
try {
return CachedQueryResult.getPolicyValues(data);
} catch (IOException e) {
throw new RuntimeException(e);
}
};

private TookTimePolicy<BytesReference> getTookTimePolicy(TimeValue threshold) {
return new TookTimePolicy<>(threshold, transformationFunction);
}

public void testTookTimePolicy() throws Exception {
double threshMillis = 10;
long shortMillis = (long) (0.9 * threshMillis);
long longMillis = (long) (1.5 * threshMillis);
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(new TimeValue((long) threshMillis));
BytesReference shortTime = getValidPolicyInput(shortMillis * 1000000);
BytesReference longTime = getValidPolicyInput(longMillis * 1000000);

boolean shortResult = tookTimePolicy.test(shortTime);
assertFalse(shortResult);
boolean longResult = tookTimePolicy.test(longTime);
assertTrue(longResult);

TookTimePolicy<BytesReference> disabledPolicy = getTookTimePolicy(TimeValue.ZERO);
shortResult = disabledPolicy.test(shortTime);
assertTrue(shortResult);
longResult = disabledPolicy.test(longTime);
assertTrue(longResult);
}

public void testNegativeOneInput() throws Exception {
// PolicyValues with -1 took time can be passed to this policy if we shouldn't accept it for whatever reason
TookTimePolicy<BytesReference> tookTimePolicy = getTookTimePolicy(TimeValue.ZERO);
BytesReference minusOne = getValidPolicyInput(-1L);
assertFalse(tookTimePolicy.test(minusOne));
}

public void testInvalidThreshold() throws Exception {
assertThrows(IllegalArgumentException.class, () -> getTookTimePolicy(TimeValue.MINUS_ONE));
}

private BytesReference getValidPolicyInput(Long tookTimeNanos) throws IOException {
// When it's used in the cache, the policy will receive BytesReferences which come from
// serializing a CachedQueryResult.
CachedQueryResult cachedQueryResult = new CachedQueryResult(getQSR(), tookTimeNanos);
BytesStreamOutput out = new BytesStreamOutput();
cachedQueryResult.writeToNoId(out);
return out.bytes();
}

private QuerySearchResult getQSR() {
// We can't mock the QSR with mockito because the class is final. Construct a real one
QuerySearchResult mockQSR = new QuerySearchResult();

// duplicated from DfsQueryPhaseTests.java
mockQSR.topDocs(
new TopDocsAndMaxScore(
new TopDocs(new TotalHits(1, TotalHits.Relation.EQUAL_TO), new ScoreDoc[] { new ScoreDoc(42, 1.0F) }),
2.0F
),
new DocValueFormat[0]
);
return mockQSR;
}

private void writeRandomBytes(StreamOutput out, int numBytes) throws IOException {
Random rand = Randomness.get();
byte[] bytes = new byte[numBytes];
rand.nextBytes(bytes);
out.writeBytes(bytes);
}
}
Loading

0 comments on commit 15c4afa

Please sign in to comment.