Skip to content

Commit

Permalink
[Tiered caching] Introducing cache plugins and exposing Ehcache as on…
Browse files Browse the repository at this point in the history
…e of the pluggable disk cache option (opensearch-project#11874)

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
  • Loading branch information
sgup432 authored and Peter Alfonsi committed Aug 30, 2024
1 parent f372c2c commit 80d2130
Show file tree
Hide file tree
Showing 53 changed files with 3,402 additions and 720 deletions.
17 changes: 17 additions & 0 deletions modules/cache-common/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

opensearchplugin {
description 'Module for caches which are optional and do not require additional security permission'
classname 'org.opensearch.cache.common.tier.TieredSpilloverCachePlugin'
}

test {
// TODO: Adding permission in plugin-security.policy doesn't seem to work.
systemProperty 'tests.security.manager', 'false'
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,335 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.LoadAwareCacheLoader;
import org.opensearch.common.cache.RemovalListener;
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.config.CacheConfig;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.ReleasableLock;
import org.opensearch.common.util.iterable.Iterables;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

/**
* This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
* and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
* then items are eventually evicted from it and removed which will result in cache miss.
*
* @param <K> Type of key
* @param <V> Type of value
*
* @opensearch.experimental
*/
@ExperimentalApi
public class TieredSpilloverCache<K, V> implements ICache<K, V> {

private final ICache<K, V> diskCache;
private final ICache<K, V> onHeapCache;
private final RemovalListener<K, V> removalListener;
ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
ReleasableLock readLock = new ReleasableLock(readWriteLock.readLock());
ReleasableLock writeLock = new ReleasableLock(readWriteLock.writeLock());
/**
* Maintains caching tiers in ascending order of cache latency.
*/
private final List<ICache<K, V>> cacheList;

TieredSpilloverCache(Builder<K, V> builder) {
Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

this.onHeapCache = builder.onHeapCacheFactory.create(
new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<K, V>() {
@Override
public void onRemoval(RemovalNotification<K, V> notification) {
try (ReleasableLock ignore = writeLock.acquire()) {
diskCache.put(notification.getKey(), notification.getValue());
}
removalListener.onRemoval(notification);
}
})
.setKeyType(builder.cacheConfig.getKeyType())
.setValueType(builder.cacheConfig.getValueType())
.setSettings(builder.cacheConfig.getSettings())
.setWeigher(builder.cacheConfig.getWeigher())
.build(),
builder.cacheType,
builder.cacheFactories

);
this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
this.cacheList = Arrays.asList(onHeapCache, diskCache);
}

// Package private for testing
ICache<K, V> getOnHeapCache() {
return onHeapCache;
}

// Package private for testing
ICache<K, V> getDiskCache() {
return diskCache;
}

@Override
public V get(K key) {
return getValueFromTieredCache().apply(key);
}

@Override
public void put(K key, V value) {
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(key, value);
}
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {

V cacheValue = getValueFromTieredCache().apply(key);
if (cacheValue == null) {
// Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
// This is needed as there can be many requests for the same key at the same time and we only want to load
// the value once.
V value = null;
try (ReleasableLock ignore = writeLock.acquire()) {
value = onHeapCache.computeIfAbsent(key, loader);
}
return value;
}
return cacheValue;
}

@Override
public void invalidate(K key) {
// We are trying to invalidate the key from all caches though it would be present in only of them.
// Doing this as we don't know where it is located. We could do a get from both and check that, but what will
// also trigger a hit/miss listener event, so ignoring it for now.
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidate(key);
}
}
}

@Override
public void invalidateAll() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.invalidateAll();
}
}
}

/**
* Provides an iteration over both onHeap and disk keys. This is not protected from any mutations to the cache.
* @return An iterable over (onHeap + disk) keys
*/
@SuppressWarnings("unchecked")
@Override
public Iterable<K> keys() {
return Iterables.concat(onHeapCache.keys(), diskCache.keys());
}

@Override
public long count() {
long count = 0;
for (ICache<K, V> cache : cacheList) {
count += cache.count();
}
return count;
}

@Override
public void refresh() {
try (ReleasableLock ignore = writeLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
cache.refresh();
}
}
}

@Override
public void close() throws IOException {
for (ICache<K, V> cache : cacheList) {
cache.close();
}
}

private Function<K, V> getValueFromTieredCache() {
return key -> {
try (ReleasableLock ignore = readLock.acquire()) {
for (ICache<K, V> cache : cacheList) {
V value = cache.get(key);
if (value != null) {
// update hit stats
return value;
} else {
// update miss stats
}
}
}
return null;
};
}

/**
* Factory to create TieredSpilloverCache objects.
*/
public static class TieredSpilloverCacheFactory implements ICache.Factory {

/**
* Defines cache name
*/
public static final String TIERED_SPILLOVER_CACHE_NAME = "tiered_spillover";

/**
* Default constructor
*/
public TieredSpilloverCacheFactory() {}

@Override
public <K, V> ICache<K, V> create(CacheConfig<K, V> config, CacheType cacheType, Map<String, Factory> cacheFactories) {
Settings settings = config.getSettings();
Setting<String> onHeapSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String onHeapCacheStoreName = onHeapSetting.get(settings);
if (!cacheFactories.containsKey(onHeapCacheStoreName)) {
throw new IllegalArgumentException(
"No associated onHeapCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
);
}
ICache.Factory onHeapCacheFactory = cacheFactories.get(onHeapCacheStoreName);

Setting<String> onDiskSetting = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
cacheType.getSettingPrefix()
);
String diskCacheStoreName = onDiskSetting.get(settings);
if (!cacheFactories.containsKey(diskCacheStoreName)) {
throw new IllegalArgumentException(
"No associated diskCache found for tieredSpilloverCache for " + "cacheType:" + cacheType
);
}
ICache.Factory diskCacheFactory = cacheFactories.get(diskCacheStoreName);
return new Builder<K, V>().setDiskCacheFactory(diskCacheFactory)
.setOnHeapCacheFactory(onHeapCacheFactory)
.setRemovalListener(config.getRemovalListener())
.setCacheConfig(config)
.setCacheType(cacheType)
.build();
}

@Override
public String getCacheName() {
return TIERED_SPILLOVER_CACHE_NAME;
}
}

/**
* Builder object for tiered spillover cache.
* @param <K> Type of key
* @param <V> Type of value
*/
public static class Builder<K, V> {
private ICache.Factory onHeapCacheFactory;
private ICache.Factory diskCacheFactory;
private RemovalListener<K, V> removalListener;
private CacheConfig<K, V> cacheConfig;
private CacheType cacheType;
private Map<String, ICache.Factory> cacheFactories;

/**
* Default constructor
*/
public Builder() {}

/**
* Set onHeap cache factory
* @param onHeapCacheFactory Factory for onHeap cache.
* @return builder
*/
public Builder<K, V> setOnHeapCacheFactory(ICache.Factory onHeapCacheFactory) {
this.onHeapCacheFactory = onHeapCacheFactory;
return this;
}

/**
* Set disk cache factory
* @param diskCacheFactory Factory for disk cache.
* @return builder
*/
public Builder<K, V> setDiskCacheFactory(ICache.Factory diskCacheFactory) {
this.diskCacheFactory = diskCacheFactory;
return this;
}

/**
* Set removal listener for tiered cache.
* @param removalListener Removal listener
* @return builder
*/
public Builder<K, V> setRemovalListener(RemovalListener<K, V> removalListener) {
this.removalListener = removalListener;
return this;
}

/**
* Set cache config.
* @param cacheConfig cache config.
* @return builder
*/
public Builder<K, V> setCacheConfig(CacheConfig<K, V> cacheConfig) {
this.cacheConfig = cacheConfig;
return this;
}

/**
* Set cache type.
* @param cacheType Cache type
* @return builder
*/
public Builder<K, V> setCacheType(CacheType cacheType) {
this.cacheType = cacheType;
return this;
}

/**
* Set cache factories
* @param cacheFactories cache factories
* @return builder
*/
public Builder<K, V> setCacheFactories(Map<String, ICache.Factory> cacheFactories) {
this.cacheFactories = cacheFactories;
return this;
}

/**
* Build tiered spillover cache.
* @return TieredSpilloverCache
*/
public TieredSpilloverCache<K, V> build() {
return new TieredSpilloverCache<>(this);
}
}
}
Loading

0 comments on commit 80d2130

Please sign in to comment.