From 348b0bc86a39d0cf5ebba98e596fcd86ae70df59 Mon Sep 17 00:00:00 2001 From: "opensearch-trigger-bot[bot]" <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com> Date: Thu, 5 Oct 2023 09:47:14 -0700 Subject: [PATCH] SearchBackPressure Service Collector added (#483) (#558) * SearchBackPressure Collector added Signed-off-by: CoderJeffrey (cherry picked from commit e0ea1da309a288ef79bb45dcc7c48ce381bb145a) Co-authored-by: Jeffrey Liu --- .../PerformanceAnalyzerPlugin.java | 5 + .../SearchBackPressureStatsCollector.java | 733 ++++++++++++++++++ .../performanceanalyzer/util/Utils.java | 1 + ...SearchBackPressureStatsCollectorTests.java | 176 +++++ 4 files changed, 915 insertions(+) create mode 100644 src/main/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollector.java create mode 100644 src/test/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollectorTests.java diff --git a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java index 76b0a16e..470ad2a0 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java +++ b/src/main/java/org/opensearch/performanceanalyzer/PerformanceAnalyzerPlugin.java @@ -53,6 +53,7 @@ import org.opensearch.performanceanalyzer.collectors.NodeDetailsCollector; import org.opensearch.performanceanalyzer.collectors.NodeStatsAllShardsMetricsCollector; import org.opensearch.performanceanalyzer.collectors.NodeStatsFixedShardsMetricsCollector; +import org.opensearch.performanceanalyzer.collectors.SearchBackPressureStatsCollector; import org.opensearch.performanceanalyzer.collectors.ShardIndexingPressureMetricsCollector; import org.opensearch.performanceanalyzer.collectors.ShardStateCollector; import org.opensearch.performanceanalyzer.collectors.ThreadPoolMetricsCollector; @@ -224,6 +225,10 @@ public PerformanceAnalyzerPlugin(final Settings settings, final java.nio.file.Pa scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new ClusterApplierServiceStatsCollector( performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( + new SearchBackPressureStatsCollector( + performanceAnalyzerController, configOverridesWrapper)); + scheduledMetricCollectorsExecutor.addScheduledMetricCollector( new AdmissionControlMetricsCollector()); scheduledMetricCollectorsExecutor.addScheduledMetricCollector( diff --git a/src/main/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollector.java b/src/main/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollector.java new file mode 100644 index 00000000..fadbf135 --- /dev/null +++ b/src/main/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollector.java @@ -0,0 +1,733 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors; + +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.CLUSTER_APPLIER_SERVICE_STATS_COLLECTOR_ERROR; +import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatMetrics.CLUSTER_APPLIER_SERVICE_STATS_COLLECTOR_EXECUTION_TIME; + +import com.fasterxml.jackson.annotation.JsonAutoDetect; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.PropertyAccessor; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.Map; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.node.Node; +import org.opensearch.node.NodeService; +import org.opensearch.performanceanalyzer.commons.collectors.MetricStatus; +import org.opensearch.performanceanalyzer.commons.collectors.PerformanceAnalyzerMetricsCollector; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.SearchBackPressureStatsValue; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor; +import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.opensearch.search.backpressure.SearchBackpressureService; +import org.opensearch.search.backpressure.stats.SearchShardTaskStats; +import org.opensearch.search.backpressure.stats.SearchTaskStats; + +public class SearchBackPressureStatsCollector extends PerformanceAnalyzerMetricsCollector + implements MetricsProcessor { + // SAMPLING TIME INTERVAL to collect search back pressure stats + public static final int SAMPLING_TIME_INTERVAL = + MetricsConfiguration.CONFIG_MAP.get(SearchBackPressureStatsCollector.class) + .samplingInterval; + private static final int KEYS_PATH_LENGTH = 0; + private static final Logger LOG = LogManager.getLogger(SearchBackPressureStatsCollector.class); + private static final ObjectMapper mapper; + + public static final String BOOTSTRAP_CLASS_NAME = "org.opensearch.bootstrap.Bootstrap"; + public static final String NODE_CLASS_NAME = "org.opensearch.node.Node"; + public static final String BOOTSTRAP_INSTANCE_FIELD_NAME = "INSTANCE"; + public static final String BOOTSTRAP_NODE_FIELD_NAME = "node"; + public static final String NODE_SERVICE_FIELD_NAME = "nodeService"; + + public static final String HEAP_USAGE_TRACKER_FIELD_NAME = "HEAP_USAGE_TRACKER"; + public static final String CPU_USAGE_TRACKER_FIELD_NAME = "CPU_USAGE_TRACKER"; + public static final String ELAPSED_TIME_USAGE_TRACKER_FIELD_NAME = "ELAPSED_TIME_TRACKER"; + + // Headline for search back pressure metrics + public static final String PATH_TO_STORE_METRICS = "search_back_pressure"; + private String nodeId; + + // Metrics to be collected as a String and written in a JSON String + private final StringBuilder value; + private final PerformanceAnalyzerController controller; + private final ConfigOverridesWrapper configOverridesWrapper; + + static { + mapper = new ObjectMapper(); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + /* + * SearchBackPressureStatsCollector collects SearchBackPressure Related Stats from org.opensearch.search.backpressure.SearchBackpressureService + * Example Stats include the cancellation count of search tasks on shard level or node level + */ + public SearchBackPressureStatsCollector( + PerformanceAnalyzerController controller, + ConfigOverridesWrapper configOverridesWrapper) { + super( + SAMPLING_TIME_INTERVAL, + SearchBackPressureStatsCollector.class.getSimpleName(), + CLUSTER_APPLIER_SERVICE_STATS_COLLECTOR_EXECUTION_TIME, + CLUSTER_APPLIER_SERVICE_STATS_COLLECTOR_ERROR); + + this.controller = controller; + this.configOverridesWrapper = configOverridesWrapper; + this.value = new StringBuilder(); + LOG.info("SearchBackPressureStatsCollector started"); + } + + /* + * NodeId is an additional field to be added to the searchbackpressure stats returned from SearchBackpressureService + */ + private void setNodeId(String nodeId) { + this.nodeId = nodeId; + } + + private String getNodeId() { + return this.nodeId; + } + + @Override + public void collectMetrics(long startTime) { + SearchBackPressureStats currentSearchBackPressureStats = null; + try { + String jsonString = mapper.writeValueAsString(getSearchBackPressureStats()); + currentSearchBackPressureStats = + mapper.readValue(jsonString, SearchBackPressureStats.class); + + } catch (InvocationTargetException + | IllegalAccessException + | NoSuchMethodException + | NoSuchFieldException + | ClassNotFoundException + | JsonProcessingException ex) { + ex.printStackTrace(); + LOG.warn( + "No method found to get Search BackPressure Stats. " + + "Skipping SearchBackPressureStatsCollector. Error: " + + ex.getMessage()); + return; + } + + SearchBackPressureMetrics searchBackPressureMetrics = + new SearchBackPressureMetrics( + currentSearchBackPressureStats.getMode(), + getNodeId(), + currentSearchBackPressureStats.getSearchShardTaskStats(), + currentSearchBackPressureStats.getSearchTaskStats()); + + // clear previous buffered value + value.setLength(0); + + // Append system current time and line seperator + value.append(PerformanceAnalyzerMetrics.getJsonCurrentMilliSeconds()) + .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor); + + // Append search back pressure metrics + value.append(searchBackPressureMetrics.serialize()); + + // Save metrics into /dev/shm folder + saveMetricValues(value.toString(), startTime); + } + + Field getField(String className, String fieldName) + throws NoSuchFieldException, ClassNotFoundException { + + Class BootStrapClass = Class.forName(className); + Field bootStrapField = BootStrapClass.getDeclaredField(fieldName); + + // set the field to be accessible + bootStrapField.setAccessible(true); + return bootStrapField; + } + + @VisibleForTesting + public Object getSearchBackPressureStats() + throws InvocationTargetException, IllegalAccessException, NoSuchMethodException, + NoSuchFieldException, NoSuchFieldError, ClassNotFoundException { + + // Get the static instance of Bootstrap + Object bootStrapSInstance = + getField(BOOTSTRAP_CLASS_NAME, BOOTSTRAP_INSTANCE_FIELD_NAME).get(null); + + // Get the Node instance from the Bootstrap instance + Node node = + (Node) + getField(BOOTSTRAP_CLASS_NAME, BOOTSTRAP_NODE_FIELD_NAME) + .get(bootStrapSInstance); + + NodeEnvironment nodeEnvironment = node.getNodeEnvironment(); + setNodeId(nodeEnvironment.nodeId()); + + // Get the NodeService instance from the Node instance + NodeService nodeService = + (NodeService) getField(NODE_CLASS_NAME, NODE_SERVICE_FIELD_NAME).get(node); + + String GET_STATS_METHOD_NAME = "nodeStats"; + Method method = SearchBackpressureService.class.getMethod(GET_STATS_METHOD_NAME); + + return method.invoke(nodeService.getSearchBackpressureService()); + } + + @Override + public String getMetricsPath(long startTime, String... keysPath) { + if (keysPath.length != KEYS_PATH_LENGTH) { + throw new RuntimeException("keys length should be " + KEYS_PATH_LENGTH); + } + return PerformanceAnalyzerMetrics.generatePath(startTime, PATH_TO_STORE_METRICS); + } + + /* + * POJO Class to deserialize the stats JSON String from SearchBackPressureService + */ + public static class SearchBackPressureStats { + private SearchShardTaskStats searchShardTaskStats; + private String mode; + private SearchTaskStats searchTaskStats; + + @VisibleForTesting + @JsonCreator + public SearchBackPressureStats( + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_BACK_PRESSURE_STATS_SEARCH_SHARD_TASK_STATS) + SearchShardTaskStats searchShardTaskStats, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_BACK_PRESSURE_STATS_MODE) + String mode, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_BACK_PRESSURE_STATS_SEARCH_TASK_STATS) + SearchTaskStats searchTaskStats) { + this.searchShardTaskStats = searchShardTaskStats; + this.mode = mode; + this.searchTaskStats = searchTaskStats; + } + + public SearchBackPressureStats() {} + + // Getters and setters + public String getMode() { + return mode; + } + + public void setMode(String mode) { + this.mode = mode; + } + + public SearchShardTaskStats getSearchShardTaskStats() { + return searchShardTaskStats; + } + + public void setSearchShardTaskStats(SearchShardTaskStats searchShardTaskStats) { + this.searchShardTaskStats = searchShardTaskStats; + } + + public SearchTaskStats getSearchTaskStats() { + return searchTaskStats; + } + + public void setSearchTaskStats(SearchTaskStats searchTaskStats) { + this.searchTaskStats = searchTaskStats; + } + } + + /* + * POJO Class to deserialize Shard level Search Task Stats + */ + public static class SearchShardTaskStats { + private long cancellationCount; + private long limitReachedCount; + private Map resourceUsageTrackerStats; + + @JsonCreator + public SearchShardTaskStats( + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_SHARD_TASK_STATS_CANCELLATIONCOUNT) + long cancellationCount, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_SHARD_TASK_STATS_LIMITREACHEDCOUNT) + long limitReachedCount, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_SHARD_TASK_STATS_RESOURCE_USAGE_TRACKER_STATS) + Map resourceUsageTrackerStats) { + this.cancellationCount = cancellationCount; + this.limitReachedCount = limitReachedCount; + this.resourceUsageTrackerStats = resourceUsageTrackerStats; + } + + // Getters and Setters + public long getCancellationCount() { + return cancellationCount; + } + + public void setCancellationCount(long cancellationCount) { + this.cancellationCount = cancellationCount; + } + + public long getLimitReachedCount() { + return limitReachedCount; + } + + public void setLimitReachedCount(long limitReachedCount) { + this.limitReachedCount = limitReachedCount; + } + + public Map getResourceUsageTrackerStats() { + return resourceUsageTrackerStats; + } + + public void setResourceUsageTrackerStats( + Map resourceUsageTrackerStats) { + this.resourceUsageTrackerStats = resourceUsageTrackerStats; + } + } + + /* + * POJO Class to deserialize Node level Search Task Stats + */ + public static class SearchTaskStats { + private long cancellationCount; + private long limitReachedCount; + private Map resourceUsageTrackerStats; + + @JsonCreator + public SearchTaskStats( + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_TASK_STATS_CANCELLATIONCOUNT) + long cancellationCount, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_TASK_STATS_LIMITREACHEDCOUNT) + long limitReachedCount, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SEARCH_TASK_STATS_RESOURCE_USAGE_TRACKER_STATS) + Map resourceUsageTrackerStats) { + this.cancellationCount = cancellationCount; + this.limitReachedCount = limitReachedCount; + this.resourceUsageTrackerStats = resourceUsageTrackerStats; + } + + // Getters and Setters + public long getCancellationCount() { + return cancellationCount; + } + + public void setCancellationCount(long cancellationCount) { + this.cancellationCount = cancellationCount; + } + + public long getLimitReachedCount() { + return limitReachedCount; + } + + public void setLimitReachedCount(long limitReachedCount) { + this.limitReachedCount = limitReachedCount; + } + + public Map getResourceUsageTrackerStats() { + return resourceUsageTrackerStats; + } + + public void setResourceUsageTrackerStats( + Map resourceUsageTrackerStats) { + this.resourceUsageTrackerStats = resourceUsageTrackerStats; + } + } + + public static class ResourceUsageTrackerStats { + private long cancellationCount; + private long currentMax; + private long currentAvg; + private long rollingAvg; + private boolean fragment; + + @JsonCreator + public ResourceUsageTrackerStats( + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_RESOURCE_USAGE_TRACKER_STATS_CANCELLATIONCOUNT) + long cancellationCount, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_RESOURCE_USAGE_TRACKER_STATS_CURRENTMAX) + long currentMax, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_RESOURCE_USAGE_TRACKER_STATS_CURRENTAVG) + long currentAvg, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_RESOURCE_USAGE_TRACKER_STATS_ROLLINGAVG) + long rollingAvg, + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_RESOURCE_USAGE_TRACKER_STATS_FRAGMENT) + boolean fragment) { + this.cancellationCount = cancellationCount; + this.currentMax = currentMax; + this.currentAvg = currentAvg; + this.rollingAvg = rollingAvg; + this.fragment = fragment; + } + // Getters and Setters + public long getCancellationCount() { + return cancellationCount; + } + + public void setCancellationCount(long cancellationCount) { + this.cancellationCount = cancellationCount; + } + + public long getCurrentMax() { + return currentMax; + } + + public void setCurrentMax(long currentMax) { + this.currentMax = currentMax; + } + + public long getCurrentAvg() { + return currentAvg; + } + + public void setCurrentAvg(long currentAvg) { + this.currentAvg = currentAvg; + } + + public long getRollingAvg() { + return rollingAvg; + } + + public void setRollingAvg(long rollingAvg) { + this.rollingAvg = rollingAvg; + } + + public boolean isFragment() { + return fragment; + } + + public void setFragment(boolean fragment) { + this.fragment = fragment; + } + } + + /* + * SearchBackPressureMetrics class to be stored + * Flatten the data fields for easier access + */ + public static class SearchBackPressureMetrics extends MetricStatus { + private SearchShardTaskStats searchShardTaskStats; + private SearchTaskStats searchTaskStats; + private String mode; + private String nodeId; + + // SearchShardTaskStats related stats (General) + private long searchbp_shard_stats_cancellationCount; + private long searchbp_shard_stats_limitReachedCount; + + // SearchShardTaskStats related stats (resourceUsageTrackerStats) + // HEAP_USAGE_TRACKER + private long searchbp_shard_stats_resource_heap_usage_cancellationCount; + private long searchbp_shard_stats_resource_heap_usage_currentMax; + private long searchbp_shard_stats_resource_heap_usage_rollingAvg; + + // CPU_USAGE_TRACKER + private long searchbp_shard_stats_resource_cpu_usage_cancellationCount; + private long searchbp_shard_stats_resource_cpu_usage_currentMax; + private long searchbp_shard_stats_resource_cpu_usage_currentAvg; + + // ELAPSED_TIME_TRACKER + private long searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount; + private long searchbp_shard_stats_resource_elaspedtime_usage_currentMax; + private long searchbp_shard_stats_resource_elaspedtime_usage_currentAvg; + + // SearchTaskStats related stats (General) + private long searchbp_task_stats_cancellationCount; + private long searchbp_task_stats_limitReachedCount; + + // SearchTaskStats related stats (resourceUsageTrackerStats) + // HEAP_USAGE_TRACKER + private long searchbp_task_stats_resource_heap_usage_cancellationCount; + private long searchbp_task_stats_resource_heap_usage_currentMax; + private long searchbp_task_stats_resource_heap_usage_rollingAvg; + + // CPU_USAGE_TRACKER + private long searchbp_task_stats_resource_cpu_usage_cancellationCount; + private long searchbp_task_stats_resource_cpu_usage_currentMax; + private long searchbp_task_stats_resource_cpu_usage_currentAvg; + + // ELAPSED_TIME_TRACKER + private long searchbp_task_stats_resource_elaspedtime_usage_cancellationCount; + private long searchbp_task_stats_resource_elaspedtime_usage_currentMax; + private long searchbp_task_stats_resource_elaspedtime_usage_currentAvg; + + public SearchBackPressureMetrics( + String mode, + String nodeId, + SearchShardTaskStats searchShardTaskStats, + SearchTaskStats searchTaskStats) { + this.mode = mode; + this.nodeId = nodeId; + this.searchShardTaskStats = searchShardTaskStats; + this.searchTaskStats = searchTaskStats; + populate_shard_task_stats(searchShardTaskStats); + populate_task_stats(searchTaskStats); + } + + public void populate_shard_task_stats(SearchShardTaskStats searchShardTaskStats) { + // Create shard HEAP/CPU/TIME Stats ResourceUsageTrackerStats for simplification + ResourceUsageTrackerStats shard_heap_stats = + searchShardTaskStats + .getResourceUsageTrackerStats() + .get(HEAP_USAGE_TRACKER_FIELD_NAME); + + ResourceUsageTrackerStats shard_cpu_stats = + searchShardTaskStats + .getResourceUsageTrackerStats() + .get(CPU_USAGE_TRACKER_FIELD_NAME); + + ResourceUsageTrackerStats shard_time_stats = + searchShardTaskStats + .getResourceUsageTrackerStats() + .get(ELAPSED_TIME_USAGE_TRACKER_FIELD_NAME); + + this.searchbp_shard_stats_cancellationCount = + searchShardTaskStats.getCancellationCount(); + this.searchbp_shard_stats_limitReachedCount = + searchShardTaskStats.getLimitReachedCount(); + this.searchbp_shard_stats_resource_heap_usage_cancellationCount = + shard_heap_stats.getCancellationCount(); + this.searchbp_shard_stats_resource_heap_usage_currentMax = + shard_heap_stats.getCurrentMax(); + this.searchbp_shard_stats_resource_heap_usage_rollingAvg = + shard_heap_stats.getRollingAvg(); + this.searchbp_shard_stats_resource_cpu_usage_cancellationCount = + shard_cpu_stats.getCancellationCount(); + this.searchbp_shard_stats_resource_cpu_usage_currentMax = + shard_cpu_stats.getCurrentMax(); + this.searchbp_shard_stats_resource_cpu_usage_currentAvg = + shard_cpu_stats.getCurrentAvg(); + this.searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount = + shard_time_stats.getCancellationCount(); + this.searchbp_shard_stats_resource_elaspedtime_usage_currentMax = + shard_time_stats.getCurrentMax(); + this.searchbp_shard_stats_resource_elaspedtime_usage_currentAvg = + shard_time_stats.getCurrentAvg(); + } + + public void populate_task_stats(SearchTaskStats searchTaskStats) { + // Create task HEAP/CPU/TIME Stats ResourceUsageTrackerStats for simplification + ResourceUsageTrackerStats task_heap_stats = + searchTaskStats + .getResourceUsageTrackerStats() + .get(HEAP_USAGE_TRACKER_FIELD_NAME); + + ResourceUsageTrackerStats task_cpu_stats = + searchTaskStats + .getResourceUsageTrackerStats() + .get(CPU_USAGE_TRACKER_FIELD_NAME); + + ResourceUsageTrackerStats task_time_stats = + searchTaskStats + .getResourceUsageTrackerStats() + .get(ELAPSED_TIME_USAGE_TRACKER_FIELD_NAME); + + this.searchbp_task_stats_cancellationCount = searchTaskStats.getCancellationCount(); + this.searchbp_task_stats_limitReachedCount = searchTaskStats.getLimitReachedCount(); + this.searchbp_task_stats_resource_heap_usage_cancellationCount = + task_heap_stats.getCancellationCount(); + this.searchbp_task_stats_resource_heap_usage_currentMax = + task_heap_stats.getCurrentMax(); + this.searchbp_task_stats_resource_heap_usage_rollingAvg = + task_heap_stats.getRollingAvg(); + this.searchbp_task_stats_resource_cpu_usage_cancellationCount = + task_cpu_stats.getCancellationCount(); + this.searchbp_task_stats_resource_cpu_usage_currentMax = task_cpu_stats.getCurrentMax(); + this.searchbp_task_stats_resource_cpu_usage_currentAvg = task_cpu_stats.getCurrentAvg(); + this.searchbp_task_stats_resource_elaspedtime_usage_cancellationCount = + task_time_stats.getCancellationCount(); + this.searchbp_task_stats_resource_elaspedtime_usage_currentMax = + task_time_stats.getCurrentMax(); + this.searchbp_task_stats_resource_elaspedtime_usage_currentAvg = + task_time_stats.getCurrentAvg(); + } + + @JsonProperty(SearchBackPressureStatsValue.Constants.SEARCHBP_MODE) + public String getSearchBackPressureStats_Mode() { + return this.mode; + } + + @JsonProperty(SearchBackPressureStatsValue.Constants.SEARCHBP_NODEID) + public String getSearchBackPressureStats_NodeId() { + return this.nodeId; + } + + @JsonProperty(SearchBackPressureStatsValue.Constants.SEARCHBP_SHARD_STATS_CANCELLATIONCOUNT) + public long getSearchbp_shard_stats_cancellationCount() { + return searchbp_shard_stats_cancellationCount; + } + + @JsonProperty(SearchBackPressureStatsValue.Constants.SEARCHBP_SHARD_STATS_LIMITREACHEDCOUNT) + public long getSearchbp_shard_stats_limitReachedCount() { + return searchbp_shard_stats_limitReachedCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT) + public long getSearchbp_shard_stats_resource_heap_usage_cancellationCount() { + return searchbp_shard_stats_resource_heap_usage_cancellationCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX) + public long getSearchbp_shard_stats_resource_heap_usage_currentMax() { + return searchbp_shard_stats_resource_heap_usage_currentMax; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG) + public long getsearchbp_shard_stats_resource_heap_usage_rollingAvg() { + return searchbp_shard_stats_resource_heap_usage_rollingAvg; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT) + public long getSearchbp_shard_stats_resource_cpu_usage_cancellationCount() { + return searchbp_shard_stats_resource_cpu_usage_cancellationCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTMAX) + public long getSearchbp_shard_stats_resource_cpu_usage_currentMax() { + return searchbp_shard_stats_resource_cpu_usage_currentMax; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_CPU_USAGE_CURRENTAVG) + public long getSearchbp_shard_stats_resource_cpu_usage_currentAvg() { + return searchbp_shard_stats_resource_cpu_usage_currentAvg; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_ELASPEDTIME_USAGE_CANCELLATIONCOUNT) + public long getSearchbp_shard_stats_resource_elaspedtime_usage_cancellationCount() { + return searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_ELASPEDTIME_USAGE_CURRENTMAX) + public long getSearchbp_shard_stats_resource_elaspedtime_usage_currentMax() { + return searchbp_shard_stats_resource_elaspedtime_usage_currentMax; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_SHARD_STATS_RESOURCE_ELASPEDTIME_USAGE_CURRENTAVG) + public long getSearchbp_shard_stats_resource_elaspedtime_usage_currentAvg() { + return searchbp_shard_stats_resource_elaspedtime_usage_currentAvg; + } + + @JsonProperty(SearchBackPressureStatsValue.Constants.SEARCHBP_TASK_STATS_CANCELLATIONCOUNT) + public long getSearchbp_task_stats_cancellationCount() { + return searchbp_task_stats_cancellationCount; + } + + @JsonProperty(SearchBackPressureStatsValue.Constants.SEARCHBP_TASK_STATS_LIMITREACHEDCOUNT) + public long getSearchbp_task_stats_limitReachedCount() { + return searchbp_task_stats_limitReachedCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CANCELLATIONCOUNT) + public long getSearchbp_task_stats_resource_heap_usage_cancellationCount() { + return searchbp_task_stats_resource_heap_usage_cancellationCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_CURRENTMAX) + public long getSearchbp_task_stats_resource_heap_usage_currentMax() { + return searchbp_task_stats_resource_heap_usage_currentMax; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_HEAP_USAGE_ROLLINGAVG) + public long getsearchbp_task_stats_resource_heap_usage_rollingAvg() { + return searchbp_task_stats_resource_heap_usage_rollingAvg; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CANCELLATIONCOUNT) + public long getSearchbp_task_stats_resource_cpu_usage_cancellationCount() { + return searchbp_task_stats_resource_cpu_usage_cancellationCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTMAX) + public long getSearchbp_task_stats_resource_cpu_usage_currentMax() { + return searchbp_task_stats_resource_cpu_usage_currentMax; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_CPU_USAGE_CURRENTAVG) + public long getSearchbp_task_stats_resource_cpu_usage_currentAvg() { + return searchbp_task_stats_resource_cpu_usage_currentAvg; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_ELASPEDTIME_USAGE_CANCELLATIONCOUNT) + public long getSearchbp_task_stats_resource_elaspedtime_usage_cancellationCount() { + return searchbp_task_stats_resource_elaspedtime_usage_cancellationCount; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_ELASPEDTIME_USAGE_CURRENTMAX) + public long getSearchbp_task_stats_resource_elaspedtime_usage_currentMax() { + return searchbp_task_stats_resource_elaspedtime_usage_currentMax; + } + + @JsonProperty( + SearchBackPressureStatsValue.Constants + .SEARCHBP_TASK_STATS_RESOURCE_ELASPEDTIME_USAGE_CURRENTAVG) + public long getSearchbp_task_stats_resource_elaspedtime_usage_currentAvg() { + return searchbp_task_stats_resource_elaspedtime_usage_currentAvg; + } + } +} diff --git a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java index a6a82bc6..7c0f1591 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java +++ b/src/main/java/org/opensearch/performanceanalyzer/util/Utils.java @@ -44,6 +44,7 @@ public static void configureMetrics() { MetricsConfiguration.CONFIG_MAP.put( ClusterManagerThrottlingMetricsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ClusterApplierServiceStatsCollector.class, cdefault); + MetricsConfiguration.CONFIG_MAP.put(SearchBackPressureStatsCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ElectionTermCollector.class, cdefault); MetricsConfiguration.CONFIG_MAP.put(ShardIndexingPressureMetricsCollector.class, cdefault); } diff --git a/src/test/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollectorTests.java b/src/test/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollectorTests.java new file mode 100644 index 00000000..059a9302 --- /dev/null +++ b/src/test/java/org/opensearch/performanceanalyzer/collectors/SearchBackPressureStatsCollectorTests.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.performanceanalyzer.collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.opensearch.performanceanalyzer.commons.config.overrides.ConfigOverridesWrapper; +import org.opensearch.performanceanalyzer.commons.event_process.Event; +import org.opensearch.performanceanalyzer.commons.metrics.MetricsConfiguration; +import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; +import org.powermock.modules.junit4.PowerMockRunner; + +@RunWith(PowerMockRunner.class) +public class SearchBackPressureStatsCollectorTests { + private ObjectMapper mapper; + private long startTimeInMills; + private PerformanceAnalyzerController controller; + private ConfigOverridesWrapper configOverrides; + private SearchBackPressureStatsCollector searchBackPressureStatsCollector; + + // Required fields needed for search back pressure stats + private List required_fields_for_searchBackPressureStats = + Arrays.asList( + "searchbp_shard_stats_cancellationCount", + "searchbp_shard_stats_limitReachedCount", + "searchbp_shard_stats_resource_heap_usage_cancellationCount", + "searchbp_shard_stats_resource_heap_usage_currentMax", + "searchbp_shard_stats_resource_heap_usage_rollingAvg", + "searchbp_shard_stats_resource_cpu_usage_cancellationCount", + "searchbp_shard_stats_resource_cpu_usage_currentMax", + "searchbp_shard_stats_resource_cpu_usage_currentAvg", + "searchbp_shard_stats_resource_elaspedtime_usage_cancellationCount", + "searchbp_shard_stats_resource_elaspedtime_usage_currentMax", + "searchbp_shard_stats_resource_elaspedtime_usage_currentAvg", + "searchbp_task_stats_cancellationCount", + "searchbp_task_stats_limitReachedCount", + "searchbp_task_stats_resource_heap_usage_cancellationCount", + "searchbp_task_stats_resource_heap_usage_currentMax", + "searchbp_task_stats_resource_heap_usage_rollingAvg", + "searchbp_task_stats_resource_cpu_usage_cancellationCount", + "searchbp_task_stats_resource_cpu_usage_currentMax", + "searchbp_task_stats_resource_cpu_usage_currentAvg", + "searchbp_task_stats_resource_elaspedtime_usage_cancellationCount", + "searchbp_task_stats_resource_elaspedtime_usage_currentMax", + "searchbp_task_stats_resource_elaspedtime_usage_currentAvg", + "searchbp_mode", + "searchbp_nodeid"); + + // Mock Instance for HEAP/CPU/ELAPSED_TIME usage + SearchBackPressureStatsCollector.ResourceUsageTrackerStats HEAP_USAGE_TRACKER_MOCK_STATS; + SearchBackPressureStatsCollector.ResourceUsageTrackerStats CPU_USAGE_TRACKER_MOCK_STATS; + SearchBackPressureStatsCollector.ResourceUsageTrackerStats ELAPSED_TIME_TRACKER_MOCK_STATS; + + /* + * Required Config to be initialized + * Set the LOG property to be false + * Set the controller to be a Mock PerforamcneAnalyzerController.class + * Set the configWrapper to be a Mock ConfigOverridesWrapper + * Set the Collector to be a new SearchBackPressureServiceCollector + * Set the ObjectMapper to be a new ObjectMapper() instance + */ + @Before + public void init() { + mapper = new ObjectMapper(); + MetricsConfiguration.CONFIG_MAP.put( + SearchBackPressureStatsCollector.class, MetricsConfiguration.cdefault); + System.setProperty("performanceanalyzer.metrics.log.enabled", "False"); + startTimeInMills = 1153721339; + controller = Mockito.mock(PerformanceAnalyzerController.class); + configOverrides = Mockito.mock(ConfigOverridesWrapper.class); + searchBackPressureStatsCollector = + new SearchBackPressureStatsCollector(controller, configOverrides); + + HEAP_USAGE_TRACKER_MOCK_STATS = + new SearchBackPressureStatsCollector.ResourceUsageTrackerStats(0, 0, 0, 0, false); + CPU_USAGE_TRACKER_MOCK_STATS = + new SearchBackPressureStatsCollector.ResourceUsageTrackerStats(0, 0, 0, 0, false); + ELAPSED_TIME_TRACKER_MOCK_STATS = + new SearchBackPressureStatsCollector.ResourceUsageTrackerStats(0, 0, 0, 0, false); + } + + /* + * testSearchBackPressureStats_collectMetrics() test saveMetricValues() for SearchBackPressureStatsCollector + */ + @Test + public void testSearchBackPressureStats_saveMetricValues() { + Mockito.when( + controller.isCollectorEnabled( + configOverrides, "SearchBackPressureStatsCollector")) + .thenReturn(true); + searchBackPressureStatsCollector.saveMetricValues("search_back_pressure", startTimeInMills); + List metrics = new ArrayList<>(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + + // Valid case testing + assertEquals(1, metrics.size()); + assertEquals("search_back_pressure", metrics.get(0).value); + + // Exception case testing + try { + searchBackPressureStatsCollector.saveMetricValues( + "search_back_pressure", startTimeInMills, "dummy"); + assertTrue("Negative scenario test: Should have been a RuntimeException", true); + } catch (RuntimeException ex) { + // - expecting exception...1 values passed; 0 expected + // since keyPath does not match + } + } + + /* + * testSearchBackPressureStats_collectMetrics() test collectoMetrics() for SearchBackPressureStatsCollector + * Mock the behavior getSearchBackPressureStats() to return a mock SearchBackPressureStats Instance + */ + @Test + public void testSearchBackPressureStats_collectMetrics() + throws NoSuchMethodException, IllegalAccessException, InvocationTargetException, + JsonProcessingException, NoSuchFieldException, ClassNotFoundException { + String SEARCH_BACK_PRESSURE_MODE_FIELD_NAME = "searchbp_mode"; + SearchBackPressureStatsCollector spyCollector = + Mockito.spy(searchBackPressureStatsCollector); + + Map + resource_usage_mock_stats = + Map.ofEntries( + Map.entry("HEAP_USAGE_TRACKER", HEAP_USAGE_TRACKER_MOCK_STATS), + Map.entry("CPU_USAGE_TRACKER", CPU_USAGE_TRACKER_MOCK_STATS), + Map.entry("ELAPSED_TIME_TRACKER", ELAPSED_TIME_TRACKER_MOCK_STATS)); + + Mockito.doReturn( + new SearchBackPressureStatsCollector.SearchBackPressureStats( + new SearchBackPressureStatsCollector.SearchShardTaskStats( + 0, 0, resource_usage_mock_stats), + "MONITOR_ONLY", + new SearchBackPressureStatsCollector.SearchTaskStats( + 0, 0, resource_usage_mock_stats))) + .when(spyCollector) + .getSearchBackPressureStats(); + + Mockito.when( + controller.isCollectorEnabled( + configOverrides, + SearchBackPressureStatsCollector.class.getSimpleName())) + .thenReturn(true); + + spyCollector.collectMetrics(startTimeInMills); + List metrics = new ArrayList<>(); + PerformanceAnalyzerMetrics.metricQueue.drainTo(metrics); + + assertEquals(1, metrics.size()); + + String[] lines = metrics.get(0).value.split(System.lineSeparator()); + Map map = mapper.readValue(lines[1], Map.class); + + // Verify requried fields are all presented in the metrics + String jsonStr = lines[1]; + for (String required_field : required_fields_for_searchBackPressureStats) { + assertTrue(jsonStr.contains(required_field)); + } + } +}