Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Task cancellation monitoring service #7642

Merged
merged 29 commits into from
Jun 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
706fbb8
Adding task cancellation time in task API
sgup432 May 5, 2023
fc49144
Fixing unit tests and addressing comments
sgup432 May 8, 2023
3994b25
Adding change log for unreleased 2.x
sgup432 May 9, 2023
966d079
Removing running time cancel info from task API
sgup432 May 17, 2023
c74514c
Merge branch 'main' into task_api_changes
sgup432 May 17, 2023
31eeb91
Adding task cancellation monitoring service
sgup432 May 19, 2023
79be38e
Changing interval and duration setting
sgup432 May 19, 2023
5ea1936
Replacing long primitive with Long object
sgup432 May 19, 2023
9ad838d
Merge branch 'main' into task_api_changes
sgup432 May 19, 2023
2836093
Making cancelledAt field human readable
sgup432 May 19, 2023
6b32e98
Fixing failing test
sgup432 May 19, 2023
4d23a8e
Removing the feature from unreleased 3.x
sgup432 May 19, 2023
9fc264c
Fixing ListTasksResponseTests failure
sgup432 May 20, 2023
fd8e542
Test failure fix
sgup432 May 20, 2023
55d54ea
Changing naming convention to cancellationStartTime
sgup432 May 22, 2023
1d1e919
Merge branch 'main' into task_api_changes
sgup432 May 22, 2023
befdf86
Merge branch 'task_api_changes' into task_monitoring_changes
sgup432 May 23, 2023
fef0c4e
Adding task cancellation stats tests
sgup432 May 23, 2023
68d913c
Merge branch 'main' into task_monitoring_changes
sgup432 May 23, 2023
faf24c9
Updating change log
sgup432 May 23, 2023
838c02b
Merge branch 'main' into task_monitoring_changes
sgup432 May 23, 2023
385fb2a
Addressing comments
sgup432 May 31, 2023
8bd8aa7
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 1, 2023
eaf1718
Merging task completion/cancellation listeners into one
sgup432 Jun 6, 2023
dc65e48
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 6, 2023
51f4369
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 7, 2023
ee0d61a
Updating java docs for stats classes
sgup432 Jun 7, 2023
ba766f3
Updating cancel tracker eviction logic and adding desired UTs
sgup432 Jun 8, 2023
8077a04
Merge branch 'main' into task_monitoring_changes
sgup432 Jun 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased 2.x]
### Added
- Add task cancellation monitoring service ([#7642](https://github.com/opensearch-project/OpenSearch/pull/7642))
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;

Expand Down Expand Up @@ -134,6 +135,9 @@
@Nullable
private FileCacheStats fileCacheStats;

@Nullable
private TaskCancellationStats taskCancellationStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -180,6 +184,11 @@
} else {
fileCacheStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) {
taskCancellationStats = in.readOptionalWriteable(TaskCancellationStats::new);
} else {
taskCancellationStats = null;

Check warning on line 190 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L190

Added line #L190 was not covered by tests
}
}

public NodeStats(
Expand All @@ -204,7 +213,8 @@
@Nullable SearchBackpressureStats searchBackpressureStats,
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -228,6 +238,7 @@
this.clusterManagerThrottlingStats = clusterManagerThrottlingStats;
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -355,6 +366,11 @@
return fileCacheStats;
}

@Nullable
public TaskCancellationStats getTaskCancellationStats() {
return taskCancellationStats;

Check warning on line 371 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L371

Added line #L371 was not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -392,6 +408,9 @@
if (out.getVersion().onOrAfter(Version.V_2_7_0)) {
out.writeOptionalWriteable(fileCacheStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
}

@Override
Expand Down Expand Up @@ -476,6 +495,9 @@
if (getFileCacheStats() != null) {
getFileCacheStats().toXContent(builder, params);
}
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);

Check warning on line 499 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L499

Added line #L499 was not covered by tests
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ public enum Metric {
SEARCH_BACKPRESSURE("search_backpressure"),
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache");
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.SEARCH_BACKPRESSURE.containedIn(metrics),
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics)
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
sgup432 marked this conversation as resolved.
Show resolved Hide resolved
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.backpressure.settings.SearchShardTaskSettings;
import org.opensearch.search.backpressure.settings.SearchTaskSettings;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskManager;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
Expand Down Expand Up @@ -649,7 +650,11 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteRefreshSegmentPressureSettings.MIN_CONSECUTIVE_FAILURES_LIMIT,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_BYTES_PER_SEC_MOVING_AVERAGE_WINDOW_SIZE,
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE
RemoteRefreshSegmentPressureSettings.UPLOAD_TIME_MOVING_AVERAGE_WINDOW_SIZE,

// Related to monitoring of task cancellation
TaskCancellationMonitoringSettings.IS_ENABLED_SETTING,
TaskCancellationMonitoringSettings.DURATION_MILLIS_SETTING
)
)
);
Expand Down
17 changes: 16 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.backpressure.settings.SearchBackpressureSettings;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.tasks.TaskCancellationMonitoringSettings;
import org.opensearch.tasks.TaskResourceTrackingService;
import org.opensearch.tasks.consumer.TopNSearchTasksLogger;
import org.opensearch.threadpool.RunnableTaskExecutionListener;
Expand Down Expand Up @@ -972,6 +974,15 @@ protected Node(
client,
FeatureFlags.isEnabled(SEARCH_PIPELINE)
);
final TaskCancellationMonitoringSettings taskCancellationMonitoringSettings = new TaskCancellationMonitoringSettings(
settings,
clusterService.getClusterSettings()
);
final TaskCancellationMonitoringService taskCancellationMonitoringService = new TaskCancellationMonitoringService(
threadPool,
transportService.getTaskManager(),
taskCancellationMonitoringSettings
);
this.nodeService = new NodeService(
settings,
threadPool,
Expand All @@ -992,7 +1003,8 @@ protected Node(
searchModule.getValuesSourceRegistry().getUsageService(),
searchBackpressureService,
searchPipelineService,
fileCache
fileCache,
taskCancellationMonitoringService
);

final SearchService searchService = newSearchService(
Expand Down Expand Up @@ -1222,6 +1234,7 @@ public Node start() throws NodeValidationException {
injector.getInstance(FsHealthService.class).start();
nodeService.getMonitorService().start();
nodeService.getSearchBackpressureService().start();
nodeService.getTaskCancellationMonitoringService().start();

final ClusterService clusterService = injector.getInstance(ClusterService.class);

Expand Down Expand Up @@ -1380,6 +1393,7 @@ private Node stop() {
injector.getInstance(GatewayService.class).stop();
injector.getInstance(SearchService.class).stop();
injector.getInstance(TransportService.class).stop();
nodeService.getTaskCancellationMonitoringService().stop();

pluginLifecycleComponents.forEach(LifecycleComponent::stop);
// we should stop this last since it waits for resources to get released
Expand Down Expand Up @@ -1443,6 +1457,7 @@ public synchronized void close() throws IOException {
toClose.add(injector.getInstance(SearchService.class));
toClose.add(() -> stopWatch.stop().start("transport"));
toClose.add(injector.getInstance(TransportService.class));
toClose.add(nodeService.getTaskCancellationMonitoringService());

for (LifecycleComponent plugin : pluginLifecycleComponents) {
toClose.add(() -> stopWatch.stop().start("plugin(" + plugin.getClass().getName() + ")"));
Expand Down
16 changes: 13 additions & 3 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import org.opensearch.search.aggregations.support.AggregationUsageService;
import org.opensearch.search.backpressure.SearchBackpressureService;
import org.opensearch.search.pipeline.SearchPipelineService;
import org.opensearch.tasks.TaskCancellationMonitoringService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

Expand Down Expand Up @@ -90,6 +91,7 @@ public class NodeService implements Closeable {
private final ClusterService clusterService;
private final Discovery discovery;
private final FileCache fileCache;
private final TaskCancellationMonitoringService taskCancellationMonitoringService;

NodeService(
Settings settings,
Expand All @@ -111,7 +113,8 @@ public class NodeService implements Closeable {
AggregationUsageService aggregationUsageService,
SearchBackpressureService searchBackpressureService,
SearchPipelineService searchPipelineService,
FileCache fileCache
FileCache fileCache,
TaskCancellationMonitoringService taskCancellationMonitoringService
) {
this.settings = settings;
this.threadPool = threadPool;
Expand All @@ -133,6 +136,7 @@ public class NodeService implements Closeable {
this.searchPipelineService = searchPipelineService;
this.clusterService = clusterService;
this.fileCache = fileCache;
this.taskCancellationMonitoringService = taskCancellationMonitoringService;
clusterService.addStateApplier(ingestService);
clusterService.addStateApplier(searchPipelineService);
}
Expand Down Expand Up @@ -211,7 +215,8 @@ public NodeStats stats(
boolean searchBackpressure,
boolean clusterManagerThrottling,
boolean weightedRoutingStats,
boolean fileCacheStats
boolean fileCacheStats,
boolean taskCancellation
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -237,7 +242,8 @@ public NodeStats stats(
searchBackpressure ? this.searchBackpressureService.nodeStats() : null,
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null,
weightedRoutingStats ? WeightedRoutingStats.getInstance() : null,
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null
);
}

Expand All @@ -253,6 +259,10 @@ public SearchBackpressureService getSearchBackpressureService() {
return searchBackpressureService;
}

public TaskCancellationMonitoringService getTaskCancellationMonitoringService() {
return taskCancellationMonitoringService;
}

@Override
public void close() throws IOException {
IOUtils.close(indicesService);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.tasks;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;

/**
* Holds monitoring service stats specific to search shard task.
*/
public class SearchShardTaskCancellationStats implements ToXContentObject, Writeable {

private final long currentLongRunningCancelledTaskCount;
private final long totalLongRunningCancelledTaskCount;
Bukhtawar marked this conversation as resolved.
Show resolved Hide resolved

public SearchShardTaskCancellationStats(long currentTaskCount, long totalTaskCount) {
this.currentLongRunningCancelledTaskCount = currentTaskCount;
this.totalLongRunningCancelledTaskCount = totalTaskCount;
}

public SearchShardTaskCancellationStats(StreamInput in) throws IOException {
this.currentLongRunningCancelledTaskCount = in.readVLong();
this.totalLongRunningCancelledTaskCount = in.readVLong();
}

// package private for testing
protected long getCurrentLongRunningCancelledTaskCount() {
return this.currentLongRunningCancelledTaskCount;
}

// package private for testing
protected long getTotalLongRunningCancelledTaskCount() {
return this.totalLongRunningCancelledTaskCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("current_count_post_cancel", currentLongRunningCancelledTaskCount);
builder.field("total_count_post_cancel", totalLongRunningCancelledTaskCount);
return builder.endObject();

Check warning on line 53 in server/src/main/java/org/opensearch/tasks/SearchShardTaskCancellationStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/tasks/SearchShardTaskCancellationStats.java#L50-L53

Added lines #L50 - L53 were not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(currentLongRunningCancelledTaskCount);
out.writeVLong(totalLongRunningCancelledTaskCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SearchShardTaskCancellationStats that = (SearchShardTaskCancellationStats) o;
return currentLongRunningCancelledTaskCount == that.currentLongRunningCancelledTaskCount
&& totalLongRunningCancelledTaskCount == that.totalLongRunningCancelledTaskCount;
}

@Override
public int hashCode() {
return Objects.hash(currentLongRunningCancelledTaskCount, totalLongRunningCancelledTaskCount);
}
}