Skip to content

Commit

Permalink
Moved TaskResourceUsageTracker settings to their respective classes
Browse files Browse the repository at this point in the history
Signed-off-by: Ketan Verma <ketan9495@gmail.com>
  • Loading branch information
ketanv3 committed Oct 31, 2022
1 parent e19c448 commit 442ff82
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 208 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.opensearch.search.backpressure.settings.NodeDuressSettings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.watcher.ResourceWatcherService;
Expand Down Expand Up @@ -597,11 +600,11 @@ public void apply(Settings value, Settings current, Settings previous) {
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
HeapUsageTracker.SETTING_HEAP_PERCENT_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_VARIANCE_THRESHOLD,
HeapUsageTracker.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
CpuUsageTracker.SETTING_CPU_TIME_MILLIS_THRESHOLD,
ElapsedTimeTracker.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@

package org.opensearch.search.backpressure.settings;

import org.opensearch.ExceptionsHelper;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.monitor.jvm.JvmStats;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
* Defines the settings related to the cancellation of SearchShardTasks.
*
Expand All @@ -28,11 +23,6 @@ public class SearchShardTaskSettings {

private static class Defaults {
private static final double TOTAL_HEAP_PERCENT_THRESHOLD = 0.05;
private static final double HEAP_PERCENT_THRESHOLD = 0.005;
private static final double HEAP_VARIANCE_THRESHOLD = 2.0;
private static final int HEAP_MOVING_AVERAGE_WINDOW_SIZE = 100;
private static final long CPU_TIME_MILLIS_THRESHOLD = 15;
private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000;
}

/**
Expand All @@ -49,99 +39,9 @@ private static class Defaults {
Setting.Property.NodeScope
);

/**
* Defines the heap usage threshold (in percentage) for an individual task before it is considered for cancellation.
*/
private volatile double heapPercentThreshold;
public static final Setting<Double> SETTING_HEAP_PERCENT_THRESHOLD = Setting.doubleSetting(
"search_backpressure.search_shard_task.heap_percent_threshold",
Defaults.HEAP_PERCENT_THRESHOLD,
0.0,
1.0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Defines the heap usage variance for an individual task before it is considered for cancellation.
* A task is considered for cancellation when taskHeapUsage is greater than or equal to heapUsageMovingAverage * variance.
*/
private volatile double heapVarianceThreshold;
public static final Setting<Double> SETTING_HEAP_VARIANCE_THRESHOLD = Setting.doubleSetting(
"search_backpressure.search_shard_task.heap_variance",
Defaults.HEAP_VARIANCE_THRESHOLD,
0.0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Defines the window size to calculate the moving average of heap usage of completed tasks.
*/
private volatile int heapMovingAverageWindowSize;
public static final Setting<Integer> SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE = Setting.intSetting(
"search_backpressure.search_shard_task.heap_moving_average_window_size",
Defaults.HEAP_MOVING_AVERAGE_WINDOW_SIZE,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation.
*/
private volatile long cpuTimeMillisThreshold;
public static final Setting<Long> SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting(
"search_backpressure.search_shard_task.cpu_time_millis_threshold",
Defaults.CPU_TIME_MILLIS_THRESHOLD,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation.
*/
private volatile long elapsedTimeMillisThreshold;
public static final Setting<Long> SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting(
"search_backpressure.search_shard_task.elapsed_time_millis_threshold",
Defaults.ELAPSED_TIME_MILLIS_THRESHOLD,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

/**
* Callback listeners.
*/
public interface Listener {
void onHeapMovingAverageWindowSizeChanged(int newWindowSize);
}

private final List<Listener> listeners = new ArrayList<>();

public SearchShardTaskSettings(Settings settings, ClusterSettings clusterSettings) {
totalHeapPercentThreshold = SETTING_TOTAL_HEAP_PERCENT_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_TOTAL_HEAP_PERCENT_THRESHOLD, this::setTotalHeapPercentThreshold);

heapPercentThreshold = SETTING_HEAP_PERCENT_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_PERCENT_THRESHOLD, this::setHeapPercentThreshold);

heapVarianceThreshold = SETTING_HEAP_VARIANCE_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_VARIANCE_THRESHOLD, this::setHeapVarianceThreshold);

heapMovingAverageWindowSize = SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE, this::setHeapMovingAverageWindowSize);

cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold);

elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold);
}

public void addListener(Listener listener) {
listeners.add(listener);
}

public double getTotalHeapPercentThreshold() {
Expand All @@ -155,66 +55,4 @@ public long getTotalHeapBytesThreshold() {
private void setTotalHeapPercentThreshold(double totalHeapPercentThreshold) {
this.totalHeapPercentThreshold = totalHeapPercentThreshold;
}

public double getHeapPercentThreshold() {
return heapPercentThreshold;
}

public long getHeapBytesThreshold() {
return (long) (HEAP_SIZE_BYTES * getHeapPercentThreshold());
}

private void setHeapPercentThreshold(double heapPercentThreshold) {
this.heapPercentThreshold = heapPercentThreshold;
}

public double getHeapVarianceThreshold() {
return heapVarianceThreshold;
}

private void setHeapVarianceThreshold(double heapVarianceThreshold) {
this.heapVarianceThreshold = heapVarianceThreshold;
}

public int getHeapMovingAverageWindowSize() {
return heapMovingAverageWindowSize;
}

public void setHeapMovingAverageWindowSize(int heapMovingAverageWindowSize) {
this.heapMovingAverageWindowSize = heapMovingAverageWindowSize;

List<Exception> exceptions = new ArrayList<>();
for (Listener listener : listeners) {
try {
listener.onHeapMovingAverageWindowSizeChanged(heapMovingAverageWindowSize);
} catch (Exception e) {
exceptions.add(e);
}
}
ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
}

public long getCpuTimeMillisThreshold() {
return cpuTimeMillisThreshold;
}

public long getCpuTimeNanosThreshold() {
return TimeUnit.MILLISECONDS.toNanos(getCpuTimeMillisThreshold());
}

private void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) {
this.cpuTimeMillisThreshold = cpuTimeMillisThreshold;
}

public long getElapsedTimeMillisThreshold() {
return elapsedTimeMillisThreshold;
}

public long getElapsedTimeNanosThreshold() {
return TimeUnit.MILLISECONDS.toNanos(getElapsedTimeMillisThreshold());
}

private void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) {
this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

package org.opensearch.search.backpressure.trackers;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskCancellation;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.LongSupplier;

import static org.opensearch.search.backpressure.trackers.TaskResourceUsageTrackerType.CPU_USAGE_TRACKER;

Expand All @@ -25,10 +25,25 @@
* @opensearch.internal
*/
public class CpuUsageTracker extends TaskResourceUsageTracker {
private final LongSupplier cpuTimeNanosThresholdSupplier;
private static class Defaults {
private static final long CPU_TIME_MILLIS_THRESHOLD = 15000;
}

/**
* Defines the CPU usage threshold (in millis) for an individual task before it is considered for cancellation.
*/
private volatile long cpuTimeMillisThreshold;
public static final Setting<Long> SETTING_CPU_TIME_MILLIS_THRESHOLD = Setting.longSetting(
"search_backpressure.search_shard_task.cpu_time_millis_threshold",
Defaults.CPU_TIME_MILLIS_THRESHOLD,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

public CpuUsageTracker(SearchBackpressureSettings settings) {
this.cpuTimeNanosThresholdSupplier = () -> settings.getSearchShardTaskSettings().getCpuTimeNanosThreshold();
this.cpuTimeMillisThreshold = SETTING_CPU_TIME_MILLIS_THRESHOLD.get(settings.getSettings());
settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_CPU_TIME_MILLIS_THRESHOLD, this::setCpuTimeMillisThreshold);
}

@Override
Expand All @@ -39,7 +54,7 @@ public String name() {
@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = task.getTotalResourceStats().getCpuTimeInNanos();
long threshold = cpuTimeNanosThresholdSupplier.getAsLong();
long threshold = getCpuTimeNanosThreshold();

if (usage < threshold) {
return Optional.empty();
Expand All @@ -56,4 +71,12 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
)
);
}

public long getCpuTimeNanosThreshold() {
return TimeUnit.MILLISECONDS.toNanos(cpuTimeMillisThreshold);
}

public void setCpuTimeMillisThreshold(long cpuTimeMillisThreshold) {
this.cpuTimeMillisThreshold = cpuTimeMillisThreshold;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

package org.opensearch.search.backpressure.trackers;

import org.opensearch.common.settings.Setting;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.tasks.Task;
Expand All @@ -25,12 +26,28 @@
* @opensearch.internal
*/
public class ElapsedTimeTracker extends TaskResourceUsageTracker {
private static class Defaults {
private static final long ELAPSED_TIME_MILLIS_THRESHOLD = 30000;
}

/**
* Defines the elapsed time threshold (in millis) for an individual task before it is considered for cancellation.
*/
private volatile long elapsedTimeMillisThreshold;
public static final Setting<Long> SETTING_ELAPSED_TIME_MILLIS_THRESHOLD = Setting.longSetting(
"search_backpressure.search_shard_task.elapsed_time_millis_threshold",
Defaults.ELAPSED_TIME_MILLIS_THRESHOLD,
0,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private final LongSupplier timeNanosSupplier;
private final LongSupplier elapsedTimeNanosThresholdSupplier;

public ElapsedTimeTracker(SearchBackpressureSettings settings, LongSupplier timeNanosSupplier) {
this.timeNanosSupplier = timeNanosSupplier;
this.elapsedTimeNanosThresholdSupplier = () -> settings.getSearchShardTaskSettings().getElapsedTimeNanosThreshold();
this.elapsedTimeMillisThreshold = SETTING_ELAPSED_TIME_MILLIS_THRESHOLD.get(settings.getSettings());
settings.getClusterSettings().addSettingsUpdateConsumer(SETTING_ELAPSED_TIME_MILLIS_THRESHOLD, this::setElapsedTimeMillisThreshold);
}

@Override
Expand All @@ -41,7 +58,7 @@ public String name() {
@Override
public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task task) {
long usage = timeNanosSupplier.getAsLong() - task.getStartTimeNanos();
long threshold = elapsedTimeNanosThresholdSupplier.getAsLong();
long threshold = getElapsedTimeNanosThreshold();

if (usage < threshold) {
return Optional.empty();
Expand All @@ -58,4 +75,12 @@ public Optional<TaskCancellation.Reason> checkAndMaybeGetCancellationReason(Task
)
);
}

public long getElapsedTimeNanosThreshold() {
return TimeUnit.MILLISECONDS.toNanos(elapsedTimeMillisThreshold);
}

public void setElapsedTimeMillisThreshold(long elapsedTimeMillisThreshold) {
this.elapsedTimeMillisThreshold = elapsedTimeMillisThreshold;
}
}
Loading

0 comments on commit 442ff82

Please sign in to comment.