Skip to content

Commit

Permalink
Adding concurrent search versions of query count and time metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Aug 30, 2023
1 parent 8cfde6c commit 04a13c5
Show file tree
Hide file tree
Showing 10 changed files with 267 additions and 84 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Segment Replication] Support realtime reads for GET requests ([#9212](https://github.com/opensearch-project/OpenSearch/pull/9212))
- [Feature] Expose term frequency in Painless script score context ([#9081](https://github.com/opensearch-project/OpenSearch/pull/9081))
- Add support for reading partial files to HDFS repository ([#9513](https://github.com/opensearch-project/OpenSearch/issues/9513))
- Add concurrent segment search related metrics to node and index stats ([#9622](https://github.com/opensearch-project/OpenSearch/issues/9622))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,92 +1,94 @@
---
"Help":
- skip:
version: " - 2.3.99"
reason: point in time stats were added in 2.4.0
version: " - 2.9.99"
reason: concurrent search stats were added in 2.10.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "2.4.0 - "
version: "2.10.0 - "

- match:
$body: |
/^ index .+ \n
shard .+ \n
prirep .+ \n
state .+ \n
docs .+ \n
store .+ \n
ip .+ \n
id .+ \n
node .+ \n
sync_id .+ \n
unassigned.reason .+ \n
unassigned.at .+ \n
unassigned.for .+ \n
unassigned.details .+ \n
recoverysource.type .+ \n
completion.size .+ \n
fielddata.memory_size .+ \n
fielddata.evictions .+ \n
query_cache.memory_size .+ \n
query_cache.evictions .+ \n
flush.total .+ \n
flush.total_time .+ \n
get.current .+ \n
get.time .+ \n
get.total .+ \n
get.exists_time .+ \n
get.exists_total .+ \n
get.missing_time .+ \n
get.missing_total .+ \n
indexing.delete_current .+ \n
indexing.delete_time .+ \n
indexing.delete_total .+ \n
indexing.index_current .+ \n
indexing.index_time .+ \n
indexing.index_total .+ \n
indexing.index_failed .+ \n
merges.current .+ \n
merges.current_docs .+ \n
merges.current_size .+ \n
merges.total .+ \n
merges.total_docs .+ \n
merges.total_size .+ \n
merges.total_time .+ \n
refresh.total .+ \n
refresh.time .+ \n
refresh.external_total .+ \n
refresh.external_time .+ \n
refresh.listeners .+ \n
search.fetch_current .+ \n
search.fetch_time .+ \n
search.fetch_total .+ \n
search.open_contexts .+ \n
search.query_current .+ \n
search.query_time .+ \n
search.query_total .+ \n
search.scroll_current .+ \n
search.scroll_time .+ \n
search.scroll_total .+ \n
search.point_in_time_current .+ \n
search.point_in_time_time .+ \n
search.point_in_time_total .+ \n
segments.count .+ \n
segments.memory .+ \n
segments.index_writer_memory .+ \n
segments.version_map_memory .+ \n
segments.fixed_bitset_memory .+ \n
seq_no.max .+ \n
seq_no.local_checkpoint .+ \n
seq_no.global_checkpoint .+ \n
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
path.data .+ \n
path.state .+ \n
$/
/^ index .+ \n
shard .+ \n
prirep .+ \n
state .+ \n
docs .+ \n
store .+ \n
ip .+ \n
id .+ \n
node .+ \n
sync_id .+ \n
unassigned.reason .+ \n
unassigned.at .+ \n
unassigned.for .+ \n
unassigned.details .+ \n
recoverysource.type .+ \n
completion.size .+ \n
fielddata.memory_size .+ \n
fielddata.evictions .+ \n
query_cache.memory_size .+ \n
query_cache.evictions .+ \n
flush.total .+ \n
flush.total_time .+ \n
get.current .+ \n
get.time .+ \n
get.total .+ \n
get.exists_time .+ \n
get.exists_total .+ \n
get.missing_time .+ \n
get.missing_total .+ \n
indexing.delete_current .+ \n
indexing.delete_time .+ \n
indexing.delete_total .+ \n
indexing.index_current .+ \n
indexing.index_time .+ \n
indexing.index_total .+ \n
indexing.index_failed .+ \n
merges.current .+ \n
merges.current_docs .+ \n
merges.current_size .+ \n
merges.total .+ \n
merges.total_docs .+ \n
merges.total_size .+ \n
merges.total_time .+ \n
refresh.total .+ \n
refresh.time .+ \n
refresh.external_total .+ \n
refresh.external_time .+ \n
refresh.listeners .+ \n
search.fetch_current .+ \n
search.fetch_time .+ \n
search.fetch_total .+ \n
search.open_contexts .+ \n
search.query_current .+ \n
search.query_time .+ \n
search.query_total .+ \n
search.concurrent_query_current .+ \n
search.concurrent_query_time .+ \n
search.concurrent_query_total .+ \n
search.scroll_current .+ \n
search.scroll_time .+ \n
search.scroll_total .+ \n
search.point_in_time_current .+ \n
search.point_in_time_time .+ \n
search.point_in_time_total .+ \n
segments.count .+ \n
segments.memory .+ \n
segments.index_writer_memory .+ \n
segments.version_map_memory .+ \n
segments.fixed_bitset_memory .+ \n
seq_no.max .+ \n
seq_no.local_checkpoint .+ \n
seq_no.global_checkpoint .+ \n
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
path.data .+ \n
path.state .+ \n
$/
---
"Help before - 2.4.0":
- skip:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.core.action.support.DefaultShardOperationFailedException;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.core.common.io.stream.StreamOutput;
Expand Down Expand Up @@ -143,6 +144,12 @@ public Settings indexSettings() {
.build();
}

@Override
// Do not parameterize this test, instead let individual test methods modify concurrent search settings as needed.
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.CONCURRENT_SEGMENT_SEARCH, "true").build();
}

private Settings.Builder settingsBuilder() {
return Settings.builder().put(indexSettings());
}
Expand Down Expand Up @@ -1090,6 +1097,49 @@ public void testGroupsParam() throws Exception {

}

public void testConcurrentQueryCount() throws Exception {
int NUM_SHARDS = randomIntBetween(1, 5);
createIndex(
"test1",
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
createIndex(
"test2",
Settings.builder()
.put(indexSettings())
.put("search.concurrent_segment_search.enabled", false)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);

ensureGreen();

client().prepareIndex("test1").setId(Integer.toString(1)).setSource("foo", "bar").execute().actionGet();
client().prepareIndex("test2").setId(Integer.toString(1)).setSource("foo", "bar").execute().actionGet();
refresh();

client().prepareSearch("_all").execute().actionGet();
client().prepareSearch("test1").execute().actionGet();
client().prepareSearch("test2").execute().actionGet();

IndicesStatsRequestBuilder builder = client().admin().indices().prepareStats();
IndicesStatsResponse stats = builder.execute().actionGet();

assertEquals(4 * NUM_SHARDS, stats.getTotal().search.getTotal().getQueryCount());
assertEquals(2 * NUM_SHARDS, stats.getTotal().search.getTotal().getConcurrentQueryCount());
assertThat(stats.getTotal().search.getTotal().getQueryTimeInMillis(), greaterThan(0L));
assertThat(stats.getTotal().search.getTotal().getConcurrentQueryTimeInMillis(), greaterThan(0L));
assertThat(
stats.getTotal().search.getTotal().getConcurrentQueryTimeInMillis(),
lessThan(stats.getTotal().search.getTotal().getQueryTimeInMillis())
);
}

private static void set(Flag flag, IndicesStatsRequestBuilder builder, boolean set) {
switch (flag) {
case Docs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public static class Stats implements Writeable, ToXContentFragment {
private long queryTimeInMillis;
private long queryCurrent;

private long concurrentQueryCount;
private long concurrentQueryTimeInMillis;
private long concurrentQueryCurrent;

private long fetchCount;
private long fetchTimeInMillis;
private long fetchCurrent;
Expand All @@ -91,6 +95,9 @@ public Stats(
long queryCount,
long queryTimeInMillis,
long queryCurrent,
long concurrentQueryCount,
long concurrentQueryTimeInMillis,
long concurrentQueryCurrent,
long fetchCount,
long fetchTimeInMillis,
long fetchCurrent,
Expand All @@ -108,6 +115,10 @@ public Stats(
this.queryTimeInMillis = queryTimeInMillis;
this.queryCurrent = queryCurrent;

this.concurrentQueryCount = concurrentQueryCount;
this.concurrentQueryTimeInMillis = concurrentQueryTimeInMillis;
this.concurrentQueryCurrent = concurrentQueryCurrent;

this.fetchCount = fetchCount;
this.fetchTimeInMillis = fetchTimeInMillis;
this.fetchCurrent = fetchCurrent;
Expand All @@ -130,6 +141,12 @@ private Stats(StreamInput in) throws IOException {
queryTimeInMillis = in.readVLong();
queryCurrent = in.readVLong();

if (in.getVersion().onOrAfter(Version.V_2_10_0)) {
concurrentQueryCount = in.readVLong();
concurrentQueryTimeInMillis = in.readVLong();
concurrentQueryCurrent = in.readVLong();
}

fetchCount = in.readVLong();
fetchTimeInMillis = in.readVLong();
fetchCurrent = in.readVLong();
Expand All @@ -154,6 +171,10 @@ public void add(Stats stats) {
queryTimeInMillis += stats.queryTimeInMillis;
queryCurrent += stats.queryCurrent;

concurrentQueryCount += stats.concurrentQueryCount;
concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis;
concurrentQueryCurrent += stats.concurrentQueryCurrent;

fetchCount += stats.fetchCount;
fetchTimeInMillis += stats.fetchTimeInMillis;
fetchCurrent += stats.fetchCurrent;
Expand All @@ -175,6 +196,9 @@ public void addForClosingShard(Stats stats) {
queryCount += stats.queryCount;
queryTimeInMillis += stats.queryTimeInMillis;

concurrentQueryCount += stats.concurrentQueryCount;
concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis;

fetchCount += stats.fetchCount;
fetchTimeInMillis += stats.fetchTimeInMillis;

Expand Down Expand Up @@ -207,6 +231,22 @@ public long getQueryCurrent() {
return queryCurrent;
}

public long getConcurrentQueryCount() {
return concurrentQueryCount;
}

public TimeValue getConcurrentQueryTime() {
return new TimeValue(concurrentQueryTimeInMillis);
}

public long getConcurrentQueryTimeInMillis() {
return concurrentQueryTimeInMillis;
}

public long getConcurrentQueryCurrent() {
return concurrentQueryCurrent;
}

public long getFetchCount() {
return fetchCount;
}
Expand Down Expand Up @@ -281,6 +321,12 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(queryTimeInMillis);
out.writeVLong(queryCurrent);

if (out.getVersion().onOrAfter(Version.V_2_10_0)) {
out.writeVLong(concurrentQueryCount);
out.writeVLong(concurrentQueryTimeInMillis);
out.writeVLong(concurrentQueryCurrent);
}

out.writeVLong(fetchCount);
out.writeVLong(fetchTimeInMillis);
out.writeVLong(fetchCurrent);
Expand All @@ -306,6 +352,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.humanReadableField(Fields.QUERY_TIME_IN_MILLIS, Fields.QUERY_TIME, getQueryTime());
builder.field(Fields.QUERY_CURRENT, queryCurrent);

builder.field(Fields.CONCURRENT_QUERY_TOTAL, concurrentQueryCount);
builder.humanReadableField(Fields.CONCURRENT_QUERY_TIME_IN_MILLIS, Fields.CONCURRENT_QUERY_TIME, getConcurrentQueryTime());
builder.field(Fields.CONCURRENT_QUERY_CURRENT, concurrentQueryCurrent);

builder.field(Fields.FETCH_TOTAL, fetchCount);
builder.humanReadableField(Fields.FETCH_TIME_IN_MILLIS, Fields.FETCH_TIME, getFetchTime());
builder.field(Fields.FETCH_CURRENT, fetchCurrent);
Expand Down Expand Up @@ -430,6 +480,10 @@ static final class Fields {
static final String QUERY_TIME = "query_time";
static final String QUERY_TIME_IN_MILLIS = "query_time_in_millis";
static final String QUERY_CURRENT = "query_current";
static final String CONCURRENT_QUERY_TOTAL = "concurrent_query_total";
static final String CONCURRENT_QUERY_TIME = "concurrent_query_time";
static final String CONCURRENT_QUERY_TIME_IN_MILLIS = "concurrent_query_time_in_millis";
static final String CONCURRENT_QUERY_CURRENT = "concurrent_query_current";
static final String FETCH_TOTAL = "fetch_total";
static final String FETCH_TIME = "fetch_time";
static final String FETCH_TIME_IN_MILLIS = "fetch_time_in_millis";
Expand Down
Loading

0 comments on commit 04a13c5

Please sign in to comment.