Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task cancellation monitoring service #7642

Merged
merged 29 commits into from
Jun 10, 2023
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
706fbb8
Adding task cancellation time in task API
sgup432 May 5, 2023
fc49144
Fixing unit tests and addressing comments
sgup432 May 8, 2023
3994b25
Adding change log for unreleased 2.x
sgup432 May 9, 2023
966d079
Removing running time cancel info from task API
sgup432 May 17, 2023
c74514c
Merge branch 'main' into task_api_changes
sgup432 May 17, 2023
31eeb91
Adding task cancellation monitoring service
sgup432 May 19, 2023
79be38e
Changing interval and duration setting
sgup432 May 19, 2023
5ea1936
Replacing long primitive with Long object
sgup432 May 19, 2023
9ad838d
Merge branch 'main' into task_api_changes
sgup432 May 19, 2023
2836093
Making cancelledAt field human readable
sgup432 May 19, 2023
6b32e98
Fixing failing test
sgup432 May 19, 2023
4d23a8e
Removing the feature from unreleased 3.x
sgup432 May 19, 2023
9fc264c
Fixing ListTasksResponseTests failure
sgup432 May 20, 2023
fd8e542
Test failure fix
sgup432 May 20, 2023
55d54ea
Changing naming convention to cancellationStartTime
sgup432 May 22, 2023
1d1e919
Merge branch 'main' into task_api_changes
sgup432 May 22, 2023
befdf86
Merge branch 'task_api_changes' into task_monitoring_changes
sgup432 May 23, 2023
fef0c4e
Adding task cancellation stats tests
sgup432 May 23, 2023
68d913c
Merge branch 'main' into task_monitoring_changes
sgup432 May 23, 2023
faf24c9
Updating change log
sgup432 May 23, 2023
838c02b
Merge branch 'main' into task_monitoring_changes
sgup432 May 23, 2023
385fb2a
Addressing comments
sgup432 May 31, 2023
8bd8aa7
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 1, 2023
eaf1718
Merging task completion/cancellation listeners into one
sgup432 Jun 6, 2023
dc65e48
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 6, 2023
51f4369
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 7, 2023
ee0d61a
Updating java docs for stats classes
sgup432 Jun 7, 2023
ba766f3
Updating cancel tracker eviction logic and adding desired UTs
sgup432 Jun 8, 2023
8077a04
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Provide mechanism to configure XContent parsing constraints (after update to Jackson 2.15.0 and above) ([#7550](https://github.com/opensearch-project/OpenSearch/pull/7550))
- Support to clear filecache using clear indices cache API ([#7498](https://github.com/opensearch-project/OpenSearch/pull/7498))
- Create NamedRoute to map extension routes to a shortened name ([#6870](https://github.com/opensearch-project/OpenSearch/pull/6870))
- Add task cancellation monitoring service ([#7642](https://github.com/opensearch-project/OpenSearch/pull/7642))
- Added @dbwiddis as an OpenSearch maintainer ([#7665](https://github.com/opensearch-project/OpenSearch/pull/7665))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand Down Expand Up @@ -134,6 +135,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private FileCacheStats fileCacheStats;

@Nullable
private TaskCancellationStats taskCancellationStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -180,6 +184,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
fileCacheStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
taskCancellationStats = in.readOptionalWriteable(TaskCancellationStats::new);
} else {
taskCancellationStats = null;
}
}

public NodeStats(
Expand All @@ -204,7 +213,8 @@ public NodeStats(
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -228,6 +238,7 @@ public NodeStats(
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -355,6 +366,11 @@ public FileCacheStats getFileCacheStats() {
return fileCacheStats;
}

/**
 * Returns the task cancellation stats carried by this node stats object.
 *
 * @return the task cancellation stats, or {@code null} when none are present
 */
@Nullable
public TaskCancellationStats getTaskCancellationStats() {
    return this.taskCancellationStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -392,6 +408,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalWriteable(fileCacheStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
}

@Override
Expand Down Expand Up @@ -476,6 +495,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getFileCacheStats() != null) {
getFileCacheStats().toXContent(builder, params);
}
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public enum Metric {
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache");
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
Expand Down Expand Up @@ -648,7 +649,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Related to task cancellation
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
)
)
);
Expand Down
18 changes: 17 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
Expand Down Expand Up @@ -965,6 +967,16 @@ protected Node(
client,
FeatureFlags.isEnabled(SEARCH_PIPELINE)
);
final TaskCancellationMonitoringSettings taskCancellationMonitoringSettings = new TaskCancellationMonitoringSettings(
settings,
clusterService.getClusterSettings()
);
final TaskCancellationMonitoringService taskCancellationMonitoringService = new TaskCancellationMonitoringService(
threadPool,
transportService.getTaskManager(),
taskResourceTrackingService,
taskCancellationMonitoringSettings
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -985,7 +997,8 @@ protected Node(
searchModule.getValuesSourceRegistry().getUsageService(),
searchBackpressureService,
searchPipelineService,
fileCache
fileCache,
taskCancellationMonitoringService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1215,6 +1228,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1373,6 +1387,7 @@ private Node stop() {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
nodeService.getTaskCancellationMonitoringService().stop();

pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
Expand Down Expand Up @@ -1436,6 +1451,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(nodeService.getTaskCancellationMonitoringService());

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -90,6 +91,7 @@ public class NodeService implements Closeable {
private final ClusterService clusterService;
private final Discovery discovery;
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;

NodeService(
Settings settings,
Expand All @@ -111,7 +113,8 @@ public class NodeService implements Closeable {
AggregationUsageService aggregationUsageService,
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -133,6 +136,7 @@ public class NodeService implements Closeable {
this.searchPipelineService = searchPipelineService;
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -211,7 +215,8 @@ public NodeStats stats(
boolean searchBackpressure,
boolean clusterManagerThrottling,
boolean weightedRoutingStats,
boolean fileCacheStats
boolean fileCacheStats,
boolean taskCancellation
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -237,7 +242,8 @@ public NodeStats stats(
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null,
weightedRoutingStats ? WeightedRoutingStats.getInstance() : null,
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null
);
}

Expand All @@ -253,6 +259,10 @@ public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

/**
 * Returns the task cancellation monitoring service held by this node service.
 *
 * @return the service instance that was supplied to the constructor
 */
public TaskCancellationMonitoringService getTaskCancellationMonitoringService() {
    return this.taskCancellationMonitoringService;
}

@Override
public void close() throws IOException {
IOUtils.close(indicesService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

public class SearchShardTaskCancellationStats implements ToXContentObject, Writeable {

private final long currentLongRunningCancelledTaskCount;
private final long totalLongRunningCancelledTaskCount;
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

public SearchShardTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
this.currentLongRunningCancelledTaskCount = currentTaskCount;
this.totalLongRunningCancelledTaskCount = totalTaskCount;
}

public SearchShardTaskCancellationStats(StreamInput in) throws IOException {
this.currentLongRunningCancelledTaskCount = in.readVLong();
this.totalLongRunningCancelledTaskCount = in.readVLong();
}

// package private for testing
protected long getCurrentLongRunningCancelledTaskCount() {
return this.currentLongRunningCancelledTaskCount;
}

// package private for testing
protected long getTotalLongRunningCancelledTaskCount() {
return this.totalLongRunningCancelledTaskCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
return builder.endObject();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentLongRunningCancelledTaskCount);
out.writeVLong(totalLongRunningCancelledTaskCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardTaskCancellationStats that = (SearchShardTaskCancellationStats) o;
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
}

@Override
public int hashCode() {
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
}
}