Skip to content

Commit

Permalink
[Search Pipelines] Add stats for search pipelines
Browse files Browse the repository at this point in the history
This adds statistics on executions and time spent on search pipeline
operations, similar to the stats that are available for ingest
pipelines.

Signed-off-by: Michael Froh <froh@amazon.com>
  • Loading branch information
msfroh committed Jun 14, 2023
1 parent a81ef5a commit a94b09a
Show file tree
Hide file tree
Showing 17 changed files with 1,060 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.opensearch.script.ScriptCacheStats;
import org.opensearch.script.ScriptStats;
import org.opensearch.search.backpressure.stats.SearchBackpressureStats;
import org.opensearch.search.pipeline.SearchPipelineStats;
import org.opensearch.tasks.TaskCancellationStats;
import org.opensearch.threadpool.ThreadPoolStats;
import org.opensearch.transport.TransportStats;
Expand Down Expand Up @@ -138,6 +139,9 @@ public class NodeStats extends BaseNodeResponse implements ToXContentFragment {
@Nullable
private TaskCancellationStats taskCancellationStats;

@Nullable
private SearchPipelineStats searchPipelineStats;

public NodeStats(StreamInput in) throws IOException {
super(in);
timestamp = in.readVLong();
Expand Down Expand Up @@ -189,6 +193,11 @@ public NodeStats(StreamInput in) throws IOException {
} else {
taskCancellationStats = null;
}
if (in.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO Update to 2_9_0 when we backport to 2.x
searchPipelineStats = in.readOptionalWriteable(SearchPipelineStats::new);
} else {
searchPipelineStats = null;
}
}

public NodeStats(
Expand All @@ -214,7 +223,8 @@ public NodeStats(
@Nullable ClusterManagerThrottlingStats clusterManagerThrottlingStats,
@Nullable WeightedRoutingStats weightedRoutingStats,
@Nullable FileCacheStats fileCacheStats,
@Nullable TaskCancellationStats taskCancellationStats
@Nullable TaskCancellationStats taskCancellationStats,
@Nullable SearchPipelineStats searchPipelineStats
) {
super(node);
this.timestamp = timestamp;
Expand All @@ -239,6 +249,7 @@ public NodeStats(
this.weightedRoutingStats = weightedRoutingStats;
this.fileCacheStats = fileCacheStats;
this.taskCancellationStats = taskCancellationStats;
this.searchPipelineStats = searchPipelineStats;
}

public long getTimestamp() {
Expand Down Expand Up @@ -371,6 +382,11 @@ public TaskCancellationStats getTaskCancellationStats() {
return taskCancellationStats;
}

@Nullable
public SearchPipelineStats getSearchPipelineStats() {
return searchPipelineStats;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
Expand Down Expand Up @@ -411,6 +427,9 @@ public void writeTo(StreamOutput out) throws IOException {
if (out.getVersion().onOrAfter(Version.V_3_0_0)) {
out.writeOptionalWriteable(taskCancellationStats);
}
if (out.getVersion().onOrAfter(Version.V_3_0_0)) { // TODO: Update to 2_9_0 once we backport to 2.x
out.writeOptionalWriteable(searchPipelineStats);
}
}

@Override
Expand Down Expand Up @@ -498,6 +517,9 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (getTaskCancellationStats() != null) {
getTaskCancellationStats().toXContent(builder, params);
}
if (getSearchPipelineStats() != null) {
getSearchPipelineStats().toXContent(builder, params);
}

return builder;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ public enum Metric {
CLUSTER_MANAGER_THROTTLING("cluster_manager_throttling"),
WEIGHTED_ROUTING_STATS("weighted_routing"),
FILE_CACHE_STATS("file_cache"),
TASK_CANCELLATION("task_cancellation");
TASK_CANCELLATION("task_cancellation"),
SEARCH_PIPELINE("search_pipeline");

private String metricName;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ protected NodeStats nodeOperation(NodeStatsRequest nodeStatsRequest) {
NodesStatsRequest.Metric.CLUSTER_MANAGER_THROTTLING.containedIn(metrics),
NodesStatsRequest.Metric.WEIGHTED_ROUTING_STATS.containedIn(metrics),
NodesStatsRequest.Metric.FILE_CACHE_STATS.containedIn(metrics),
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics)
NodesStatsRequest.Metric.TASK_CANCELLATION.containedIn(metrics),
NodesStatsRequest.Metric.SEARCH_PIPELINE.containedIn(metrics)
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ protected ClusterStatsNodeResponse nodeOperation(ClusterStatsNodeRequest nodeReq
false,
false,
false,
false,
false
);
List<ShardStats> shardsStats = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void inc(long n) {
sum.add(n);
}

public void add(MeanMetric other) {
counter.add(other.counter.sum());
sum.add(other.sum.sum());
}

public void dec(long n) {
counter.decrement();
sum.add(-n);
Expand Down
14 changes: 4 additions & 10 deletions server/src/main/java/org/opensearch/ingest/IngestMetric.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@
/**
* <p>Metrics to measure ingest actions.
* <p>This counts measure documents and timings for a given scope.
* The scope is determined by the calling code. For example you can use this class to count all documents across all pipeline,
* The scope is determined by the calling code. For example, you can use this class to count all documents across all pipelines,
* or you can use this class to count documents for a given pipeline or a specific processor.
* This class does not make assumptions about it's given scope.
* This class does not make assumptions about its given scope.
*
* @opensearch.internal
*/
Expand All @@ -57,10 +57,6 @@ class IngestMetric {
* Useful when aggregating multiple metrics to see how many things are in flight.
*/
private final AtomicLong ingestCurrent = new AtomicLong();
/**
* The ever increasing count of things being measured
*/
private final CounterMetric ingestCount = new CounterMetric();
/**
* The only increasing count of failures
*/
Expand All @@ -80,7 +76,6 @@ void preIngest() {
void postIngest(long ingestTimeInMillis) {
ingestCurrent.decrementAndGet();
ingestTime.inc(ingestTimeInMillis);
ingestCount.inc();
}

/**
Expand All @@ -98,15 +93,14 @@ void ingestFailed() {
* @param metrics The metric to add.
*/
void add(IngestMetric metrics) {
ingestCount.inc(metrics.ingestCount.count());
ingestTime.inc(metrics.ingestTime.sum());
ingestTime.add(metrics.ingestTime);
ingestFailed.inc(metrics.ingestFailed.count());
}

/**
* Creates a serializable representation for these metrics.
*/
IngestStats.Stats createStats() {
return new IngestStats.Stats(ingestCount.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count());
return new IngestStats.Stats(ingestTime.count(), ingestTime.sum(), ingestCurrent.get(), ingestFailed.count());
}
}
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/node/NodeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ public NodeStats stats(
boolean clusterManagerThrottling,
boolean weightedRoutingStats,
boolean fileCacheStats,
boolean taskCancellation
boolean taskCancellation,
boolean searchPipelineStats
) {
// for indices stats we want to include previous allocated shards stats as well (it will
// only be applied to the sensible ones to use, like refresh/merge/flush/indexing stats)
Expand All @@ -243,7 +244,8 @@ public NodeStats stats(
clusterManagerThrottling ? this.clusterService.getClusterManagerService().getThrottlingStats() : null,
weightedRoutingStats ? WeightedRoutingStats.getInstance() : null,
fileCacheStats && fileCache != null ? fileCache.fileCacheStats() : null,
taskCancellation ? this.taskCancellationMonitoringService.stats() : null
taskCancellation ? this.taskCancellationMonitoringService.stats() : null,
searchPipelineStats ? this.searchPipelineService.stats() : null
);
}

Expand Down

0 comments on commit a94b09a

Please sign in to comment.