Skip to content

Commit

Permalink
Add resource stats to task framework (#2089)
Browse files Browse the repository at this point in the history
* Add resource stats to task framework

Signed-off-by: sruti1312 <srutiparthiban@gmail.com>

* Update thread resource info and add tests

Signed-off-by: sruti1312 <srutiparthiban@gmail.com>
(cherry picked from commit 8b997c1)
  • Loading branch information
sruti1312 authored and github-actions[bot] committed Mar 29, 2022
1 parent 7d8f9ac commit 79f536c
Show file tree
Hide file tree
Showing 19 changed files with 844 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> resourceStats = new HashMap<>();

public TaskInfo(TaskId taskId) {
this.taskId = taskId;
Expand Down Expand Up @@ -150,6 +151,14 @@ public Map<String, Object> getStatus() {
return status;
}

void setResourceStats(Map<String, Object> resourceStats) {
this.resourceStats.putAll(resourceStats);
}

public Map<String, Object> getResourceStats() {
return resourceStats;
}

private void noOpParse(Object s) {}

public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
Expand All @@ -170,6 +179,7 @@ private void noOpParse(Object s) {}
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -188,7 +198,8 @@ && isCancelled() == taskInfo.isCancelled()
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats());
}

@Override
Expand All @@ -204,7 +215,8 @@ public int hashCode() {
isCancelled(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getResourceStats()
);
}

Expand Down Expand Up @@ -236,6 +248,8 @@ public String toString() {
+ status
+ ", headers="
+ headers
+ ", resource_stats="
+ resourceStats
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.TaskResourceStats;
import org.opensearch.tasks.TaskResourceUsage;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.AbstractXContentTestCase.xContentTester;
Expand All @@ -57,7 +60,7 @@ public void testFromXContent() throws IOException {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.contains("resource_stats"))
.test();
}

Expand Down Expand Up @@ -106,7 +109,8 @@ static TaskInfo randomTaskInfo() {
cancellable,
cancelled,
parentTaskId,
headers
headers,
randomResourceStats()
);
}

Expand All @@ -127,4 +131,14 @@ private static RawTaskStatus randomRawTaskStatus() {
throw new IllegalStateException(e);
}
}

private static TaskResourceStats randomResourceStats() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
cancellable,
cancelled,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public void testRethrottleSuccessfulResponse() {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
Expand Down Expand Up @@ -167,7 +168,8 @@ public void testRethrottleWithSomeSucceeded() {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,8 @@ public void testNodeNotFoundButTaskFound() throws Exception {
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
Collections.emptyMap(),
null
),
new RuntimeException("test")
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ protected Table getTableWithHeader(final RestRequest request) {
// Task detailed info
if (detailed) {
table.addCell("description", "default:true;alias:desc;desc:task action");
table.addCell("resource_stats", "default:false;desc:resource consumption info of the task");
}
table.endHeaders();
return table;
Expand Down Expand Up @@ -173,6 +174,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo

if (detailed) {
table.addCell(taskInfo.getDescription());
table.addCell(taskInfo.getResourceStats());
}
table.endRow();
}
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/tasks/ResourceStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

/**
* Different resource stats are defined.
*/
public enum ResourceStats {
CPU("cpu_time_in_nanos"),
MEMORY("memory_in_bytes");

private final String statsName;

ResourceStats(String statsName) {
this.statsName = statsName;
}

@Override
public String toString() {
return statsName;
}
}
32 changes: 32 additions & 0 deletions server/src/main/java/org/opensearch/tasks/ResourceStatsType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

/** Defines the different types of resource stats. */
public enum ResourceStatsType {
// resource stats of the worker thread reported directly from runnable.
WORKER_STATS("worker_stats", false);

private final String statsType;
private final boolean onlyForAnalysis;

ResourceStatsType(String statsType, boolean onlyForAnalysis) {
this.statsType = statsType;
this.onlyForAnalysis = onlyForAnalysis;
}

public boolean isOnlyForAnalysis() {
return onlyForAnalysis;
}

@Override
public String toString() {
return statsType;
}
}
108 changes: 108 additions & 0 deletions server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Thread resource usage information for particular resource stats type.
* <p>
* It captures the resource usage information like memory, CPU about a particular execution of thread
* for a specific stats type.
*/
public class ResourceUsageInfo {
private static final Logger logger = LogManager.getLogger(ResourceUsageInfo.class);
private final EnumMap<ResourceStats, ResourceStatsInfo> statsInfo = new EnumMap<>(ResourceStats.class);

public ResourceUsageInfo(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
this.statsInfo.put(resourceUsageMetric.getStats(), new ResourceStatsInfo(resourceUsageMetric.getValue()));
}
}

public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
final ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats());
if (resourceStatsInfo != null) {
updateResourceUsageInfo(resourceStatsInfo, resourceUsageMetric);
} else {
throw new IllegalStateException(
"cannot update ["
+ resourceUsageMetric.getStats().toString()
+ "] entry as its not present current_stats_info:"
+ statsInfo
);
}
}
}

private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, ResourceUsageMetric resourceUsageMetric) {
long currentEndValue;
long newEndValue;
do {
currentEndValue = resourceStatsInfo.endValue.get();
newEndValue = resourceUsageMetric.getValue();
if (currentEndValue > newEndValue) {
logger.debug(
"dropping resource usage update as the new value is lower than current value ["
+ "resource_stats=[{}], "
+ "current_end_value={}, "
+ "new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
return;
}
} while (!resourceStatsInfo.endValue.compareAndSet(currentEndValue, newEndValue));
logger.debug(
"updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
}

public Map<ResourceStats, ResourceStatsInfo> getStatsInfo() {
return Collections.unmodifiableMap(statsInfo);
}

@Override
public String toString() {
return statsInfo.toString();
}

/**
* Defines resource stats information.
*/
static class ResourceStatsInfo {
private final long startValue;
private final AtomicLong endValue;

private ResourceStatsInfo(long startValue) {
this.startValue = startValue;
this.endValue = new AtomicLong(startValue);
}

public long getTotalValue() {
return endValue.get() - startValue;
}

@Override
public String toString() {
return String.valueOf(getTotalValue());
}
}
}
27 changes: 27 additions & 0 deletions server/src/main/java/org/opensearch/tasks/ResourceUsageMetric.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

public class ResourceUsageMetric {
private final ResourceStats stats;
private final long value;

public ResourceUsageMetric(ResourceStats stats, long value) {
this.stats = stats;
this.value = value;
}

public ResourceStats getStats() {
return stats;
}

public long getValue() {
return value;
}
}
Loading

0 comments on commit 79f536c

Please sign in to comment.