
Commit a3aab67

Add restore level safeguards to prevent file cache oversubscription (#8606)

Signed-off-by: Kunal Kotwani <kkotwani@amazon.com>
kotwanikunal committed Jul 29, 2023
1 parent 883559c commit a3aab67
Showing 13 changed files with 300 additions and 20 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -79,6 +79,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add server version as REST response header ([#6583](https://github.com/opensearch-project/OpenSearch/issues/6583))
- Start replication checkpointTimers on primary before segments upload to remote store. ([#8221](https://github.com/opensearch-project/OpenSearch/pull/8221))
- [distribution/archives] [Linux] [x64] Provide the variant of the distributions bundled with JRE ([#8195](https://github.com/opensearch-project/OpenSearch/pull/8195))
- Add configuration for file cache size to max remote data ratio to prevent oversubscription of file cache ([#8606](https://github.com/opensearch-project/OpenSearch/pull/8606))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
@@ -517,7 +517,7 @@ public String snapshotUuid() {
/**
* Sets the storage type for this request.
*/
-    RestoreSnapshotRequest storageType(StorageType storageType) {
+    public RestoreSnapshotRequest storageType(StorageType storageType) {
this.storageType = storageType;
return this;
}
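
With the setter now public, callers outside the package can build searchable snapshot restore requests directly. A minimal usage sketch — the repository and snapshot names are hypothetical; the two-argument constructor and the StorageType enum are the existing RestoreSnapshotRequest API:

import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;

class SearchableRestoreExample {
    // Repository and snapshot names are illustrative.
    static RestoreSnapshotRequest remoteRestoreRequest() {
        return new RestoreSnapshotRequest("my-repo", "my-snapshot").storageType(
            RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT
        );
    }
}
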
@@ -295,6 +295,16 @@ public ShardsIterator allShardsIncludingRelocationTargets(String[] indices) {
return allShardsSatisfyingPredicate(indices, shardRouting -> true, true);
}

/**
* All the shards on the node which match the predicate
* @param predicate condition to match
* @return iterator over shards matching the predicate
*/
public ShardsIterator allShardsSatisfyingPredicate(Predicate<ShardRouting> predicate) {
String[] indices = indicesRouting.keySet().toArray(new String[0]);
return allShardsSatisfyingPredicate(indices, predicate, false);
}
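
A usage sketch for the new overload, assuming a ClusterState is at hand; this mirrors how RestoreService (further below) collects every primary shard of remote snapshot indices:

import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;

class PredicateIterationExample {
    // Sketch: iterate every primary shard in the cluster, across all indices.
    static void logPrimaries(ClusterState clusterState) {
        ShardsIterator primaries = clusterState.routingTable().allShardsSatisfyingPredicate(ShardRouting::primary);
        for (ShardRouting shard : primaries) {
            System.out.println(shard.shardId() + " on " + shard.currentNodeId());
        }
    }
}
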

private ShardsIterator allShardsSatisfyingPredicate(
String[] indices,
Predicate<ShardRouting> predicate,
@@ -68,7 +68,7 @@
import static org.opensearch.cluster.routing.RoutingPool.getShardPool;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING;
import static org.opensearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_LOW_DISK_WATERMARK_SETTING;
-import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO;
+import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;

/**
* The {@link DiskThresholdDecider} checks that the node a shard is potentially
@@ -199,8 +199,8 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
final FileCacheStats fileCacheStats = clusterInfo.getNodeFileCacheStats().getOrDefault(node.nodeId(), null);
final long nodeCacheSize = fileCacheStats != null ? fileCacheStats.getTotal().getBytes() : 0;
final long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;

-        if (totalNodeRemoteShardSize > DATA_TO_FILE_CACHE_SIZE_RATIO * nodeCacheSize) {
+        final double dataToFileCacheSizeRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(allocation.metadata().settings());
+        if (dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize) {
return allocation.decision(
Decision.NO,
NAME,
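
(The collapsed view truncates the rest of the Decision.NO message.) The replaced check now reads the ratio from dynamic settings and is skipped when the ratio is unset. A worked sketch of the per-node guard arithmetic, with illustrative numbers only:

class FileCacheGuardExample {
    // Illustrative numbers, not from the PR: a node with a 100 GiB file cache
    // and a ratio of 5 may hold at most 500 GiB of remote shard data.
    static boolean wouldReject() {
        long nodeCacheSize = 100L << 30;              // 100 GiB file cache on this node
        double dataToFileCacheSizeRatio = 5.0;        // cluster.filecache.remote_data_ratio
        long currentNodeRemoteShardSize = 480L << 30; // remote shards already allocated here
        long shardSize = 30L << 30;                   // candidate shard being placed
        long totalNodeRemoteShardSize = currentNodeRemoteShardSize + shardSize;
        // 510 GiB > 5.0 * 100 GiB, so the decider would answer Decision.NO.
        return dataToFileCacheSizeRatio > 0.0f && totalNodeRemoteShardSize > dataToFileCacheSizeRatio * nodeCacheSize;
    }
}
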
@@ -44,6 +44,7 @@
import org.opensearch.index.ShardIndexingPressureMemoryManager;
import org.opensearch.index.ShardIndexingPressureSettings;
import org.opensearch.index.ShardIndexingPressureStore;
import org.opensearch.index.store.remote.filecache.FileCache;
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
@@ -643,6 +644,7 @@ public void apply(Settings value, Settings current, Settings previous) {

// Settings related to Searchable Snapshots
Node.NODE_SEARCH_CACHE_SIZE_SETTING,
FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING,

// Settings related to Remote Refresh Segment Pressure
RemoteRefreshSegmentPressureSettings.REMOTE_REFRESH_SEGMENT_PRESSURE_ENABLED,
@@ -11,6 +11,7 @@
import org.apache.lucene.store.IndexInput;
import org.opensearch.common.breaker.CircuitBreaker;
import org.opensearch.common.breaker.CircuitBreakingException;
import org.opensearch.common.settings.Setting;
import org.opensearch.index.store.remote.utils.cache.CacheUsage;
import org.opensearch.index.store.remote.utils.cache.RefCountedCache;
import org.opensearch.index.store.remote.utils.cache.SegmentedCache;
@@ -49,8 +50,20 @@ public class FileCache implements RefCountedCache<Path, CachedIndexInput> {

private final CircuitBreaker circuitBreaker;

-    // TODO: Convert the constant into an integer setting
-    public static final int DATA_TO_FILE_CACHE_SIZE_RATIO = 5;
+    /**
+     * Defines a limit of how much total remote data can be referenced as a ratio of the size of the disk reserved for
+     * the file cache. For example, if 100GB disk space is configured for use as a file cache and the
+     * remote_data_ratio of 5 is defined, then a total of 500GB of remote data can be loaded as searchable snapshots.
+     * This is designed to be a safeguard to prevent oversubscribing a cluster.
+     * Specify a value of zero for no limit, which is the default for compatibility reasons.
+     */
+    public static final Setting<Double> DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING = Setting.doubleSetting(
+        "cluster.filecache.remote_data_ratio",
+        0.0,
+        0.0,
+        Setting.Property.NodeScope,
+        Setting.Property.Dynamic
+    );
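
A small sketch of how the dynamic setting resolves, using illustrative values; Settings.EMPTY and Settings.builder() are the standard OpenSearch settings API:

import org.opensearch.common.settings.Settings;
import org.opensearch.index.store.remote.filecache.FileCache;

class RatioSettingExample {
    static void readRatio() {
        // Unset -> falls back to the 0.0 default, meaning "no limit".
        double unset = FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(Settings.EMPTY);      // 0.0
        // Operator-supplied value (illustrative); values below the 0.0 minimum are rejected.
        Settings configured = Settings.builder().put("cluster.filecache.remote_data_ratio", 5.0).build();
        double ratio = FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(configured);          // 5.0
    }
}
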

public FileCache(SegmentedCache<Path, CachedIndexInput> cache, CircuitBreaker circuitBreaker) {
this.theCache = cache;
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/node/Node.java
@@ -941,8 +941,9 @@ protected Node(
clusterModule.getAllocationService(),
metadataCreateIndexService,
metadataIndexUpgradeService,
-            clusterService.getClusterSettings(),
-            shardLimitValidator
+            shardLimitValidator,
+            indicesService,
+            clusterInfoService::getClusterInfo
);

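The new constructor arguments wire in the dependencies for shard-size validation; passing clusterInfoService::getClusterInfo as a method reference means RestoreService pulls a fresh ClusterInfo each time it validates a restore, rather than holding a snapshot captured at startup. A sketch of the pattern:

import java.util.function.Supplier;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterInfoService;

class ClusterInfoWiringExample {
    // The supplier defers evaluation, so each get() observes current disk and file cache stats.
    static Supplier<ClusterInfo> supplier(ClusterInfoService clusterInfoService) {
        return clusterInfoService::getClusterInfo;
    }
}
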
final DiskThresholdMonitor diskThresholdMonitor = new DiskThresholdMonitor(
83 changes: 73 additions & 10 deletions server/src/main/java/org/opensearch/snapshots/RestoreService.java
@@ -41,6 +41,7 @@
import org.opensearch.action.admin.cluster.snapshots.restore.RestoreSnapshotRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterInfo;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.ClusterStateApplier;
import org.opensearch.cluster.ClusterStateTaskConfig;
@@ -69,6 +70,7 @@
import org.opensearch.cluster.routing.RoutingChangesObserver;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardsIterator;
import org.opensearch.cluster.routing.UnassignedInfo;
import org.opensearch.cluster.routing.allocation.AllocationService;
import org.opensearch.cluster.service.ClusterManagerTaskKeys;
@@ -87,6 +89,9 @@
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.snapshots.IndexShardSnapshotStatus;
import org.opensearch.index.store.remote.filecache.FileCacheStats;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.ShardLimitValidator;
import org.opensearch.repositories.IndexId;
import org.opensearch.repositories.RepositoriesService;
@@ -104,6 +109,7 @@
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableSet;
@@ -119,6 +125,8 @@
import static org.opensearch.common.util.FeatureFlags.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY;
import static org.opensearch.common.util.set.Sets.newHashSet;
import static org.opensearch.index.store.remote.directory.RemoteSnapshotDirectory.SEARCHABLE_SNAPSHOT_EXTENDED_COMPATIBILITY_MINIMUM_VERSION;
import static org.opensearch.index.store.remote.filecache.FileCache.DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING;
import static org.opensearch.node.Node.NODE_SEARCH_CACHE_SIZE_SETTING;
import static org.opensearch.snapshots.SnapshotUtils.filterIndices;

@@ -177,6 +185,10 @@ public class RestoreService implements ClusterStateApplier {

private final ClusterSettings clusterSettings;

private final IndicesService indicesService;

private final Supplier<ClusterInfo> clusterInfoSupplier;

private final ClusterManagerTaskThrottler.ThrottlingKey restoreSnapshotTaskKey;

private static final CleanRestoreStateTaskExecutor cleanRestoreStateTaskExecutor = new CleanRestoreStateTaskExecutor();
@@ -187,8 +199,9 @@ public RestoreService(
AllocationService allocationService,
MetadataCreateIndexService createIndexService,
MetadataIndexUpgradeService metadataIndexUpgradeService,
-        ClusterSettings clusterSettings,
-        ShardLimitValidator shardLimitValidator
+        ShardLimitValidator shardLimitValidator,
+        IndicesService indicesService,
+        Supplier<ClusterInfo> clusterInfoSupplier
) {
this.clusterService = clusterService;
this.repositoriesService = repositoriesService;
Expand All @@ -200,6 +213,8 @@ public RestoreService(
}
this.clusterSettings = clusterService.getClusterSettings();
this.shardLimitValidator = shardLimitValidator;
this.indicesService = indicesService;
this.clusterInfoSupplier = clusterInfoSupplier;

// Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction.
restoreSnapshotTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.RESTORE_SNAPSHOT_KEY, true);
@@ -415,7 +430,6 @@ public ClusterManagerTaskThrottler.ThrottlingKey getClusterManagerThrottlingKey(

@Override
public ClusterState execute(ClusterState currentState) {
-                RestoreInProgress restoreInProgress = currentState.custom(RestoreInProgress.TYPE, RestoreInProgress.EMPTY);
// Check if the snapshot to restore is currently being deleted
SnapshotDeletionsInProgress deletionsInProgress = currentState.custom(
SnapshotDeletionsInProgress.TYPE,
@@ -436,7 +450,9 @@ public ClusterState execute(ClusterState currentState) {
ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
RoutingTable.Builder rtBuilder = RoutingTable.builder(currentState.routingTable());
final Map<ShardId, RestoreInProgress.ShardRestoreStatus> shards;
final boolean isRemoteSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString());
Set<String> aliases = new HashSet<>();
long totalRestorableRemoteIndexesSize = 0;

if (indices.isEmpty() == false) {
// We have some indices to restore
@@ -447,17 +463,14 @@
String index = indexEntry.getValue();
boolean partial = checkPartial(index);

IndexId snapshotIndexId = repositoryData.resolveIndexId(index);
IndexMetadata snapshotIndexMetadata = updateIndexSettings(
metadata.index(index),
request.indexSettings(),
request.ignoreIndexSettings()
);
-                    if (IndexModule.Type.REMOTE_SNAPSHOT.match(request.storageType().toString())) {
-                        snapshotIndexMetadata = addSnapshotToIndexSettings(
-                            snapshotIndexMetadata,
-                            snapshot,
-                            repositoryData.resolveIndexId(index)
-                        );
+                    if (isRemoteSnapshot) {
+                        snapshotIndexMetadata = addSnapshotToIndexSettings(snapshotIndexMetadata, snapshot, snapshotIndexId);
}
final boolean isSearchableSnapshot = IndexModule.Type.REMOTE_SNAPSHOT.match(
snapshotIndexMetadata.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey())
@@ -483,7 +496,7 @@
restoreUUID,
snapshot,
snapshotInfo.version(),
-                        repositoryData.resolveIndexId(index),
+                        snapshotIndexId,
isSearchableSnapshot,
isRemoteStoreShallowCopy,
request.getSourceRemoteStoreRepository()
@@ -602,6 +615,14 @@ public ClusterState execute(ClusterState currentState) {
}

for (int shard = 0; shard < snapshotIndexMetadata.getNumberOfShards(); shard++) {
if (isRemoteSnapshot) {
IndexShardSnapshotStatus.Copy shardStatus = repository.getShardSnapshotStatus(
snapshotInfo.snapshotId(),
snapshotIndexId,
new ShardId(metadata.index(index).getIndex(), shard)
).asCopy();
totalRestorableRemoteIndexesSize += shardStatus.getTotalSize();
}
if (!ignoreShards.contains(shard)) {
shardsBuilder.put(
new ShardId(renamedIndex, shard),
@@ -638,6 +659,9 @@ public ClusterState execute(ClusterState currentState) {
}

checkAliasNameConflicts(indices, aliases);
if (isRemoteSnapshot) {
validateSearchableSnapshotRestorable(totalRestorableRemoteIndexesSize);
}

Map<String, DataStream> updatedDataStreams = new HashMap<>(currentState.metadata().dataStreams());
updatedDataStreams.putAll(
@@ -837,6 +861,45 @@ private IndexMetadata updateIndexSettings(
return builder.settings(settingsBuilder).build();
}

private void validateSearchableSnapshotRestorable(long totalRestorableRemoteIndexesSize) {
ClusterInfo clusterInfo = clusterInfoSupplier.get();
double remoteDataToFileCacheRatio = DATA_TO_FILE_CACHE_SIZE_RATIO_SETTING.get(clusterService.getSettings());
Map<String, FileCacheStats> nodeFileCacheStats = clusterInfo.getNodeFileCacheStats();
if (nodeFileCacheStats.isEmpty() || remoteDataToFileCacheRatio <= 0.01f) {
return;
}

long totalNodeFileCacheSize = clusterInfo.getNodeFileCacheStats()
.values()
.stream()
.map(fileCacheStats -> fileCacheStats.getTotal().getBytes())
.mapToLong(Long::longValue)
.sum();

Predicate<ShardRouting> isRemoteSnapshotShard = shardRouting -> shardRouting.primary()
&& indicesService.indexService(shardRouting.index()).getIndexSettings().isRemoteSnapshot();

ShardsIterator shardsIterator = clusterService.state()
.routingTable()
.allShardsSatisfyingPredicate(isRemoteSnapshotShard);

long totalRestoredRemoteIndexesSize = shardsIterator.getShardRoutings()
.stream()
.map(clusterInfo::getShardSize)
.mapToLong(Long::longValue)
.sum();

if (totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize > remoteDataToFileCacheRatio
* totalNodeFileCacheSize) {
throw new SnapshotRestoreException(
snapshot,
"Size of the indexes to be restored exceeds the file cache bounds. Increase the file cache capacity on the cluster nodes using "
+ NODE_SEARCH_CACHE_SIZE_SETTING.getKey()
+ " setting."
);
}
}
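
The method compares already-assigned remote shard bytes plus the incoming restore against the ratio times the cluster-wide file cache capacity. A worked sketch of the rejection arithmetic (numbers are illustrative, not from the PR):

class RestoreGuardExample {
    static boolean wouldRejectRestore() {
        long totalNodeFileCacheSize = 3 * (100L << 30);     // three search nodes, 100 GiB cache each
        double remoteDataToFileCacheRatio = 5.0;            // cluster.filecache.remote_data_ratio
        long totalRestoredRemoteIndexesSize = 1400L << 30;  // remote shards already assigned
        long totalRestorableRemoteIndexesSize = 150L << 30; // shard sizes in the incoming restore
        // 1550 GiB > 5.0 * 300 GiB = 1500 GiB, so RestoreService would throw
        // SnapshotRestoreException before any shards are allocated.
        return totalRestoredRemoteIndexesSize + totalRestorableRemoteIndexesSize > remoteDataToFileCacheRatio
            * totalNodeFileCacheSize;
    }
}
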

@Override
public void onFailure(String source, Exception e) {
logger.warn(() -> new ParameterizedMessage("[{}] failed to restore snapshot", snapshotId), e);
@@ -112,6 +112,10 @@ private RestoreSnapshotRequest randomState(RestoreSnapshotRequest instance) {
instance.snapshotUuid(randomBoolean() ? null : randomAlphaOfLength(10));
}

instance.storageType(
randomBoolean() ? RestoreSnapshotRequest.StorageType.LOCAL : RestoreSnapshotRequest.StorageType.REMOTE_SNAPSHOT
);

if (randomBoolean()) {
instance.setSourceRemoteStoreRepository(randomAlphaOfLengthBetween(5, 10));
}