Added resource usage trackers for in-flight cancellation of SearchShardTask #4805

Merged
CHANGELOG.md (1 change: 1 addition & 0 deletions)
@@ -44,6 +44,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
- Add project health badges to the README.md ([#4843](https://github.com/opensearch-project/OpenSearch/pull/4843))
- Added changes for graceful node decommission ([#4586](https://github.com/opensearch-project/OpenSearch/pull/4586))
- Build no-jdk distributions as part of release build ([#4902](https://github.com/opensearch-project/OpenSearch/pull/4902))
- Added resource usage trackers for in-flight cancellation of SearchShardTask ([#4805](https://github.com/opensearch-project/OpenSearch/pull/4805))

### Dependencies
- Bumps `log4j-core` from 2.18.0 to 2.19.0
@@ -596,11 +596,12 @@ public void apply(Settings value, Settings current, Settings previous) {
NodeDuressSettings.SETTING_NUM_SUCCESSIVE_BREACHES,
NodeDuressSettings.SETTING_CPU_THRESHOLD,
NodeDuressSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_THRESHOLD,
SearchShardTaskSettings.SETTING_TOTAL_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_PERCENT_THRESHOLD,
SearchShardTaskSettings.SETTING_HEAP_VARIANCE_THRESHOLD,
SearchShardTaskSettings.SETTING_CPU_TIME_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_THRESHOLD
SearchShardTaskSettings.SETTING_HEAP_MOVING_AVERAGE_WINDOW_SIZE,
SearchShardTaskSettings.SETTING_CPU_TIME_MILLIS_THRESHOLD,
SearchShardTaskSettings.SETTING_ELAPSED_TIME_MILLIS_THRESHOLD
)
)
);
@@ -101,9 +101,9 @@ public boolean request() {
*/
private static class State {
final double tokens;
final double lastRefilledAt;
final long lastRefilledAt;

public State(double tokens, double lastRefilledAt) {
public State(double tokens, long lastRefilledAt) {
this.tokens = tokens;
this.lastRefilledAt = lastRefilledAt;
}
@@ -113,7 +113,7 @@ public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
State state = (State) o;
return Double.compare(state.tokens, tokens) == 0 && Double.compare(state.lastRefilledAt, lastRefilledAt) == 0;
return Double.compare(state.tokens, tokens) == 0 && lastRefilledAt == state.lastRefilledAt;
}

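The `lastRefilledAt` field now stores a raw clock reading (e.g. from `System.nanoTime()`); a `long` keeps that value exact, whereas a `double` cannot represent every `long` nanosecond value, and it also lets `equals()` compare with a plain `==`. For context, a token bucket's `request()` typically refills tokens in proportion to the time elapsed since `lastRefilledAt` and then tries to consume one. The sketch below is illustrative only; the class shape and names are assumptions, not this PR's exact implementation.

```java
// Minimal token-bucket sketch (illustrative; not the PR's exact implementation).
// Tokens accrue at `rate` per clock unit up to `burst`; request() consumes one token if available.
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

class SimpleTokenBucket {
    private final LongSupplier clock; // e.g. System::nanoTime
    private final double rate;        // tokens added per clock unit
    private final double burst;       // maximum number of stored tokens

    // Immutable snapshot of the bucket state, swapped atomically on each refill/consume.
    private static class State {
        final double tokens;
        final long lastRefilledAt;    // raw clock reading, kept as a long as in this PR

        State(double tokens, long lastRefilledAt) {
            this.tokens = tokens;
            this.lastRefilledAt = lastRefilledAt;
        }
    }

    private final AtomicReference<State> state;

    SimpleTokenBucket(LongSupplier clock, double rate, double burst) {
        this.clock = clock;
        this.rate = rate;
        this.burst = burst;
        this.state = new AtomicReference<>(new State(burst, clock.getAsLong()));
    }

    /** Returns true and consumes one token if one is available, false otherwise. */
    boolean request() {
        while (true) {
            State current = state.get();
            long now = clock.getAsLong();
            double refilled = Math.min(burst, current.tokens + (now - current.lastRefilledAt) * rate);
            if (refilled < 1.0) {
                return false; // not enough tokens; leave the stored state untouched
            }
            if (state.compareAndSet(current, new State(refilled - 1.0, now))) {
                return true;
            }
            // Lost a CAS race with another thread; retry against the fresh state.
        }
    }
}
```
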
@@ -17,6 +17,9 @@
import org.opensearch.monitor.jvm.JvmStats;
import org.opensearch.monitor.process.ProcessProbe;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.trackers.CpuUsageTracker;
import org.opensearch.search.backpressure.trackers.ElapsedTimeTracker;
import org.opensearch.search.backpressure.trackers.HeapUsageTracker;
import org.opensearch.search.backpressure.trackers.NodeDuressTracker;
import org.opensearch.search.backpressure.trackers.TaskResourceUsageTracker;
import org.opensearch.tasks.CancellableTask;
@@ -29,7 +32,6 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -84,7 +86,7 @@ public SearchBackpressureService(
() -> JvmStats.jvmStats().getMem().getHeapUsedPercent() / 100.0 >= settings.getNodeDuressSettings().getHeapThreshold()
)
),
Collections.emptyList()
List.of(new CpuUsageTracker(settings), new HeapUsageTracker(settings), new ElapsedTimeTracker(settings, System::nanoTime))
);
}

@@ -97,7 +99,7 @@ public SearchBackpressureService(
List<TaskResourceUsageTracker> taskResourceUsageTrackers
) {
this.settings = settings;
this.settings.setListener(this);
this.settings.addListener(this);
this.taskResourceTrackingService = taskResourceTrackingService;
this.taskResourceTrackingService.addTaskCompletionListener(this);
this.threadPool = threadPool;
@@ -181,10 +183,10 @@ boolean isNodeInDuress() {
* Returns true if the increase in heap usage is due to search requests.
*/
boolean isHeapUsageDominatedBySearch(List<CancellableTask> searchShardTasks) {
long runningTasksHeapUsage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long searchTasksHeapThreshold = getSettings().getSearchShardTaskSettings().getTotalHeapThresholdBytes();
if (runningTasksHeapUsage < searchTasksHeapThreshold) {
logger.debug("heap usage not dominated by search requests [{}/{}]", runningTasksHeapUsage, searchTasksHeapThreshold);
long usage = searchShardTasks.stream().mapToLong(task -> task.getTotalResourceStats().getMemoryInBytes()).sum();
long threshold = getSettings().getSearchShardTaskSettings().getTotalHeapBytesThreshold();
if (usage < threshold) {
logger.debug("heap usage not dominated by search requests [{}/{}]", usage, threshold);
return false;
}

@@ -213,7 +215,7 @@ TaskCancellation getTaskCancellation(CancellableTask task) {
List<Runnable> callbacks = new ArrayList<>();

for (TaskResourceUsageTracker tracker : taskResourceUsageTrackers) {
Optional<TaskCancellation.Reason> reason = tracker.cancellationReason(task);
Optional<TaskCancellation.Reason> reason = tracker.checkAndMaybeGetCancellationReason(task);

Bukhtawar (Collaborator), Oct 28, 2022:

Thanks for this, but in general we should think about decoupling tracking from the action taken once thresholds have been breached. Today that action is search cancellation, but I envision it also becoming an action that modifies thread pool size/queue in a manner that creates backpressure. We can take up that refactor as a fast follow-up, since it will help us add more actions.

Collaborator:

Also, cancellation isn't truly backpressure :) it's load shedding.

Contributor (PR author):

That makes sense. Trackers can recommend actions once thresholds are met, and cancellation of tasks can be one such action. This will, however, influence how dissimilar actions from different trackers are grouped and compared with each other in the SearchBackpressureService. For example, we need to aggregate the cancellation scores from each tracker before we start cancelling tasks; with generic actions, this might become really complicated.

Let's do a detailed design of this first and refactor as a follow-up. Enhancement: #4985

if (reason.isPresent()) {
reasons.add(reason.get());
callbacks.add(tracker::incrementCancellations);
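
The rename to `checkAndMaybeGetCancellationReason` makes the contract explicit: each tracker inspects one task and either returns a cancellation reason or stays silent, and the service aggregates the reasons (plus an `incrementCancellations` callback per contributing tracker) into a `TaskCancellation`. The sketch below is a rough, hedged approximation of what one such tracker could look like; the base-class shape, the use of plain `String` reasons, and the threshold wiring are assumptions rather than this PR's actual code.

```java
// Illustrative-only sketch of a resource usage tracker; the real CpuUsageTracker,
// HeapUsageTracker and ElapsedTimeTracker in this PR are wired to settings and task resource stats.
import java.util.Optional;
import java.util.function.LongSupplier;

// Hypothetical minimal task abstraction carrying only what this sketch needs.
interface TrackedTask {
    long getStartTimeNanos();
}

abstract class ResourceUsageTrackerSketch {
    private long cancellations;

    public void incrementCancellations() {
        cancellations++;
    }

    public long getCancellations() {
        return cancellations;
    }

    /** Returns a cancellation reason if the task breaches this tracker's threshold. */
    public abstract Optional<String> checkAndMaybeGetCancellationReason(TrackedTask task);
}

class ElapsedTimeTrackerSketch extends ResourceUsageTrackerSketch {
    private final LongSupplier nanoTimeSupplier;  // injected clock, e.g. System::nanoTime
    private final long elapsedTimeNanosThreshold; // in the PR this would come from the settings

    ElapsedTimeTrackerSketch(LongSupplier nanoTimeSupplier, long elapsedTimeNanosThreshold) {
        this.nanoTimeSupplier = nanoTimeSupplier;
        this.elapsedTimeNanosThreshold = elapsedTimeNanosThreshold;
    }

    @Override
    public Optional<String> checkAndMaybeGetCancellationReason(TrackedTask task) {
        long elapsedNanos = nanoTimeSupplier.getAsLong() - task.getStartTimeNanos();
        if (elapsedNanos < elapsedTimeNanosThreshold) {
            return Optional.empty();
        }
        return Optional.of("elapsed time exceeded [" + elapsedNanos + " >= " + elapsedTimeNanosThreshold + " ns]");
    }
}
```

A caller would collect the non-empty reasons from every tracker and register each contributing tracker's `incrementCancellations` as an on-cancel callback, mirroring the loop in `getTaskCancellation` above.
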
@@ -8,13 +8,16 @@

package org.opensearch.search.backpressure.settings;

import org.apache.lucene.util.SetOnce;
import org.opensearch.ExceptionsHelper;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/**
* Settings related to search backpressure and cancellation of in-flight requests.
@@ -23,7 +26,7 @@
*/
public class SearchBackpressureSettings {
private static class Defaults {
private static final long INTERVAL = 1000;
private static final long INTERVAL_MILLIS = 1000;

private static final boolean ENABLED = true;
private static final boolean ENFORCED = false;
@@ -37,9 +40,9 @@ private static class Defaults {
* Defines the interval (in millis) at which the SearchBackpressureService monitors and cancels tasks.
*/
private final TimeValue interval;
public static final Setting<Long> SETTING_INTERVAL = Setting.longSetting(
"search_backpressure.interval",
Defaults.INTERVAL,
public static final Setting<Long> SETTING_INTERVAL_MILLIS = Setting.longSetting(
"search_backpressure.interval_millis",
Defaults.INTERVAL_MILLIS,
1,
Setting.Property.NodeScope
);
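
Because the setting is node-scoped, the rename to a `_millis`-suffixed key mainly changes what operators put in their node configuration. A small, hedged sketch of reading the renamed setting follows; the value shown is arbitrary, and only the key and the 1000 ms default come from the diff.

```java
// Illustrative only: building node settings with the renamed key and reading it back
// the same way the SearchBackpressureSettings constructor does.
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;

public class IntervalSettingExample {
    public static void main(String[] args) {
        Settings nodeSettings = Settings.builder()
            .put("search_backpressure.interval_millis", 2000) // default is 1000 per Defaults.INTERVAL_MILLIS
            .build();

        TimeValue interval = new TimeValue(SearchBackpressureSettings.SETTING_INTERVAL_MILLIS.get(nodeSettings));
        System.out.println("monitoring interval = " + interval); // prints "2s"
    }
}
```
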
@@ -116,15 +119,19 @@ public interface Listener {
void onCancellationBurstChanged();
}

private final SetOnce<Listener> listener = new SetOnce<>();
private final List<Listener> listeners = new ArrayList<>();
private final Settings settings;
private final ClusterSettings clusterSettings;
private final NodeDuressSettings nodeDuressSettings;
private final SearchShardTaskSettings searchShardTaskSettings;

public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSettings) {
this.settings = settings;
this.clusterSettings = clusterSettings;
this.nodeDuressSettings = new NodeDuressSettings(settings, clusterSettings);
this.searchShardTaskSettings = new SearchShardTaskSettings(settings, clusterSettings);

interval = new TimeValue(SETTING_INTERVAL.get(settings));
interval = new TimeValue(SETTING_INTERVAL_MILLIS.get(settings));

enabled = SETTING_ENABLED.get(settings);
clusterSettings.addSettingsUpdateConsumer(SETTING_ENABLED, this::setEnabled);
@@ -142,8 +149,16 @@ public SearchBackpressureSettings(Settings settings, ClusterSettings clusterSett
clusterSettings.addSettingsUpdateConsumer(SETTING_CANCELLATION_BURST, this::setCancellationBurst);
}

public void setListener(Listener listener) {
this.listener.set(listener);
public void addListener(Listener listener) {
listeners.add(listener);
}

public Settings getSettings() {
return settings;
}

public ClusterSettings getClusterSettings() {
return clusterSettings;
}

public NodeDuressSettings getNodeDuressSettings() {
@@ -180,9 +195,7 @@ public double getCancellationRatio() {

private void setCancellationRatio(double cancellationRatio) {
this.cancellationRatio = cancellationRatio;
if (listener.get() != null) {
listener.get().onCancellationRatioChanged();
}
notifyListeners(Listener::onCancellationRatioChanged);
}

public double getCancellationRate() {
@@ -195,9 +208,7 @@ public double getCancellationRateNanos() {

private void setCancellationRate(double cancellationRate) {
this.cancellationRate = cancellationRate;
if (listener.get() != null) {
listener.get().onCancellationRateChanged();
}
notifyListeners(Listener::onCancellationRateChanged);
}

public double getCancellationBurst() {
@@ -206,8 +217,20 @@ public double getCancellationBurst() {

private void setCancellationBurst(double cancellationBurst) {
this.cancellationBurst = cancellationBurst;
if (listener.get() != null) {
listener.get().onCancellationBurstChanged();
notifyListeners(Listener::onCancellationBurstChanged);
}

private void notifyListeners(Consumer<Listener> consumer) {
List<Exception> exceptions = new ArrayList<>();

for (Listener listener : listeners) {
try {
consumer.accept(listener);
} catch (Exception e) {
exceptions.add(e);
}
}

ExceptionsHelper.maybeThrowRuntimeAndSuppress(exceptions);
}
}
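
Switching from a `SetOnce` listener to a list means any number of components can observe dynamic updates to the cancellation settings, and a failing listener no longer prevents the others from being notified (exceptions are collected and rethrown with the rest suppressed). A hypothetical registration sketch follows; it assumes the `Listener` interface declares exactly the three callbacks invoked by the setters above and that the backpressure settings are registered in the built-in cluster settings.

```java
// Illustrative only: registering an additional listener, which the previous
// SetOnce-based setListener(...) could not support.
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;

public class BackpressureListenerExample {
    public static void main(String[] args) {
        Settings nodeSettings = Settings.EMPTY;
        ClusterSettings clusterSettings = new ClusterSettings(nodeSettings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
        SearchBackpressureSettings backpressureSettings = new SearchBackpressureSettings(nodeSettings, clusterSettings);

        backpressureSettings.addListener(new SearchBackpressureSettings.Listener() {
            @Override
            public void onCancellationRatioChanged() {
                // e.g. rebuild a ratio-based token bucket
            }

            @Override
            public void onCancellationRateChanged() {
                // e.g. rebuild a rate-based token bucket
            }

            @Override
            public void onCancellationBurstChanged() {
                // both token buckets depend on burst, so rebuild both
            }
        });
    }
}
```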