diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index bb87023271fbe..dc9b9303b7b40 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -44,6 +44,9 @@ import org.opensearch.search.backpressure.settings.NodeDuressSettings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; +import org.opensearch.search.backpressure.trackers.CpuUsageTracker; +import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker; +import org.opensearch.search.backpressure.trackers.HeapUsageTracker; import org.opensearch.tasks.TaskManager; import org.opensearch.tasks.TaskResourceTrackingService; import org.opensearch.watcher.ResourceWatcherService; @@ -597,11 +600,11 @@ public void apply(Settings value, Settings current, Settings previous) { NodeDuressSettings.SETTING_CPU_THRESHOLD, NodeDuressSettings.SETTING_HEAP_THRESHOLD, SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, - SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD, - SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD, - SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, - SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD, - SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD + HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD, + HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, + CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD, + ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java index 8a3508b1ee15e..7e40f1c0eab53 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java +++ b/server/src/main/java/org/opensearch/search/backpressure/settings/SearchShardTaskSettings.java @@ -8,16 +8,11 @@ package org.opensearch.search.backpressure.settings; -import org.opensearch.ExceptionsHelper; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.monitor.jvm.JvmStats; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.TimeUnit; - /** * Defines the settings related to the cancellation of SearchShardTasks. * @@ -28,11 +23,6 @@ public class SearchShardTaskSettings { private static class Defaults { private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05; - private static final double HEAP_PERCENT_THRESHOLD = 0.005; - private static final double HEAP_VARIANCE_THRESHOLD = 2.0; - private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; - private static final long CPU_TIME_MILLIS_THRESHOLD = 15; - private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; } /** @@ -49,99 +39,9 @@ private static class Defaults { Setting.Property.NodeScope ); - /** - * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. - */ - private volatile double heapPercentThreshold; - public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_percent_threshold", - Defaults.HEAP_PERCENT_THRESHOLD, - 0.0, - 1.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the heap usage variance for an individual task before it is considered for cancellation. - * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. - */ - private volatile double heapVarianceThreshold; - public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( - "search_backpressure.search_shard_task.heap_variance", - Defaults.HEAP_VARIANCE_THRESHOLD, - 0.0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the window size to calculate the moving average of heap usage of completed tasks. - */ - private volatile int heapMovingAverageWindowSize; - public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( - "search_backpressure.search_shard_task.heap_moving_average_window_size", - Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. - */ - private volatile long cpuTimeMillisThreshold; - public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.cpu_time_millis_threshold", - Defaults.CPU_TIME_MILLIS_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. - */ - private volatile long elapsedTimeMillisThreshold; - public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( - "search_backpressure.search_shard_task.elapsed_time_millis_threshold", - Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, - 0, - Setting.Property.Dynamic, - Setting.Property.NodeScope - ); - - /** - * Callback listeners. - */ - public interface Listener { - void onHeapMovingAverageWindowSizeChanged(int newWindowSize); - } - - private final List listeners = new ArrayList<>(); - public SearchShardTaskSettings(Settings settings, ClusterSettings clusterSettings) { totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.get(settings); clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, this::setTotalHeapPercentThreshold); - - heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); - - heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); - - heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); - - cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); - - elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings); - clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); - } - - public void addListener(Listener listener) { - listeners.add(listener); } public double getTotalHeapPercentThreshold() { @@ -155,66 +55,4 @@ public long getTotalHeapBytesThreshold() { private void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) { this.totalHeapPercentThreshold = totalHeapPercentThreshold; } - - public double getHeapPercentThreshold() { - return heapPercentThreshold; - } - - public long getHeapBytesThreshold() { - return (long) (HEAP_SIZE_BYTES * getHeapPercentThreshold()); - } - - private void setHeapPercentThreshold(double heapPercentThreshold) { - this.heapPercentThreshold = heapPercentThreshold; - } - - public double getHeapVarianceThreshold() { - return heapVarianceThreshold; - } - - private void setHeapVarianceThreshold(double heapVarianceThreshold) { - this.heapVarianceThreshold = heapVarianceThreshold; - } - - public int getHeapMovingAverageWindowSize() { - return heapMovingAverageWindowSize; - } - - public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { - this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; - - List exceptions = new ArrayList<>(); - for (Listener listener : listeners) { - try { - listener.onHeapMovingAverageWindowSizeChanged(heapMovingAverageWindowSize); - } catch (Exception e) { - exceptions.add(e); - } - } - ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions); - } - - public long getCpuTimeMillisThreshold() { - return cpuTimeMillisThreshold; - } - - public long getCpuTimeNanosThreshold() { - return TimeUnit.MILLISECONDS.toNanos(getCpuTimeMillisThreshold()); - } - - private void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { - this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; - } - - public long getElapsedTimeMillisThreshold() { - return elapsedTimeMillisThreshold; - } - - public long getElapsedTimeNanosThreshold() { - return TimeUnit.MILLISECONDS.toNanos(getElapsedTimeMillisThreshold()); - } - - private void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { - this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; - } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java index 03f621dbae06c..fa4f88d8a54e6 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/CpuUsageTracker.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.Task; @@ -15,7 +16,6 @@ import java.util.Optional; import java.util.concurrent.TimeUnit; -import java.util.function.LongSupplier; import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER; @@ -25,10 +25,25 @@ * @opensearch.internal */ public class CpuUsageTracker extends TaskResourceUsageTracker { - private final LongSupplier cpuTimeNanosThresholdSupplier; + private static class Defaults { + private static final long CPU_TIME_MILLIS_THRESHOLD = 15000; + } + + /** + * Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation. + */ + private volatile long cpuTimeMillisThreshold; + public static final Setting SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.cpu_time_millis_threshold", + Defaults.CPU_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); public CpuUsageTracker(SearchBackpressureSettings settings) { - this.cpuTimeNanosThresholdSupplier = () -> settings.getSearchShardTaskSettings().getCpuTimeNanosThreshold(); + this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold); } @Override @@ -39,7 +54,7 @@ public String name() { @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = task.getTotalResourceStats().getCpuTimeInNanos(); - long threshold = cpuTimeNanosThresholdSupplier.getAsLong(); + long threshold = getCpuTimeNanosThreshold(); if (usage < threshold) { return Optional.empty(); @@ -56,4 +71,12 @@ public Optional checkAndMaybeGetCancellationReason(Task ) ); } + + public long getCpuTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold); + } + + public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) { + this.cpuTimeMillisThreshold = cpuTimeMillisThreshold; + } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java index 84d3b41933002..d76ccdb76db36 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTracker.java @@ -8,6 +8,7 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.TimeValue; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; import org.opensearch.tasks.Task; @@ -25,12 +26,28 @@ * @opensearch.internal */ public class ElapsedTimeTracker extends TaskResourceUsageTracker { + private static class Defaults { + private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000; + } + + /** + * Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation. + */ + private volatile long elapsedTimeMillisThreshold; + public static final Setting SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting( + "search_backpressure.search_shard_task.elapsed_time_millis_threshold", + Defaults.ELAPSED_TIME_MILLIS_THRESHOLD, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private final LongSupplier timeNanosSupplier; - private final LongSupplier elapsedTimeNanosThresholdSupplier; public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier timeNanosSupplier) { this.timeNanosSupplier = timeNanosSupplier; - this.elapsedTimeNanosThresholdSupplier = () -> settings.getSearchShardTaskSettings().getElapsedTimeNanosThreshold(); + this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold); } @Override @@ -41,7 +58,7 @@ public String name() { @Override public Optional checkAndMaybeGetCancellationReason(Task task) { long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos(); - long threshold = elapsedTimeNanosThresholdSupplier.getAsLong(); + long threshold = getElapsedTimeNanosThreshold(); if (usage < threshold) { return Optional.empty(); @@ -58,4 +75,12 @@ public Optional checkAndMaybeGetCancellationReason(Task ) ); } + + public long getElapsedTimeNanosThreshold() { + return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold); + } + + public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) { + this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold; + } } diff --git a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java index b6e7d3c0a797d..983a2e1152511 100644 --- a/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java +++ b/server/src/main/java/org/opensearch/search/backpressure/trackers/HeapUsageTracker.java @@ -8,17 +8,16 @@ package org.opensearch.search.backpressure.trackers; +import org.opensearch.common.settings.Setting; import org.opensearch.common.unit.ByteSizeValue; import org.opensearch.common.util.MovingAverage; +import org.opensearch.monitor.jvm.JvmStats; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.DoubleSupplier; -import java.util.function.LongSupplier; import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.HEAP_USAGE_TRACKER; @@ -28,18 +27,67 @@ * * @opensearch.internal */ -public class HeapUsageTracker extends TaskResourceUsageTracker implements SearchShardTaskSettings.Listener { - private final LongSupplier heapBytesThresholdSupplier; - private final DoubleSupplier heapVarianceThresholdSupplier; +public class HeapUsageTracker extends TaskResourceUsageTracker { + private static final long HEAP_SIZE_BYTES = JvmStats.jvmStats().getMem().getHeapMax().getBytes(); + + private static class Defaults { + private static final double HEAP_PERCENT_THRESHOLD = 0.005; + private static final double HEAP_VARIANCE_THRESHOLD = 2.0; + private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; + } + + /** + * Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation. + */ + private volatile double heapPercentThreshold; + public static final Setting SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_percent_threshold", + Defaults.HEAP_PERCENT_THRESHOLD, + 0.0, + 1.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the heap usage variance for an individual task before it is considered for cancellation. + * A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance. + */ + private volatile double heapVarianceThreshold; + public static final Setting SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting( + "search_backpressure.search_shard_task.heap_variance", + Defaults.HEAP_VARIANCE_THRESHOLD, + 0.0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Defines the window size to calculate the moving average of heap usage of completed tasks. + */ + private volatile int heapMovingAverageWindowSize; + public static final Setting SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting( + "search_backpressure.search_shard_task.heap_moving_average_window_size", + Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE, + 0, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + private final AtomicReference movingAverageReference; public HeapUsageTracker(SearchBackpressureSettings settings) { - this.heapBytesThresholdSupplier = () -> settings.getSearchShardTaskSettings().getHeapBytesThreshold(); - this.heapVarianceThresholdSupplier = () -> settings.getSearchShardTaskSettings().getHeapVarianceThreshold(); - this.movingAverageReference = new AtomicReference<>( - new MovingAverage(settings.getSearchShardTaskSettings().getHeapMovingAverageWindowSize()) - ); - settings.getSearchShardTaskSettings().addListener(this); + heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold); + + heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings.getSettings()); + settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold); + + heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings.getSettings()); + settings.getClusterSettings() + .addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize); + + this.movingAverageReference = new AtomicReference<>(new MovingAverage(heapMovingAverageWindowSize)); } @Override @@ -63,9 +111,9 @@ public Optional checkAndMaybeGetCancellationReason(Task double currentUsage = task.getTotalResourceStats().getMemoryInBytes(); double averageUsage = movingAverage.getAverage(); - double allowedUsage = averageUsage * heapVarianceThresholdSupplier.getAsDouble(); + double allowedUsage = averageUsage * getHeapVarianceThreshold(); - if (currentUsage < heapBytesThresholdSupplier.getAsLong() || currentUsage < allowedUsage) { + if (currentUsage < getHeapBytesThreshold() || currentUsage < allowedUsage) { return Optional.empty(); } @@ -77,8 +125,24 @@ public Optional checkAndMaybeGetCancellationReason(Task ); } - @Override - public void onHeapMovingAverageWindowSizeChanged(int newWindowSize) { - movingAverageReference.set(new MovingAverage(newWindowSize)); + public long getHeapBytesThreshold() { + return (long) (HEAP_SIZE_BYTES * heapPercentThreshold); + } + + public void setHeapPercentThreshold(double heapPercentThreshold) { + this.heapPercentThreshold = heapPercentThreshold; + } + + public double getHeapVarianceThreshold() { + return heapVarianceThreshold; + } + + public void setHeapVarianceThreshold(double heapVarianceThreshold) { + this.heapVarianceThreshold = heapVarianceThreshold; + } + + public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) { + this.heapMovingAverageWindowSize = heapMovingAverageWindowSize; + this.movingAverageReference.set(new MovingAverage(heapMovingAverageWindowSize)); } } diff --git a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java index 2c302172cf6bc..d09312f38e3eb 100644 --- a/server/src/main/java/org/opensearch/tasks/TaskCancellation.java +++ b/server/src/main/java/org/opensearch/tasks/TaskCancellation.java @@ -15,7 +15,10 @@ import java.util.stream.Collectors; /** - * TaskCancellation is a wrapper for a task and its cancellation reasons. + * TaskCancellation represents a task eligible for cancellation. + * It doesn't guarantee that the task will actually get cancelled or not; that decision is left to the caller. + * + * It contains a list of cancellation reasons along with callbacks that are invoked when cancel() is called. * * @opensearch.internal */ diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java index 83c2e3a7673c8..c790fb2e60eea 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/CpuUsageTrackerTests.java @@ -12,7 +12,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; @@ -24,7 +23,7 @@ public class CpuUsageTrackerTests extends OpenSearchTestCase { private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( Settings.builder() - .put(SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15) // 15 ms + .put(CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD.getKey(), 15) // 15 ms .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java index 1124c290cd59d..67ed6059a1914 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/ElapsedTimeTrackerTests.java @@ -12,7 +12,6 @@ import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; @@ -25,7 +24,7 @@ public class ElapsedTimeTrackerTests extends OpenSearchTestCase { private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( Settings.builder() - .put(SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 100) // 100 ms + .put(ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.getKey(), 100) // 100 ms .build(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) ); diff --git a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java index 6bb34fd1733a7..b9967da22fbf1 100644 --- a/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java +++ b/server/src/test/java/org/opensearch/search/backpressure/trackers/HeapUsageTrackerTests.java @@ -9,32 +9,34 @@ package org.opensearch.search.backpressure.trackers; import org.opensearch.action.search.SearchShardTask; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; import org.opensearch.search.backpressure.settings.SearchBackpressureSettings; -import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskCancellation; import org.opensearch.test.OpenSearchTestCase; import java.util.Optional; -import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.when; import static org.opensearch.search.backpressure.SearchBackpressureTestHelpers.createMockTaskWithResourceStats; public class HeapUsageTrackerTests extends OpenSearchTestCase { + private static final long HEAP_BYTES_THRESHOLD = 100; private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100; - private static final SearchBackpressureSettings mockSettings = mock(SearchBackpressureSettings.class); - private static final SearchShardTaskSettings mockSearchShardTaskSettings = mock(SearchShardTaskSettings.class); - static { - when(mockSettings.getSearchShardTaskSettings()).thenReturn(mockSearchShardTaskSettings); - when(mockSearchShardTaskSettings.getHeapBytesThreshold()).thenReturn(100L); - when(mockSearchShardTaskSettings.getHeapVarianceThreshold()).thenReturn(2.0); - when(mockSearchShardTaskSettings.getHeapMovingAverageWindowSize()).thenReturn(HEAP_MOVING_AVERAGE_WINDOW_SIZE); - } + private static final SearchBackpressureSettings mockSettings = new SearchBackpressureSettings( + Settings.builder() + .put(HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD.getKey(), 2.0) + .put(HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.getKey(), HEAP_MOVING_AVERAGE_WINDOW_SIZE) + .build(), + new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS) + ); public void testEligibleForCancellation() { - HeapUsageTracker tracker = new HeapUsageTracker(mockSettings); + HeapUsageTracker tracker = spy(new HeapUsageTracker(mockSettings)); + when(tracker.getHeapBytesThreshold()).thenReturn(HEAP_BYTES_THRESHOLD); Task task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 50); // Record enough observations to make the moving average 'ready'. @@ -53,7 +55,8 @@ public void testEligibleForCancellation() { public void testNotEligibleForCancellation() { Task task; Optional reason; - HeapUsageTracker tracker = new HeapUsageTracker(mockSettings); + HeapUsageTracker tracker = spy(new HeapUsageTracker(mockSettings)); + when(tracker.getHeapBytesThreshold()).thenReturn(HEAP_BYTES_THRESHOLD); // Task with heap usage < heapBytesThreshold. task = createMockTaskWithResourceStats(SearchShardTask.class, 1, 99);