diff --git a/CHANGELOG.md b/CHANGELOG.md index 963ccdcdc9a6b..d56d9ce5ac9c3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -33,6 +33,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Recommissioning of zone. REST layer support. ([#4624](https://github.com/opensearch-project/OpenSearch/pull/4604)) - Added in-flight cancellation of SearchShardTask based on resource consumption ([#4565](https://github.com/opensearch-project/OpenSearch/pull/4565)) - Apply reproducible builds configuration for OpenSearch plugins through gradle plugin ([#4746](https://github.com/opensearch-project/OpenSearch/pull/4746)) +- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805)) ### Dependencies - Bumps `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 320cb5457b21c..b8e3fb41391f9 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -595,11 +595,12 @@ public void apply(Settings value, Settings current, Settings previous) { NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES, NodeDuressSettings.SETTING_CPU_THRESHOLD, NodeDuressSettings.SETTING_HEAP_THRESHOLD, - SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD, - SearchShardTaskSettings.SETTING_HEAP_THRESHOLD, + SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, + SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD, SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD, - SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD, - SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD + SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, + SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD, + SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/common/util/TokenBucket.java b/server/src/main/java/org/opensearch/common/util/TokenBucket.java index e47f152d71363..d2e7e836bf07f 100644 --- a/server/src/main/java/org/opensearch/common/util/TokenBucket.java +++ b/server/src/main/java/org/opensearch/common/util/TokenBucket.java @@ -101,9 +101,9 @@ public boolean request() { */ private static class State { final double tokens; - final double lastRefilledAt; + final long lastRefilledAt; - public State(double tokens, double lastRefilledAt) { + public State(double tokens, long lastRefilledAt) { this.tokens = tokens; this.lastRefilledAt = lastRefilledAt; } @@ -113,7 +113,7 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; State state = (State) o; - return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0; + return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt; } @Override diff --git a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java index 885846a177d60..2d1779c4049e5 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java +++ b/server/src/main/java/org/opensearch/search/backpressure/SearchBackpressureService.java @@ -17,6 +17,9 @@ import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.monitor.process.ProcessProbe; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; import org.opensearch.search.backpressure.trackers.NodeDuressTracker; import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker; import org.opensearch.tasks.CancellableTask; @@ -29,7 +32,6 @@ import java.io.IOException; import java.util.ArrayList; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -84,7 +86,7 @@ public SearchBackpressureService( () -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold() ) ), - Collections.emptyList() + List.of(new CpuUsageTracker(settings), new HeapUsageTracker(settings), new ElapsedTimeTracker(settings, System::nanoTime)) ); } @@ -97,7 +99,7 @@ public SearchBackpressureService( List taskResourceUsageTrackers ) { this.settings = settings; - this.settings.setListener(this); + this.settings.addListener(this); this.taskResourceTrackingService = taskResourceTrackingService; this.taskResourceTrackingService.addTaskCompletionListener(this); this.threadPool = threadPool; @@ -182,7 +184,7 @@ boolean isNodeInDuress() { */ boolean isHeapUsageDominatedBySearch(List searchShardTasks) { long runningTasksHeapUsage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum(); - long searchTasksHeapThreshold = getSettings().getSearchShardTaskSettings().getTotalHeapThresholdBytes(); + long searchTasksHeapThreshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold(); if (runningTasksHeapUsage < searchTasksHeapThreshold) { logger.debug("heap usage not dominated by search requests [{}/{}]", runningTasksHeapUsage, searchTasksHeapThreshold); return false; diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java index 4834808d768f1..5ceb01666757f 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchBackpressureSettings.java @@ -8,13 +8,16 @@ package org.opensearch.search.backpressure.settings; -import org.apache.lucene.util.SetOnce; +import org.opensearch.ExceptionsHelper; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; /** * Settings related to search backpressure and cancellation of in-flight requests. @@ -23,7 +26,7 @@ */ public class SearchBackpressureSettings { private static class Defaults { - private static final long INTERVAL = 1000; + private static final long INTERVAL_MILLIS = 1000; private static final boolean ENABLED = true; private static final boolean ENFORCED = false; @@ -37,9 +40,9 @@ private static class Defaults { * Defines the interval (in millis) at which the SearchBackpressureService monitors and cancels tasks. */ private final TimeValue interval; - public static final Setting SETTING_INTERVAL = Setting.longSetting( - "search_backpressure.interval", - Defaults.INTERVAL, + public static final Setting SETTING_INTERVAL_MILLIS = Setting.longSetting( + "search_backpressure.interval_millis", + Defaults.INTERVAL_MILLIS, 1, Setting.Property.NodeScope ); @@ -116,15 +119,19 @@ public interface Listener { void onCancellationBurstChanged(); } - private final SetOnce listener = new SetOnce<>(); + private final List listeners = new ArrayList<>(); + private final Settings settings; + private final ClusterSettings clusterSettings; private final NodeDuressSettings nodeDuressSettings; private final SearchShardTaskSettings searchShardTaskSettings; public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) { + this.settings = settings; + this.clusterSettings = clusterSettings; this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings); this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings); - interval = new TimeValue(SETTING_INTERVAL.get(settings)); + interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings)); enabled = SETTING_ENABLED.get(settings); clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled); @@ -142,8 +149,16 @@ public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSett clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst); } - public void setListener(Listener listener) { - this.listener.set(listener); + public void addListener(Listener listener) { + listeners.add(listener); + } + + public Settings getSettings() { + return settings; + } + + public ClusterSettings getClusterSettings() { + return clusterSettings; } public NodeDuressSettings getNodeDuressSettings() { @@ -180,9 +195,7 @@ public double getCancellationRatio() { private void setCancellationRatio(double cancellationRatio) { this.cancellationRatio = cancellationRatio; - if (listener.get() != null) { - listener.get().onCancellationRatioChanged(); - } + notifyListeners(Listener::onCancellationRatioChanged); } public double getCancellationRate() { @@ -195,9 +208,7 @@ public double getCancellationRateNanos() { private void setCancellationRate(double cancellationRate) { this.cancellationRate = cancellationRate; - if (listener.get() != null) { - listener.get().onCancellationRateChanged(); - } + notifyListeners(Listener::onCancellationRateChanged); } public double getCancellationBurst() { @@ -206,8 +217,20 @@ public double getCancellationBurst() { private void setCancellationBurst(double cancellationBurst) { this.cancellationBurst = cancellationBurst; - if (listener.get() != null) { - listener.get().onCancellationBurstChanged(); + notifyListeners(Listener::onCancellationBurstChanged); + } + + private void notifyListeners(Consumer consumer) { + List exceptions = new ArrayList<>(); + + for (Listener listener : listeners) { + try { + consumer.accept(listener); + } catch (Exception e) { + exceptions.add(e); + } } + + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java index 1126dad78f554..8a3508b1ee15e 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java @@ -8,11 +8,16 @@ package org.opensearch.search.backpressure.settings; +import org.opensearch.ExceptionsHelper; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.monitor.jvm.JvmStats; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; + /** * Defines the settings related to the cancellation of SearchShardTasks. * @@ -22,21 +27,22 @@ public class SearchShardTaskSettings { private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); private static class Defaults { - private static final double TOTAL_HEAP_THRESHOLD = 0.05; - private static final double HEAP_THRESHOLD = 0.005; + private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05; + private static final double HEAP_PERCENT_THRESHOLD = 0.005; private static final double HEAP_VARIANCE_THRESHOLD = 2.0; - private static final long CPU_TIME_THRESHOLD = 15; - private static final long ELAPSED_TIME_THRESHOLD = 30000; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; + private static final long CPU_TIME_MILLIS_THRESHOLD = 15; + private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; } /** * Defines the heap usage threshold (in percentage) for the sum of heap usages across all search shard tasks * before in-flight cancellation is applied. */ - private volatile double totalHeapThreshold; - public static final Setting SETTING_TOTAL_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.total_heap_threshold", - Defaults.TOTAL_HEAP_THRESHOLD, + private volatile double totalHeapPercentThreshold; + public static final Setting SETTING_TOTAL_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.total_heap_percent_threshold", + Defaults.TOTAL_HEAP_PERCENT_THRESHOLD, 0.0, 1.0, Setting.Property.Dynamic, @@ -46,10 +52,10 @@ private static class Defaults { /** * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. */ - private volatile double heapThreshold; - public static final Setting SETTING_HEAP_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_threshold", - Defaults.HEAP_THRESHOLD, + private volatile double heapPercentThreshold; + public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_percent_threshold", + Defaults.HEAP_PERCENT_THRESHOLD, 0.0, 1.0, Setting.Property.Dynamic, @@ -69,13 +75,25 @@ private static class Defaults { Setting.Property.NodeScope ); + /** + * Defines the window size to calculate the moving average of heap usage of completed tasks. + */ + private volatile int heapMovingAverageWindowSize; + public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "search_backpressure.search_shard_task.heap_moving_average_window_size", + Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. */ - private volatile long cpuTimeThreshold; - public static final Setting SETTING_CPU_TIME_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.cpu_time_threshold", - Defaults.CPU_TIME_THRESHOLD, + private volatile long cpuTimeMillisThreshold; + public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.cpu_time_millis_threshold", + Defaults.CPU_TIME_MILLIS_THRESHOLD, 0, Setting.Property.Dynamic, Setting.Property.NodeScope @@ -84,54 +102,70 @@ private static class Defaults { /** * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. */ - private volatile long elapsedTimeThreshold; - public static final Setting SETTING_ELAPSED_TIME_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.elapsed_time_threshold", - Defaults.ELAPSED_TIME_THRESHOLD, + private volatile long elapsedTimeMillisThreshold; + public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.elapsed_time_millis_threshold", + Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, 0, Setting.Property.Dynamic, Setting.Property.NodeScope ); + /** + * Callback listeners. + */ + public interface Listener { + void onHeapMovingAverageWindowSizeChanged(int newWindowSize); + } + + private final List listeners = new ArrayList<>(); + public SearchShardTaskSettings(Settings settings, ClusterSettings clusterSettings) { - totalHeapThreshold = SETTING_TOTAL_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_THRESHOLD, this::setTotalHeapThreshold); + totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, this::setTotalHeapPercentThreshold); - heapThreshold = SETTING_HEAP_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_THRESHOLD, this::setHeapThreshold); + heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings); clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); - cpuTimeThreshold = SETTING_CPU_TIME_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_THRESHOLD, this::setCpuTimeThreshold); + heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); + + cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); + + elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings); + clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); + } - elapsedTimeThreshold = SETTING_ELAPSED_TIME_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_THRESHOLD, this::setElapsedTimeThreshold); + public void addListener(Listener listener) { + listeners.add(listener); } - public double getTotalHeapThreshold() { - return totalHeapThreshold; + public double getTotalHeapPercentThreshold() { + return totalHeapPercentThreshold; } - public long getTotalHeapThresholdBytes() { - return (long) (HEAP_SIZE_BYTES * getTotalHeapThreshold()); + public long getTotalHeapBytesThreshold() { + return (long) (HEAP_SIZE_BYTES * getTotalHeapPercentThreshold()); } - private void setTotalHeapThreshold(double totalHeapThreshold) { - this.totalHeapThreshold = totalHeapThreshold; + private void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { + this.totalHeapPercentThreshold = totalHeapPercentThreshold; } - public double getHeapThreshold() { - return heapThreshold; + public double getHeapPercentThreshold() { + return heapPercentThreshold; } - public long getHeapThresholdBytes() { - return (long) (HEAP_SIZE_BYTES * getHeapThreshold()); + public long getHeapBytesThreshold() { + return (long) (HEAP_SIZE_BYTES * getHeapPercentThreshold()); } - private void setHeapThreshold(double heapThreshold) { - this.heapThreshold = heapThreshold; + private void setHeapPercentThreshold(double heapPercentThreshold) { + this.heapPercentThreshold = heapPercentThreshold; } public double getHeapVarianceThreshold() { @@ -142,19 +176,45 @@ private void setHeapVarianceThreshold(double heapVarianceThreshold) { this.heapVarianceThreshold = heapVarianceThreshold; } - public long getCpuTimeThreshold() { - return cpuTimeThreshold; + public int getHeapMovingAverageWindowSize() { + return heapMovingAverageWindowSize; + } + + public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { + this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; + + List exceptions = new ArrayList<>(); + for (Listener listener : listeners) { + try { + listener.onHeapMovingAverageWindowSizeChanged(heapMovingAverageWindowSize); + } catch (Exception e) { + exceptions.add(e); + } + } + ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); + } + + public long getCpuTimeMillisThreshold() { + return cpuTimeMillisThreshold; + } + + public long getCpuTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(getCpuTimeMillisThreshold()); + } + + private void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { + this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; } - private void setCpuTimeThreshold(long cpuTimeThreshold) { - this.cpuTimeThreshold = cpuTimeThreshold; + public long getElapsedTimeMillisThreshold() { + return elapsedTimeMillisThreshold; } - public long getElapsedTimeThreshold() { - return elapsedTimeThreshold; + public long getElapsedTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(getElapsedTimeMillisThreshold()); } - private void setElapsedTimeThreshold(long elapsedTimeThreshold) { - this.elapsedTimeThreshold = elapsedTimeThreshold; + private void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { + this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java new file mode 100644 index 0000000000000..6a6f601642676 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +/** + * CpuUsageTracker evaluates if the task has consumed too many CPU cycles than allowed. + * + * @opensearch.internal + */ +public class CpuUsageTracker extends TaskResourceUsageTracker { + public static final String NAME = "cpu_usage_tracker"; + + private final LongSupplier cpuTimeNanosThresholdSupplier; + + public CpuUsageTracker(SearchBackpressureSettings settings) { + this.cpuTimeNanosThresholdSupplier = () -> settings.getSearchShardTaskSettings().getCpuTimeNanosThreshold(); + } + + @Override + public String name() { + return NAME; + } + + @Override + public void update(Task task) {} + + @Override + public Optional cancellationReason(Task task) { + long usage = task.getTotalResourceStats().getCpuTimeInNanos(); + long threshold = cpuTimeNanosThresholdSupplier.getAsLong(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "cpu usage exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java new file mode 100644 index 0000000000000..6e14113b7245a --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -0,0 +1,64 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.unit.TimeValue; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.function.LongSupplier; + +/** + * ElapsedTimeTracker evaluates if the task has been running for more time than allowed. + * + * @opensearch.internal + */ +public class ElapsedTimeTracker extends TaskResourceUsageTracker { + public static final String NAME = "elapsed_time_tracker"; + + private final LongSupplier timeNanosSupplier; + private final LongSupplier elapsedTimeNanosThresholdSupplier; + + public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier timeNanosSupplier) { + this.timeNanosSupplier = timeNanosSupplier; + this.elapsedTimeNanosThresholdSupplier = () -> settings.getSearchShardTaskSettings().getElapsedTimeNanosThreshold(); + } + + @Override + public String name() { + return NAME; + } + + @Override + public void update(Task task) {} + + @Override + public Optional cancellationReason(Task task) { + long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); + long threshold = elapsedTimeNanosThresholdSupplier.getAsLong(); + + if (usage < threshold) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "elapsed time exceeded [" + + new TimeValue(usage, TimeUnit.NANOSECONDS) + + " >= " + + new TimeValue(threshold, TimeUnit.NANOSECONDS) + + "]", + 1 // TODO: fine-tune the cancellation score/weight + ) + ); + } +} diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java new file mode 100644 index 0000000000000..ed5e7165000e9 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.common.unit.ByteSizeValue; +import org.opensearch.common.util.MovingAverage; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; + +import java.util.Optional; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.DoubleSupplier; +import java.util.function.LongSupplier; + +/** + * HeapUsageTracker evaluates if the task has consumed too much heap than allowed. + * It also compares the task's heap usage against a historical moving average of previously completed tasks. + * + * @opensearch.internal + */ +public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener { + public static final String NAME = "heap_usage_tracker"; + + private final LongSupplier searchTaskHeapThresholdBytesSupplier; + private final DoubleSupplier searchTaskHeapVarianceThresholdSupplier; + private final AtomicReference movingAverageReference; + + public HeapUsageTracker(SearchBackpressureSettings settings) { + this.searchTaskHeapThresholdBytesSupplier = () -> settings.getSearchShardTaskSettings().getHeapBytesThreshold(); + this.searchTaskHeapVarianceThresholdSupplier = () -> settings.getSearchShardTaskSettings().getHeapVarianceThreshold(); + this.movingAverageReference = new AtomicReference<>( + new MovingAverage(settings.getSearchShardTaskSettings().getHeapMovingAverageWindowSize()) + ); + settings.getSearchShardTaskSettings().addListener(this); + } + + @Override + public String name() { + return NAME; + } + + @Override + public void update(Task task) { + movingAverageReference.get().record(task.getTotalResourceStats().getMemoryInBytes()); + } + + @Override + public Optional cancellationReason(Task task) { + MovingAverage movingAverage = movingAverageReference.get(); + + // There haven't been enough measurements. + if (movingAverage.isReady() == false) { + return Optional.empty(); + } + + double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); + double averageUsage = movingAverage.getAverage(); + double allowedUsage = averageUsage * searchTaskHeapVarianceThresholdSupplier.getAsDouble(); + + if (currentUsage < searchTaskHeapThresholdBytesSupplier.getAsLong() || currentUsage < allowedUsage) { + return Optional.empty(); + } + + return Optional.of( + new TaskCancellation.Reason( + "heap usage exceeded [" + new ByteSizeValue((long) currentUsage) + " >= " + new ByteSizeValue((long) allowedUsage) + "]", + (int) (currentUsage / averageUsage) // TODO: fine-tune the cancellation score/weight + ) + ); + } + + @Override + public void onHeapMovingAverageWindowSizeChanged(int newWindowSize) { + movingAverageReference.set(new MovingAverage(newWindowSize)); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java index ac5a8229718ba..cf06191d0dcbd 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/SearchBackpressureServiceTests.java @@ -170,7 +170,7 @@ public Optional cancellationReason(Task task) { // Mocking 'settings' with predictable searchHeapThresholdBytes so that cancellation logic doesn't get skipped. long taskHeapUsageBytes = 500; SearchShardTaskSettings shardTaskSettings = mock(SearchShardTaskSettings.class); - when(shardTaskSettings.getTotalHeapThresholdBytes()).thenReturn(taskHeapUsageBytes); + when(shardTaskSettings.getTotalHeapBytesThreshold()).thenReturn(taskHeapUsageBytes); when(settings.getSearchShardTaskSettings()).thenReturn(shardTaskSettings); // Create a mix of low and high resource usage tasks (60 low + 15 high resource usage tasks). diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java new file mode 100644 index 0000000000000..a4b5d4ee76777 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -0,0 +1,49 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; + +public class CpuUsageTrackerTests extends OpenSearchTestCase { + private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( + Settings.builder() + .put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15) // 15 ms + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + public void testEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 200000000, 200); + CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); + + Optional reason = tracker.cancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(1, reason.get().getCancellationScore()); + assertEquals("cpu usage exceeded [200ms >= 15ms]", reason.get().getMessage()); + } + + public void testNotEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 5000000, 200); + CpuUsageTracker tracker = new CpuUsageTracker(mockSettings); + + Optional reason = tracker.cancellationReason(task); + assertFalse(reason.isPresent()); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java new file mode 100644 index 0000000000000..b3e33be671e04 --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -0,0 +1,50 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; + +public class ElapsedTimeTrackerTests extends OpenSearchTestCase { + + private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( + Settings.builder() + .put(SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 100) // 100 ms + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); + + public void testEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 0); + ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); + + Optional reason = tracker.cancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(1, reason.get().getCancellationScore()); + assertEquals("elapsed time exceeded [200ms >= 100ms]", reason.get().getMessage()); + } + + public void testNotEligibleForCancellation() { + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 1, 150000000); + ElapsedTimeTracker tracker = new ElapsedTimeTracker(mockSettings, () -> 200000000); + + Optional reason = tracker.cancellationReason(task); + assertFalse(reason.isPresent()); + } +} diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java new file mode 100644 index 0000000000000..4043e466e424a --- /dev/null +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -0,0 +1,80 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.backpressure.trackers; + +import org.opensearch.action.search.SearchShardTask; +import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; +import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.tasks.Task; +import org.opensearch.tasks.TaskCancellation; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.Optional; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; + +public class HeapUsageTrackerTests extends OpenSearchTestCase { + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; + private static final SearchBackpressureSettings mockSettings = mock(SearchBackpressureSettings.class); + private static final SearchShardTaskSettings mockSearchShardTaskSettings = mock(SearchShardTaskSettings.class); + + static { + when(mockSettings.getSearchShardTaskSettings()).thenReturn(mockSearchShardTaskSettings); + when(mockSearchShardTaskSettings.getHeapBytesThreshold()).thenReturn(100L); + when(mockSearchShardTaskSettings.getHeapVarianceThreshold()).thenReturn(2.0); + when(mockSearchShardTaskSettings.getHeapMovingAverageWindowSize()).thenReturn(HEAP_MOVING_AVERAGE_WINDOW_SIZE); + } + + public void testEligibleForCancellation() { + HeapUsageTracker tracker = new HeapUsageTracker(mockSettings); + Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50); + + // Record enough observations to make the moving average 'ready'. + for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { + tracker.update(task); + } + + // Task that has heap usage >= searchTaskHeapThresholdBytes and (moving average * variance). + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 200); + Optional reason = tracker.cancellationReason(task); + assertTrue(reason.isPresent()); + assertEquals(4, reason.get().getCancellationScore()); + assertEquals("heap usage exceeded [200b >= 100b]", reason.get().getMessage()); + } + + public void testNotEligibleForCancellation() { + Task task; + Optional reason; + HeapUsageTracker tracker = new HeapUsageTracker(mockSettings); + + // Task with heap usage < searchTaskHeapThresholdBytes. + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99); + + // Not enough observations. + reason = tracker.cancellationReason(task); + assertFalse(reason.isPresent()); + + // Record enough observations to make the moving average 'ready'. + for (int i = 0; i < HEAP_MOVING_AVERAGE_WINDOW_SIZE; i++) { + tracker.update(task); + } + + // Task with heap usage < searchTaskHeapThresholdBytes should not be cancelled. + reason = tracker.cancellationReason(task); + assertFalse(reason.isPresent()); + + // Task with heap usage between [searchTaskHeapThresholdBytes, moving average * variance) should not be cancelled. + double allowedHeapUsage = 99.0 * 2.0; + task = createMockTaskWithResourceStats(SearchShardTask.class, 1, randomLongBetween(99, (long) allowedHeapUsage - 1)); + reason = tracker.cancellationReason(task); + assertFalse(reason.isPresent()); + } +}