Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add resource stats to task framework #2089

Merged
merged 3 commits into from
Mar 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public class TaskInfo {
private TaskId parentTaskId;
private final Map<String, Object> status = new HashMap<>();
private final Map<String, String> headers = new HashMap<>();
private final Map<String, Object> resourceStats = new HashMap<>();

public TaskInfo(TaskId taskId) {
this.taskId = taskId;
Expand Down Expand Up @@ -150,6 +151,14 @@ public Map<String, Object> getStatus() {
return status;
}

void setResourceStats(Map<String, Object> resourceStats) {
this.resourceStats.putAll(resourceStats);
}

public Map<String, Object> getResourceStats() {
return resourceStats;
}

private void noOpParse(Object s) {}

public static final ObjectParser.NamedObjectParser<TaskInfo, Void> PARSER;
Expand All @@ -170,6 +179,7 @@ private void noOpParse(Object s) {}
parser.declareBoolean(TaskInfo::setCancelled, new ParseField("cancelled"));
parser.declareString(TaskInfo::setParentTaskId, new ParseField("parent_task_id"));
parser.declareObject(TaskInfo::setHeaders, (p, c) -> p.mapStrings(), new ParseField("headers"));
parser.declareObject(TaskInfo::setResourceStats, (p, c) -> p.map(), new ParseField("resource_stats"));
PARSER = (XContentParser p, Void v, String name) -> parser.parse(p, new TaskInfo(new TaskId(name)), null);
}

Expand All @@ -188,7 +198,8 @@ && isCancelled() == taskInfo.isCancelled()
&& Objects.equals(getDescription(), taskInfo.getDescription())
&& Objects.equals(getParentTaskId(), taskInfo.getParentTaskId())
&& Objects.equals(status, taskInfo.status)
&& Objects.equals(getHeaders(), taskInfo.getHeaders());
&& Objects.equals(getHeaders(), taskInfo.getHeaders())
&& Objects.equals(getResourceStats(), taskInfo.getResourceStats());
}

@Override
Expand All @@ -204,7 +215,8 @@ public int hashCode() {
isCancelled(),
getParentTaskId(),
status,
getHeaders()
getHeaders(),
getResourceStats()
);
}

Expand Down Expand Up @@ -236,6 +248,8 @@ public String toString() {
+ status
+ ", headers="
+ headers
+ ", resource_stats="
+ resourceStats
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,16 @@
import org.opensearch.common.xcontent.ToXContent;
import org.opensearch.common.xcontent.XContentBuilder;
import org.opensearch.tasks.RawTaskStatus;
import org.opensearch.tasks.TaskResourceStats;
import org.opensearch.tasks.TaskResourceUsage;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskId;
import org.opensearch.tasks.TaskInfo;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static org.opensearch.test.AbstractXContentTestCase.xContentTester;
Expand All @@ -57,7 +60,7 @@ public void testFromXContent() throws IOException {
)
.assertEqualsConsumer(this::assertEqualInstances)
.assertToXContentEquivalence(true)
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status"))
.randomFieldsExcludeFilter(field -> field.endsWith("headers") || field.endsWith("status") || field.contains("resource_stats"))
.test();
}

Expand Down Expand Up @@ -106,7 +109,8 @@ static TaskInfo randomTaskInfo() {
cancellable,
cancelled,
parentTaskId,
headers
headers,
randomResourceStats()
);
}

Expand All @@ -127,4 +131,14 @@ private static RawTaskStatus randomRawTaskStatus() {
throw new IllegalStateException(e);
}
}

private static TaskResourceStats randomResourceStats() {
return randomBoolean() ? null : new TaskResourceStats(new HashMap<String, TaskResourceUsage>() {
{
for (int i = 0; i < randomInt(5); i++) {
put(randomAlphaOfLength(5), new TaskResourceUsage(randomNonNegativeLong(), randomNonNegativeLong()));
}
}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@ protected CancelTasksResponseTests.ByNodeCancelTasksResponse createServerTestIns
cancellable,
cancelled,
new TaskId("node1", randomLong()),
Collections.singletonMap("x-header-of", "some-value")
Collections.singletonMap("x-header-of", "some-value"),
null
)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public void testRethrottleSuccessfulResponse() {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
Expand Down Expand Up @@ -167,7 +168,8 @@ public void testRethrottleWithSomeSucceeded() {
true,
false,
new TaskId("test", task.getId()),
Collections.emptyMap()
Collections.emptyMap(),
null
)
);
sliceStatuses.add(new BulkByScrollTask.StatusOrException(status));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,7 +907,8 @@ public void testNodeNotFoundButTaskFound() throws Exception {
false,
false,
TaskId.EMPTY_TASK_ID,
Collections.emptyMap()
Collections.emptyMap(),
null
),
new RuntimeException("test")
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ protected Table getTableWithHeader(final RestRequest request) {
// Task detailed info
if (detailed) {
table.addCell("description", "default:true;alias:desc;desc:task action");
table.addCell("resource_stats", "default:false;desc:resource consumption info of the task");
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved
}
table.endHeaders();
return table;
Expand Down Expand Up @@ -173,6 +174,7 @@ private void buildRow(Table table, boolean fullId, boolean detailed, DiscoveryNo

if (detailed) {
table.addCell(taskInfo.getDescription());
table.addCell(taskInfo.getResourceStats());
}
table.endRow();
}
Expand Down
28 changes: 28 additions & 0 deletions server/src/main/java/org/opensearch/tasks/ResourceStats.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

/**
* Different resource stats are defined.
*/
public enum ResourceStats {
CPU("cpu_time_in_nanos"),
MEMORY("memory_in_bytes");

private final String statsName;

ResourceStats(String statsName) {
this.statsName = statsName;
}

@Override
public String toString() {
return statsName;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

/** Defines the different types of resource stats. */
public enum ResourceStatsType {
// resource stats of the worker thread reported directly from runnable.
WORKER_STATS("worker_stats", false);

private final String statsType;
private final boolean onlyForAnalysis;

ResourceStatsType(String statsType, boolean onlyForAnalysis) {
this.statsType = statsType;
this.onlyForAnalysis = onlyForAnalysis;
}

public boolean isOnlyForAnalysis() {
return onlyForAnalysis;
}

@Override
public String toString() {
return statsType;
}
}
108 changes: 108 additions & 0 deletions server/src/main/java/org/opensearch/tasks/ResourceUsageInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.Collections;
import java.util.EnumMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
* Thread resource usage information for particular resource stats type.
* <p>
* It captures the resource usage information like memory, CPU about a particular execution of thread
* for a specific stats type.
*/
public class ResourceUsageInfo {
private static final Logger logger = LogManager.getLogger(ResourceUsageInfo.class);
private final EnumMap<ResourceStats, ResourceStatsInfo> statsInfo = new EnumMap<>(ResourceStats.class);

public ResourceUsageInfo(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
this.statsInfo.put(resourceUsageMetric.getStats(), new ResourceStatsInfo(resourceUsageMetric.getValue()));
}
}

public void recordResourceUsageMetrics(ResourceUsageMetric... resourceUsageMetrics) {
for (ResourceUsageMetric resourceUsageMetric : resourceUsageMetrics) {
final ResourceStatsInfo resourceStatsInfo = statsInfo.get(resourceUsageMetric.getStats());
if (resourceStatsInfo != null) {
updateResourceUsageInfo(resourceStatsInfo, resourceUsageMetric);
} else {
throw new IllegalStateException(
"cannot update ["
+ resourceUsageMetric.getStats().toString()
+ "] entry as its not present current_stats_info:"
+ statsInfo
);
}
}
}

private void updateResourceUsageInfo(ResourceStatsInfo resourceStatsInfo, ResourceUsageMetric resourceUsageMetric) {
long currentEndValue;
long newEndValue;
do {
currentEndValue = resourceStatsInfo.endValue.get();
newEndValue = resourceUsageMetric.getValue();
if (currentEndValue > newEndValue) {
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved
logger.debug(
"dropping resource usage update as the new value is lower than current value ["
+ "resource_stats=[{}], "
+ "current_end_value={}, "
+ "new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
return;
}
} while (!resourceStatsInfo.endValue.compareAndSet(currentEndValue, newEndValue));
logger.debug(
"updated resource usage info [resource_stats=[{}], " + "old_end_value={}, new_end_value={}]",
resourceUsageMetric.getStats(),
currentEndValue,
newEndValue
);
}
sruti1312 marked this conversation as resolved.
Show resolved Hide resolved

public Map<ResourceStats, ResourceStatsInfo> getStatsInfo() {
return Collections.unmodifiableMap(statsInfo);
}

@Override
public String toString() {
return statsInfo.toString();
}

/**
* Defines resource stats information.
*/
static class ResourceStatsInfo {
private final long startValue;
private final AtomicLong endValue;

private ResourceStatsInfo(long startValue) {
this.startValue = startValue;
this.endValue = new AtomicLong(startValue);
}

public long getTotalValue() {
return endValue.get() - startValue;
}

@Override
public String toString() {
return String.valueOf(getTotalValue());
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

public class ResourceUsageMetric {
private final ResourceStats stats;
private final long value;

public ResourceUsageMetric(ResourceStats stats, long value) {
this.stats = stats;
this.value = value;
}

public ResourceStats getStats() {
return stats;
}

public long getValue() {
return value;
}
}
Loading