Skip to content

Commit

Permalink
add completion count into shard level and task level SBP RCA (#517)
Browse files Browse the repository at this point in the history
* SBP dev changes

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* use completionCount stat to calculate SBP RCA and enhance logging

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* change logging level

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* undo dev changes

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* undo dev docker changes

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* rename search backpressure constants

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* address comments

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* add logic to keep the heap node duress setting in sync with cluster

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

* refactor a constant and change the logging level for resource summary

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>

---------

Signed-off-by: Kaushal Kumar <ravi.kaushal97@gmail.com>
  • Loading branch information
kaushalmahi12 committed Dec 22, 2023
1 parent 6217f2f commit 70222ba
Show file tree
Hide file tree
Showing 18 changed files with 418 additions and 156 deletions.
8 changes: 8 additions & 0 deletions config/rca.conf
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@
"shard-request-cache-rca": {
"shard-request-cache-threshold" : 0.9
},
"search-back-pressure-rca": {
"max-heap-usage-increase": 80,
"max-shard-heap-cancellation-percentage": 5,
"max-task-heap-cancellation-percentage": 5,
"max-heap-usage-decrease": 90,
"min-shard-heap-cancellation-percentage": 3,
"min-task-heap-cancellation-percentage": 3
},
"admission-control-rca": {
"request-size": {
"heap-range": [
Expand Down
13 changes: 13 additions & 0 deletions config/rca_cluster_manager.conf
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@
// shard request cache rca
"shard-request-cache-rca": {
"shard-request-cache-threshold" : 0.9
},
"search-back-pressure-rca": {
"max-heap-usage-increase": 80,
"max-shard-heap-cancellation-percentage": 5,
"max-task-heap-cancellation-percentage": 5,
"max-heap-usage-decrease": 90,
"min-shard-heap-cancellation-percentage": 3,
"min-task-heap-cancellation-percentage": 3
}
},

Expand All @@ -98,6 +106,11 @@
"old-gen-threshold-level-one" : 0.6,
"old-gen-threshold-level-two" : 0.75,
"old-gen-threshold-level-three" : 0.9
},
"search-back-pressure-policy-config": {
"enabled": true,
"hour-breach-threshold": 1,
"searchbp-heap-stepsize-in-percentage": 5
}
},
// Action Configurations
Expand Down
2 changes: 1 addition & 1 deletion docker/opensearch.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
cluster.name: "docker-cluster"
network.host: 0.0.0.0
network.host: 0.0.0.0
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@


import io.netty.handler.codec.http.HttpMethod;
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.performanceanalyzer.core.Util;
Expand Down Expand Up @@ -54,6 +62,62 @@ public static void disablePA() throws InterruptedException {
throw new RuntimeException("Failed to disable PA after 5 attempts");
}

public static class ClusterSettings {
static List<String> clusterSettings = new ArrayList<>();

public final static String SETTING_NOT_FOUND = "NULL";

static final String CLUSTER_SETTINGS_URL =
"/_cluster/settings?flat_settings=true&include_defaults=true&pretty";

/** Refreshes the Cluster Settings */
private static void refreshClusterSettings() {
final HttpURLConnection urlConnection =
LocalhostConnectionUtil.createHTTPConnection(
CLUSTER_SETTINGS_URL, HttpMethod.GET);
try (final BufferedReader br =
new BufferedReader(new InputStreamReader(urlConnection.getInputStream()))) {
String line;
clusterSettings.clear();
while ((line = br.readLine()) != null) {
clusterSettings.add(line);
}
} catch (IOException e) {
LOG.warn("could not refresh the cluster settings");
}
}

/**
* @param settingName a string representing the setting name in cluster settings, expected
* values are flat settings, e,g; <code>search_backpressure.node_duress.heap_threshold
* </code>
* @param settingValRegex a regex value representing valid regex match for the setting val
* and should encapsulate the value in a group inside the string settingValRegex, e,g;
* "\"([0-9].[0-9]+)\"" to match any floating value with one leading digit
* @returns the value for the setting settingName if present e,g; "0.7" or else NULL
*/
public static String getClusterSettingValue(String settingName, String settingValRegex) {
refreshClusterSettings();
Pattern settingValPattern = Pattern.compile(settingValRegex);
Optional<String> setting =
clusterSettings.stream()
.filter(settingLine -> settingLine.contains(settingName))
.findFirst();
final String settingVal =
setting.map(
settingLine -> {
Matcher settingValMatcher =
settingValPattern.matcher(settingLine);
if (settingValMatcher.find()) {
return settingValMatcher.group(1);
}
return null;
})
.orElseGet(() -> SETTING_NOT_FOUND);
return settingVal;
}
}

private static HttpURLConnection createHTTPConnection(String path, HttpMethod httpMethod) {
try {
String endPoint = "http://localhost:9200" + path;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public String summary() {
return summary.toJson();
}

@Override
public String toString() {
return summary();
}

public static final class Builder {
public static final boolean DEFAULT_CAN_UPDATE = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,18 @@ public SearchBackPressurePolicy(SearchBackPressureClusterRCA searchBackPressureC
* @param issue an issue with the application
*/
private void record(HotResourceSummary summary) {
LOG.trace("SearchBackPressurePolicy capturing resource summary: {}", summary);

if (HEAP_SEARCHBP_SHARD_SIGNALS.contains(summary.getResource())) {
LOG.debug("Shard signal in SBP RCA summary...");
searchBackPressureIssue =
new SearchBackPressureShardIssue(
summary, searchBackPressureShardAlarmMonitorMap);
searchBackPressureIssue.recordIssueBySummaryType(summary);
}

if (HEAP_SEARCHBP_TASK_SIGNALS.contains(summary.getResource())) {
LOG.debug("Task signal in SBP RCA summary...");
searchBackPressureIssue =
new SearchBackPressureSearchTaskIssue(
summary, searchBackPressureTaskAlarmMonitorMap);
Expand All @@ -126,7 +130,9 @@ private void recordIssues() {
LOG.debug("No flow units in searchBackPressureClusterRCA");
return;
}

LOG.debug(
"SearchBackPressurePolicy flow units: {}",
searchBackPressureClusterRCA.getFlowUnits());
for (ResourceFlowUnit<HotClusterSummary> flowUnit :
searchBackPressureClusterRCA.getFlowUnits()) {
if (!flowUnit.hasResourceSummary()) {
Expand Down Expand Up @@ -248,7 +254,10 @@ public List<Action> evaluate() {
checkTaskAlarms(actions);

// print current size of the actions
LOG.debug("SearchBackPressurePolicy#evaluate() action size: {}", actions.size());
LOG.debug(
"SearchBackPressurePolicy#evaluate() action size: {} actions: {}",
actions.size(),
actions);

return actions;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ public SearchBackPressureShardIssue(

@Override
public void recordIssueBySummaryType(HotResourceSummary summary) {
if (summary.getMetaData() == SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR) {
LOG.debug("Recording issue by summary type..... summary: {}", summary);
if (summary.getMetaData()
.equalsIgnoreCase(SearchBackPressureRcaConfig.INCREASE_THRESHOLD_BY_JVM_STR)) {
LOG.debug("recording increase-level issue for shard");
LOG.debug("size of the HashMap: {}", actionsAlarmMonitorMap.size());
actionsAlarmMonitorMap
Expand All @@ -33,7 +35,8 @@ public void recordIssueBySummaryType(HotResourceSummary summary) {
}

// decrease alarm for heap-related threshold
if (summary.getMetaData() == SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR) {
if (summary.getMetaData()
.equalsIgnoreCase(SearchBackPressureRcaConfig.DECREASE_THRESHOLD_BY_JVM_STR)) {
LOG.debug("recording decrease-level issue for shard");
actionsAlarmMonitorMap
.get(SearchbpShardAlarmMonitorMapKeys.SHARD_HEAP_DECREASE_ALARM.toString())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,7 +466,7 @@ public class MetricsModel {

// Search Back Pressure Metrics
allMetricsInitializer.put(
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT
AllMetrics.SearchBackPressureStatsValue.SEARCHBP_SHARD_TASK_STATS_CANCELLATION_COUNT
.toString(),
new MetricAttributes(MetricUnits.COUNT.toString(), EmptyDimension.values()));
ALL_METRICS = Collections.unmodifiableMap(allMetricsInitializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import org.opensearch.performanceanalyzer.rca.framework.core.RcaConf;

public class SearchBackPressureRcaConfig {
public static final String CONFIG_NAME = "search-back-pressure-rca-policy";
public static final String CONFIG_NAME = "search-back-pressure-rca";

/* Metadata fields for thresholds */
public static final String INCREASE_THRESHOLD_BY_JVM_STR = "increase_jvm";
Expand All @@ -25,28 +25,41 @@ public class SearchBackPressureRcaConfig {

/* Increase Threshold */
// node max heap usage in last 60 secs is less than 70%
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD = 70;
public static final int DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT = 80;
private Integer maxHeapIncreasePercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
// cancellation percent due to heap is more than 5% of all task completions at shard level
// (Taking 3 because we don't cancel more than 10% of all completions at any time)
// Basically this threshold tell that we are overcancelling the shard level tasks given max heap
// from last rca eval period is still
// below or equal to DEFAULT_MAX_HEAP_INCREASE_THRESHOLD
public static final int DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT = 5;
private Integer maxShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is more than 50% of all task cancellations in task level
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD = 50;
// cancellation percent due to heap is more than 5% of all task completions in
// SearchTask(co-ordinator) level (Taking 3 because we don't cancel more than 10% of all
// completions at any time)
// Basically this threshold tell that we are overcancelling the co-ordinator level tasks
public static final int DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT = 5;
private Integer maxTaskHeapCancellationPercentageThreshold;

/* Decrease Threshold */
// node min heap usage in last 60 secs is more than 80%
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD = 80;
public static final int DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT = 90;
private Integer minHeapDecreasePercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in shard level
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
// cancellationCount due to heap is less than 3% of all task completions in shard level
// Basically this threshold tell that we are under cancelling the shard level tasks given min
// heap from last rca eval period is still
// above or equal to DEFAULT_MIN_HEAP_DECREASE_THRESHOLD
public static final int DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT = 3;
private Integer minShardHeapCancellationPercentageThreshold;

// cancellationCount due to heap is less than 30% of all task cancellations in task level
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD = 30;
// cancellationCount due to heap is less than 3% of all task completions in task level
// Basically this threshold tell that we are under cancelling the coordinator level tasks given
// min heap from last rca eval period is still
// above or equal to DEFAULT_MIN_HEAP_DECREASE_THRESHOLD
public static final int DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT = 3;
private Integer minTaskHeapCancellationPercentageThreshold;

public SearchBackPressureRcaConfig(final RcaConf conf) {
Expand All @@ -56,46 +69,46 @@ public SearchBackPressureRcaConfig(final RcaConf conf) {
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_INCREASE_FIELD.toString(),
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD,
DEFAULT_MAX_HEAP_INCREASE_THRESHOLD_PERCENT,
(s) -> s >= 0 && s <= 100,
Integer.class);
maxShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_SHARD_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT,
(s) -> s >= 0 && s <= 100,
Integer.class);
maxTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_TASK_MAX_HEAP_CANCELLATION_THRESHOLD_PERCENT,
(s) -> s >= 0 && s <= 100,
Integer.class);
minHeapDecreasePercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MAX_HEAP_USAGE_DECREASE_FIELD.toString(),
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD,
DEFAULT_MIN_HEAP_DECREASE_THRESHOLD_PERCENT,
(s) -> s >= 0 && s <= 100,
Integer.class);
minShardHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_SHARD_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_SHARD_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT,
(s) -> s >= 0 && s <= 100,
Integer.class);
minTaskHeapCancellationPercentageThreshold =
conf.readRcaConfig(
CONFIG_NAME,
SearchBackPressureRcaConfigKeys.MIN_TASK_HEAP_CANCELLATION_PERCENTAGE_FIELD
.toString(),
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD,
DEFAULT_TASK_MIN_HEAP_CANCELLATION_THRESHOLD_PERCENT,
(s) -> s >= 0 && s <= 100,
Integer.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ public ImmutableList<FlowUnitMessage> drainNode(final String graphNode) {
final List<FlowUnitMessage> tempList = new ArrayList<>();
BlockingQueue<FlowUnitMessage> existing = flowUnitMap.get(graphNode);
if (existing == null) {
LOG.debug("Nothing in the FlowUnitStore for vertex: {}", graphNode);
return ImmutableList.of();
}

existing.drainTo(tempList);
LOG.debug("Available flow units for vertex: {}, flowUnits: {}", graphNode, tempList);

return ImmutableList.copyOf(tempList);
}
Expand Down
Loading

0 comments on commit 70222ba

Please sign in to comment.