Skip to content

Commit

Permalink
[Search Pipelines] Add stats for search pipelines
Browse files Browse the repository at this point in the history
This adds statistics on executions and time spent on search pipeline
operations, similar to the stats that are available for ingest
pipelines.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Jun 15, 2023
1 parent bb1f7e2 commit a6a5784
Show file tree
Hide file tree
Showing 18 changed files with 1,062 additions and 56 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Change `com.amazonaws.sdk.ec2MetadataServiceEndpointOverride` to `aws.ec2MetadataServiceEndpoint` ([7372](https://github.com/opensearch-project/OpenSearch/pull/7372/))
- Change `com.amazonaws.sdk.stsEndpointOverride` to `aws.stsEndpointOverride` ([7372](https://github.com/opensearch-project/OpenSearch/pull/7372/))
- Align range and default value for deletes_pct_allowed in merge policy ([#7730](https://github.com/opensearch-project/OpenSearch/pull/7730))
- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025))
- Rename QueryPhase actors like Suggest, Rescore to be processors rather than phase ([#8025](https://github.com/opensearch-project/OpenSearch/pull/8025))

### Deprecated

Expand Down Expand Up @@ -102,6 +102,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
- [Search pipelines] Added search pipelines output to node stats ([#8053](https://github.com/opensearch-project/OpenSearch/pull/8053))
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.pipeline.SearchPipelineStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;
Expand Down Expand Up @@ -138,6 +139,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private TaskCancellationStats taskCancellationStats;

@Nullable
private SearchPipelineStats searchPipelineStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -189,6 +193,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
taskCancellationStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO Update to 2_9_0 when we backport to 2.x
searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new);
} else {
searchPipelineStats = null;

Check warning on line 199 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L199

Added line #L199 was not covered by tests
}
}

public NodeStats(
Expand All @@ -214,7 +223,8 @@ public NodeStats(
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -239,6 +249,7 @@ public NodeStats(
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -371,6 +382,11 @@ public TaskCancellationStats getTaskCancellationStats() {
return taskCancellationStats;
}

@Nullable
public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;

Check warning on line 387 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L387

Added line #L387 was not covered by tests
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -411,6 +427,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update to 2_9_0 once we backport to 2.x
out.writeOptionalWriteable(searchPipelineStats);
}
}

@Override
Expand Down Expand Up @@ -498,6 +517,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);
}
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);

Check warning on line 521 in server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/action/admin/cluster/node/stats/NodeStats.java#L521

Added line #L521 was not covered by tests
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public enum Metric {
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void inc(long n) {
sum.add(n);
}

public void add(MeanMetric other) {
counter.add(other.counter.sum());
sum.add(other.sum.sum());
}

public void dec(long n) {
counter.decrement();
sum.add(-n);
Expand Down
14 changes: 4 additions & 10 deletions server/src/main/java/org/opensearch/ingest/IngestMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
/**
* <p>Metrics to measure ingest actions.
* <p>This counts measure documents and timings for a given scope.
* The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline,
* The scope is determined by the calling code. For example, you can use this class to count all documents across all pipelines,
* or you can use this class to count documents for a given pipeline or a specific processor.
* This class does not make assumptions about it's given scope.
* This class does not make assumptions about its given scope.
*
* @opensearch.internal
*/
Expand All @@ -57,10 +57,6 @@ class IngestMetric {
* Useful when aggregating multiple metrics to see how many things are in flight.
*/
private final AtomicLong ingestCurrent = new AtomicLong();
/**
* The ever increasing count of things being measured
*/
private final CounterMetric ingestCount = new CounterMetric();
/**
* The only increasing count of failures
*/
Expand All @@ -80,7 +76,6 @@ void preIngest() {
void postIngest(long ingestTimeInMillis) {
ingestCurrent.decrementAndGet();
ingestTime.inc(ingestTimeInMillis);
ingestCount.inc();
}

/**
Expand All @@ -98,15 +93,14 @@ void ingestFailed() {
* @param metrics The metric to add.
*/
void add(IngestMetric metrics) {
ingestCount.inc(metrics.ingestCount.count());
ingestTime.inc(metrics.ingestTime.sum());
ingestTime.add(metrics.ingestTime);
ingestFailed.inc(metrics.ingestFailed.count());
}

/**
* Creates a serializable representation for these metrics.
*/
IngestStats.Stats createStats() {
return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count());
return new IngestStats.Stats(ingestTime.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count());
}
}
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public NodeStats stats(
boolean clusterManagerThrottling,
boolean weightedRoutingStats,
boolean fileCacheStats,
boolean taskCancellation
boolean taskCancellation,
boolean searchPipelineStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -243,7 +244,8 @@ public NodeStats stats(
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null,
weightedRoutingStats ? WeightedRoutingStats.getInstance() : null,
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null
);
}

Expand Down

0 comments on commit a6a5784

Please sign in to comment.