[Backport 2.x] [Search Pipelines] Add stats for search pipelines (#8376)
Backport of commit 46c9a21 to the `2.x` branch.
* [Search Pipelines] Add stats for search pipelines (#8053)

This adds statistics on executions and time spent on search pipeline
operations, similar to the stats that are available for ingest
pipelines.

Signed-off-by: Mingshi Liu <mingshl@amazon.com>
Co-authored-by: Michael Froh <froh@amazon.com>
mingshl and msfroh committed Jul 5, 2023
1 parent 876dd16 commit d150117
Showing 33 changed files with 1,399 additions and 459 deletions.
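
For orientation before the file-by-file diff: the change exposes a new `search_pipeline` metric through the node stats API, reported per node as a `SearchPipelineStats` section. The sketch below is purely illustrative and not part of this commit; only `NodesStatsRequest.Metric.SEARCH_PIPELINE` and `NodeStats.getSearchPipelineStats()` come from this change, while the helper class and client wiring around them are assumptions.

import org.opensearch.action.admin.cluster.node.stats.NodeStats;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsRequest;
import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.client.Client;
import org.opensearch.search.pipeline.SearchPipelineStats;

// Hypothetical helper, shown for illustration only.
// REST equivalent (same metric name): GET /_nodes/stats/search_pipeline
public final class SearchPipelineStatsFetcher {
    private SearchPipelineStatsFetcher() {}

    public static void printSearchPipelineStats(Client client) {
        NodesStatsRequest request = new NodesStatsRequest()
            .clear()                                                            // start from an empty metric set
            .addMetric(NodesStatsRequest.Metric.SEARCH_PIPELINE.metricName());  // "search_pipeline"
        NodesStatsResponse response = client.admin().cluster().nodesStats(request).actionGet();
        for (NodeStats nodeStats : response.getNodes()) {
            // The field is @Nullable: nodes that did not report this section return null.
            SearchPipelineStats stats = nodeStats.getSearchPipelineStats();
            System.out.println(nodeStats.getNode().getName() + " reported search pipeline stats: " + (stats != null));
        }
    }
}
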
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
- [Search pipelines] Added search pipelines output to node stats ([#8053](https://github.com/opensearch-project/OpenSearch/pull/8053))
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))
- Add API to initialize extensions ([#8029](https://github.com/opensearch-project/OpenSearch/pull/8029))
@@ -132,7 +132,7 @@ public void testFailureInConditionalProcessor() {
for (int k = 0; k < nodeCount; k++) {
List<IngestStats.ProcessorStat> stats = r.getNodes().get(k).getIngestStats().getProcessorStats().get(pipelineId);
for (IngestStats.ProcessorStat st : stats) {
assertThat(st.getStats().getIngestCurrent(), greaterThanOrEqualTo(0L));
assertThat(st.getStats().getCurrent(), greaterThanOrEqualTo(0L));
}
}
}
@@ -60,6 +60,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.pipeline.SearchPipelineStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;
@@ -139,6 +140,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private TaskCancellationStats taskCancellationStats;

@Nullable
private SearchPipelineStats searchPipelineStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
@@ -202,6 +206,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
taskCancellationStats = null;
}
if (in.getVersion().onOrAfter(Version.V_2_9_0)) {
searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new);
} else {
searchPipelineStats = null;
}
}

public NodeStats(
@@ -227,7 +236,8 @@ public NodeStats(
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
) {
super(node);
this.timestamp = timestamp;
@@ -252,6 +262,7 @@ public NodeStats(
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
}

public long getTimestamp() {
@@ -384,6 +395,11 @@ public TaskCancellationStats getTaskCancellationStats() {
return taskCancellationStats;
}

@Nullable
public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
@@ -430,6 +446,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
if (out.getVersion().onOrAfter(Version.V_2_9_0)) {
out.writeOptionalWriteable(searchPipelineStats);
}
}

@Override
@@ -517,6 +536,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);
}
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}

return builder;
}
@@ -242,7 +242,8 @@ public enum Metric {
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");

private String metricName;

@@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
);
}

@@ -41,6 +41,7 @@
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodeRole;
import org.opensearch.common.Strings;
import org.opensearch.common.metrics.OperationStats;
import org.opensearch.common.network.NetworkModule;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.transport.TransportAddress;
@@ -800,18 +801,18 @@ static class IngestStats implements ToXContentFragment {
pipelineIds.add(processorStats.getKey());
for (org.opensearch.ingest.IngestStats.ProcessorStat stat : processorStats.getValue()) {
stats.compute(stat.getType(), (k, v) -> {
org.opensearch.ingest.IngestStats.Stats nodeIngestStats = stat.getStats();
OperationStats nodeIngestStats = stat.getStats();
if (v == null) {
return new long[] {
nodeIngestStats.getIngestCount(),
nodeIngestStats.getIngestFailedCount(),
nodeIngestStats.getIngestCurrent(),
nodeIngestStats.getIngestTimeInMillis() };
nodeIngestStats.getCount(),
nodeIngestStats.getFailedCount(),
nodeIngestStats.getCurrent(),
nodeIngestStats.getTotalTimeInMillis() };
} else {
v[0] += nodeIngestStats.getIngestCount();
v[1] += nodeIngestStats.getIngestFailedCount();
v[2] += nodeIngestStats.getIngestCurrent();
v[3] += nodeIngestStats.getIngestTimeInMillis();
v[0] += nodeIngestStats.getCount();
v[1] += nodeIngestStats.getFailedCount();
v[2] += nodeIngestStats.getCurrent();
v[3] += nodeIngestStats.getTotalTimeInMillis();
return v;
}
});
@@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
@@ -49,6 +49,11 @@ public void inc(long n) {
sum.add(n);
}

public void add(MeanMetric other) {
counter.add(other.counter.sum());
sum.add(other.sum.sum());
}

public void dec(long n) {
counter.decrement();
sum.add(-n);
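
Between the diffs: the only change to `MeanMetric` above is the new `add(MeanMetric other)`, which folds another tracker's count and sum into this one; the `OperationMetrics.add` introduced in the next file relies on it. A tiny hypothetical illustration, not part of this commit:

import org.opensearch.common.metrics.MeanMetric;

// Hypothetical illustration of MeanMetric.add; class and variable names are made up.
public final class MeanMetricMergeExample {
    public static void main(String[] args) {
        MeanMetric perShard = new MeanMetric();
        perShard.inc(120);          // one operation took 120 ms
        perShard.inc(80);           // another took 80 ms

        MeanMetric nodeTotal = new MeanMetric();
        nodeTotal.add(perShard);    // copies count (2) and sum (200) into nodeTotal

        assert nodeTotal.count() == 2;
        assert nodeTotal.sum() == 200;
    }
}
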
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import java.util.concurrent.atomic.AtomicLong;

/**
* Mutable tracker of a repeated operation.
*
* @opensearch.internal
*/
public class OperationMetrics {
/**
* The mean time it takes to complete the measured item.
*/
private final MeanMetric time = new MeanMetric();
/**
* The current count of things being measured.
* Useful when aggregating multiple metrics to see how many things are in flight.
*/
private final AtomicLong current = new AtomicLong();
/**
* The non-decreasing count of failures
*/
private final CounterMetric failed = new CounterMetric();

/**
* Invoked before the given operation begins.
*/
public void before() {
current.incrementAndGet();
}

/**
* Invoked upon completion (success or failure) of the given operation
* @param currentTime elapsed time of the operation
*/
public void after(long currentTime) {
current.decrementAndGet();
time.inc(currentTime);
}

/**
* Invoked upon failure of the operation.
*/
public void failed() {
failed.inc();
}

public void add(OperationMetrics other) {
// Don't try copying over current, since in-flight requests will be linked to the existing metrics instance.
failed.inc(other.failed.count());
time.add(other.time);
}

/**
* @return an immutable snapshot of the current metric values.
*/
public OperationStats createStats() {
return new OperationStats(time.count(), time.sum(), current.get(), failed.count());
}
}
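
A minimal hypothetical caller (not part of this commit) showing the intended lifecycle: `before()` marks an operation in flight, `after(elapsed)` records its duration on success or failure, `failed()` counts failures, and `createStats()` takes the immutable snapshot that ends up in node stats. The wrapper class and `Supplier` plumbing here are assumptions for illustration.

import org.opensearch.common.metrics.OperationMetrics;
import org.opensearch.common.metrics.OperationStats;

import java.util.function.Supplier;

// Hypothetical wrapper, for illustration only.
public final class OperationMetricsUsage {
    private final OperationMetrics metrics = new OperationMetrics();

    public <T> T measure(Supplier<T> operation) {
        long start = System.nanoTime();
        metrics.before();                      // one more operation in flight
        try {
            return operation.get();            // the work being measured
        } catch (RuntimeException e) {
            metrics.failed();                  // failures are counted separately from completions
            throw e;
        } finally {
            // Runs on success and failure: records elapsed time and decrements the in-flight count.
            metrics.after((System.nanoTime() - start) / 1_000_000);
        }
    }

    public OperationStats snapshot() {
        return metrics.createStats();          // immutable view exposed through node stats
    }
}
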
107 changes: 107 additions & 0 deletions server/src/main/java/org/opensearch/common/metrics/OperationStats.java
@@ -0,0 +1,107 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common.metrics;

import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
import org.opensearch.common.io.stream.Writeable;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;

import java.io.IOException;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

/**
* An immutable representation of a {@link OperationMetrics}
*/
public class OperationStats implements Writeable, ToXContentFragment {
private final long count;
private final long totalTimeInMillis;
private final long current;
private final long failedCount;

public OperationStats(long count, long totalTimeInMillis, long current, long failedCount) {
this.count = count;
this.totalTimeInMillis = totalTimeInMillis;
this.current = current;
this.failedCount = failedCount;
}

/**
* Read from a stream.
*/
public OperationStats(StreamInput in) throws IOException {
count = in.readVLong();
totalTimeInMillis = in.readVLong();
current = in.readVLong();
failedCount = in.readVLong();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(count);
out.writeVLong(totalTimeInMillis);
out.writeVLong(current);
out.writeVLong(failedCount);
}

/**
* @return The total number of executed operations.
*/
public long getCount() {
return count;
}

/**
* @return The total time spent, in millis.
*/
public long getTotalTimeInMillis() {
return totalTimeInMillis;
}

/**
* @return The total number of operations currently executing.
*/
public long getCurrent() {
return current;
}

/**
* @return The total number of operations that have failed.
*/
public long getFailedCount() {
return failedCount;
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
return builder.field("count", count)
.humanReadableField("time_in_millis", "time", new TimeValue(totalTimeInMillis, TimeUnit.MILLISECONDS))
.field("current", current)
.field("failed", failedCount);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
OperationStats that = (OperationStats) o;
return Objects.equals(count, that.count)
&& Objects.equals(totalTimeInMillis, that.totalTimeInMillis)
&& Objects.equals(failedCount, that.failedCount)
&& Objects.equals(current, that.current);
}

@Override
public int hashCode() {
return Objects.hash(count, totalTimeInMillis, failedCount, current);
}
}
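
`OperationStats` carries the four counters over the wire and renders them in XContent as `count`, `time_in_millis`, `current`, and `failed`. A small hypothetical round-trip check (not part of this commit), assuming the `BytesStreamOutput` helper from the same `org.opensearch.common.io.stream` package as `StreamInput`/`StreamOutput`:

import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.metrics.OperationStats;

// Hypothetical round-trip of the Writeable contract; values are arbitrary.
public final class OperationStatsRoundTrip {
    public static void main(String[] args) throws Exception {
        OperationStats original = new OperationStats(42L, 1_500L, 1L, 3L);  // count, totalTimeInMillis, current, failedCount
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            original.writeTo(out);
            try (StreamInput in = out.bytes().streamInput()) {
                OperationStats copy = new OperationStats(in);
                assert original.equals(copy);   // equals/hashCode cover all four counters
            }
        }
    }
}
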
