Skip to content

Commit

Permalink
Add average query concurrency metric for concurrent segment search
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 authored and Jay Deng committed Aug 31, 2023
1 parent 082d425 commit c82df27
Show file tree
Hide file tree
Showing 9 changed files with 177 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
search.concurrent_query_current .+ \n
search.concurrent_query_time .+ \n
search.concurrent_query_total .+ \n
search.avg_query_concurrency .+ \n
search.scroll_current .+ \n
search.scroll_time .+ \n
search.scroll_total .+ \n
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -21,6 +23,7 @@
import org.opensearch.script.MockScriptPlugin;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.SearchService;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

Expand All @@ -34,9 +37,11 @@
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
public class ConcurrentSearchStatsIT extends OpenSearchIntegTestCase {

private final int SEGMENT_SLICE_COUNT = 4;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ScriptedDelayedPlugin.class, InternalSettingsPlugin.class);
Expand All @@ -49,6 +54,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
.put(IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), "1ms")
.put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, SEGMENT_SLICE_COUNT)
.build();
}

Expand Down Expand Up @@ -127,6 +133,122 @@ public void testConcurrentQueryCount() throws Exception {
);
}

/**
* Test average concurrency is correctly calculated across indices for the same node
*/
public void testAvgConcurrencyNodeLevel() throws InterruptedException {
int NUM_SHARDS = 1;

// Create index test1 with 10 segments
createIndex(
"test1",
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen();
for (int i = 0; i < 10; i++) {
client().prepareIndex("test1").setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}

client().prepareSearch("test1").execute().actionGet();
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
double expectedConcurrency = SEGMENT_SLICE_COUNT;
assertEquals(SEGMENT_SLICE_COUNT, nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getAvgConcurrency(), 0);

forceMerge();
// Sleep to make sure force merge completes
Thread.sleep(1000);
client().prepareSearch("test1").execute().actionGet();

nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
expectedConcurrency = (SEGMENT_SLICE_COUNT + 1) / 2.0;
assertEquals(expectedConcurrency, nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getAvgConcurrency(), 0);

// Create second index test2 with 10 segments
createIndex(
"test2",
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen();
for (int i = 0; i < 10; i++) {
client().prepareIndex("test2").setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}

client().prepareSearch("test2").execute().actionGet();
nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
expectedConcurrency = (SEGMENT_SLICE_COUNT + 1 + SEGMENT_SLICE_COUNT) / 3.0;
assertEquals(expectedConcurrency, nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getAvgConcurrency(), 0);

forceMerge();
// Sleep to make sure force merge completes
Thread.sleep(1000);
client().prepareSearch("test2").execute().actionGet();
nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
expectedConcurrency = (SEGMENT_SLICE_COUNT + 1 + SEGMENT_SLICE_COUNT + 1) / 4.0;
assertEquals(expectedConcurrency, nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getAvgConcurrency(), 0);
}

/**
* Test average concurrency is correctly calculated across shard for the same index
*/
public void testAvgConcurrencyIndexLevel() throws InterruptedException {
int NUM_SHARDS = 2;
String INDEX_NAME = "test-index-stats-avg-concurrency";
createIndex(
INDEX_NAME,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen();
// Create 5 segments on each shard
for (int i = 0; i < 5; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).setRouting("0").get();
refresh();
}
for (int i = 5; i < 10; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).setRouting("1").get();
refresh();
}
client().prepareSearch(INDEX_NAME).execute().actionGet();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet();

IndexStats stats = indicesStatsResponse.getIndices().get(INDEX_NAME);
assertNotNull(stats);
double expectedConcurrency = (SEGMENT_SLICE_COUNT * NUM_SHARDS) / (double) NUM_SHARDS;
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getAvgConcurrency(), 0);

forceMerge();
// Sleep to make sure force merge completes
Thread.sleep(1000);
client().prepareSearch(INDEX_NAME).execute().actionGet();

indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet();
stats = indicesStatsResponse.getIndices().get(INDEX_NAME);
assertNotNull(stats);
expectedConcurrency = (SEGMENT_SLICE_COUNT * NUM_SHARDS + 1 * NUM_SHARDS) / (NUM_SHARDS * 2.0);
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getAvgConcurrency(), 0);
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ public static class Stats implements Writeable, ToXContentFragment {
private long concurrentQueryCount;
private long concurrentQueryTimeInMillis;
private long concurrentQueryCurrent;
private long queryConcurrency;

private long fetchCount;
private long fetchTimeInMillis;
Expand Down Expand Up @@ -98,6 +99,7 @@ public Stats(
long concurrentQueryCount,
long concurrentQueryTimeInMillis,
long concurrentQueryCurrent,
long queryConcurrency,
long fetchCount,
long fetchTimeInMillis,
long fetchCurrent,
Expand All @@ -118,6 +120,7 @@ public Stats(
this.concurrentQueryCount = concurrentQueryCount;
this.concurrentQueryTimeInMillis = concurrentQueryTimeInMillis;
this.concurrentQueryCurrent = concurrentQueryCurrent;
this.queryConcurrency = queryConcurrency;

this.fetchCount = fetchCount;
this.fetchTimeInMillis = fetchTimeInMillis;
Expand Down Expand Up @@ -163,6 +166,7 @@ private Stats(StreamInput in) throws IOException {
concurrentQueryCount = in.readVLong();
concurrentQueryTimeInMillis = in.readVLong();
concurrentQueryCurrent = in.readVLong();
queryConcurrency = in.readVLong();
}
}

Expand All @@ -174,6 +178,7 @@ public void add(Stats stats) {
concurrentQueryCount += stats.concurrentQueryCount;
concurrentQueryTimeInMillis += stats.concurrentQueryTimeInMillis;
concurrentQueryCurrent += stats.concurrentQueryCurrent;
queryConcurrency += stats.queryConcurrency;

fetchCount += stats.fetchCount;
fetchTimeInMillis += stats.fetchTimeInMillis;
Expand Down Expand Up @@ -213,6 +218,7 @@ public void addForClosingShard(Stats stats) {
pitCount += stats.pitCount;
pitTimeInMillis += stats.pitTimeInMillis;
pitCurrent += stats.pitCurrent;
queryConcurrency += stats.queryConcurrency;
}

public long getQueryCount() {
Expand All @@ -239,6 +245,14 @@ public TimeValue getConcurrentQueryTime() {
return new TimeValue(concurrentQueryTimeInMillis);
}

public double getAvgConcurrency() {
if (concurrentQueryCount == 0) {
return 0;
} else {
return queryConcurrency / (double) concurrentQueryCount;
}
}

public long getConcurrentQueryTimeInMillis() {
return concurrentQueryTimeInMillis;
}
Expand All @@ -247,6 +261,10 @@ public long getConcurrentQueryCurrent() {
return concurrentQueryCurrent;
}

public long getQueryConcurrency() {
return queryConcurrency;
}

public long getFetchCount() {
return fetchCount;
}
Expand Down Expand Up @@ -343,6 +361,7 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(concurrentQueryCount);
out.writeVLong(concurrentQueryTimeInMillis);
out.writeVLong(concurrentQueryCurrent);
out.writeVLong(queryConcurrency);
}
}

Expand All @@ -355,6 +374,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Fields.CONCURRENT_QUERY_TOTAL, concurrentQueryCount);
builder.humanReadableField(Fields.CONCURRENT_QUERY_TIME_IN_MILLIS, Fields.CONCURRENT_QUERY_TIME, getConcurrentQueryTime());
builder.field(Fields.CONCURRENT_QUERY_CURRENT, concurrentQueryCurrent);
builder.field(Fields.AVG_QUERY_CONCURRENCY, getAvgConcurrency());

builder.field(Fields.FETCH_TOTAL, fetchCount);
builder.humanReadableField(Fields.FETCH_TIME_IN_MILLIS, Fields.FETCH_TIME, getFetchTime());
Expand Down Expand Up @@ -484,6 +504,7 @@ static final class Fields {
static final String CONCURRENT_QUERY_TIME = "concurrent_query_time";
static final String CONCURRENT_QUERY_TIME_IN_MILLIS = "concurrent_query_time_in_millis";
static final String CONCURRENT_QUERY_CURRENT = "concurrent_query_current";
static final String AVG_QUERY_CONCURRENCY = "avg_query_concurrency";
static final String FETCH_TOTAL = "fetch_total";
static final String FETCH_TIME = "fetch_time";
static final String FETCH_TIME_IN_MILLIS = "fetch_time_in_millis";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public void onFailedQueryPhase(SearchContext searchContext) {
if (searchContext.shouldUseConcurrentSearch()) {
statsHolder.concurrentQueryCurrent.dec();
assert statsHolder.concurrentQueryCurrent.count() >= 0;
if (searchContext.searcher().getSlices() != null) {
statsHolder.queryConcurrencyMetric.inc(searchContext.searcher().getSlices().length);
}
}
}
});
Expand All @@ -130,6 +133,8 @@ public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
statsHolder.concurrentQueryMetric.inc(tookInNanos);
statsHolder.concurrentQueryCurrent.dec();
assert statsHolder.concurrentQueryCurrent.count() >= 0;
assert searchContext.searcher().getSlices() != null;
statsHolder.queryConcurrencyMetric.inc(searchContext.searcher().getSlices().length);
}
}
});
Expand Down Expand Up @@ -219,6 +224,7 @@ public void onFreePitContext(ReaderContext readerContext) {
static final class StatsHolder {
final MeanMetric queryMetric = new MeanMetric();
final MeanMetric concurrentQueryMetric = new MeanMetric();
final MeanMetric queryConcurrencyMetric = new MeanMetric();
final MeanMetric fetchMetric = new MeanMetric();
/* We store scroll statistics in microseconds because with nanoseconds we run the risk of overflowing the total stats if there are
* many scrolls. For example, on a system with 2^24 scrolls that have been executed, each executing for 2^10 seconds, then using
Expand All @@ -245,6 +251,7 @@ SearchStats.Stats stats() {
concurrentQueryMetric.count(),
TimeUnit.NANOSECONDS.toMillis(concurrentQueryMetric.sum()),
concurrentQueryCurrent.count(),
queryConcurrencyMetric.sum(),
fetchMetric.count(),
TimeUnit.NANOSECONDS.toMillis(fetchMetric.sum()),
fetchCurrent.count(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,12 @@ protected Table getTableWithHeader(final RestRequest request) {
);
table.addCell("pri.search.concurrent_query_total", "default:false;text-align:right;desc:total query phase ops");

table.addCell(
"search.avg_query_concurrency",
"sibling:pri;alias:saqc,searchAvgQueryConcurrency;default:false;text-align:right;desc:average query concurrency"
);
table.addCell("pri.search.avg_query_concurrency", "default:false;text-align:right;desc:average query concurrency");

table.addCell(
"search.scroll_current",
"sibling:pri;alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts"
Expand Down Expand Up @@ -916,6 +922,9 @@ Table buildTable(
table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getConcurrentQueryCount());
table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getConcurrentQueryCount());

table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getQueryConcurrency());
table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getQueryConcurrency());

table.addCell(totalStats.getSearch() == null ? null : totalStats.getSearch().getTotal().getScrollCurrent());
table.addCell(primaryStats.getSearch() == null ? null : primaryStats.getSearch().getTotal().getScrollCurrent());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ protected Table getTableWithHeader(final RestRequest request) {
"search.concurrent_query_total",
"alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total concurrent query phase ops"
);
table.addCell(
"search.avg_query_concurrency",
"alias:saqc,searchAvgQueryConcurrency;default:false;text-align:right;desc:average query concurrency"
);
table.addCell("search.scroll_current", "alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts");
table.addCell(
"search.scroll_time",
Expand Down Expand Up @@ -544,6 +548,7 @@ Table buildTable(
table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentQueryCurrent());
table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentQueryTime());
table.addCell(searchStats == null ? null : searchStats.getTotal().getConcurrentQueryCount());
table.addCell(searchStats == null ? null : searchStats.getTotal().getQueryConcurrency());
table.addCell(searchStats == null ? null : searchStats.getTotal().getScrollCurrent());
table.addCell(searchStats == null ? null : searchStats.getTotal().getScrollTime());
table.addCell(searchStats == null ? null : searchStats.getTotal().getScrollCount());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ protected Table getTableWithHeader(final RestRequest request) {
"search.concurrent_query_total",
"alias:scqto,searchConcurrentQueryTotal;default:false;text-align:right;desc:total concurrent query phase ops"
);
table.addCell(
"search.avg_query_concurrency",
"alias:saqc,searchAvgQueryConcurrency;default:false;text-align:right;desc:average query concurrency"
);
table.addCell("search.scroll_current", "alias:scc,searchScrollCurrent;default:false;text-align:right;desc:open scroll contexts");
table.addCell(
"search.scroll_time",
Expand Down Expand Up @@ -414,6 +418,7 @@ Table buildTable(RestRequest request, ClusterStateResponse state, IndicesStatsRe
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCurrent()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryTime()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getConcurrentQueryCount()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getQueryConcurrency()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCurrent()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollTime()));
table.addCell(getOrNull(commonStats, CommonStats::getSearch, i -> i.getTotal().getScrollCount()));
Expand Down

0 comments on commit c82df27

Please sign in to comment.