Skip to content

Commit

Permalink
Add average query concurrency metric for concurrent segment search (#…
Browse files Browse the repository at this point in the history
…9670)

Signed-off-by: Jay Deng <jayd0104@gmail.com>
  • Loading branch information
jed326 committed Sep 2, 2023
1 parent ff4b23b commit 079c3e9
Show file tree
Hide file tree
Showing 10 changed files with 313 additions and 163 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))
- Add average concurrency metric for concurrent segment search ([#9670](https://github.com/opensearch-project/OpenSearch/issues/9670))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,105 +1,13 @@
"Help":
- skip:
version: " - 2.99.99"
reason: concurrent search stats were added in 3.0.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "3.0.0 - "

- match:
$body: |
/^ index .+ \n
shard .+ \n
prirep .+ \n
state .+ \n
docs .+ \n
store .+ \n
ip .+ \n
id .+ \n
node .+ \n
sync_id .+ \n
unassigned.reason .+ \n
unassigned.at .+ \n
unassigned.for .+ \n
unassigned.details .+ \n
recoverysource.type .+ \n
completion.size .+ \n
fielddata.memory_size .+ \n
fielddata.evictions .+ \n
query_cache.memory_size .+ \n
query_cache.evictions .+ \n
flush.total .+ \n
flush.total_time .+ \n
get.current .+ \n
get.time .+ \n
get.total .+ \n
get.exists_time .+ \n
get.exists_total .+ \n
get.missing_time .+ \n
get.missing_total .+ \n
indexing.delete_current .+ \n
indexing.delete_time .+ \n
indexing.delete_total .+ \n
indexing.index_current .+ \n
indexing.index_time .+ \n
indexing.index_total .+ \n
indexing.index_failed .+ \n
merges.current .+ \n
merges.current_docs .+ \n
merges.current_size .+ \n
merges.total .+ \n
merges.total_docs .+ \n
merges.total_size .+ \n
merges.total_time .+ \n
refresh.total .+ \n
refresh.time .+ \n
refresh.external_total .+ \n
refresh.external_time .+ \n
refresh.listeners .+ \n
search.fetch_current .+ \n
search.fetch_time .+ \n
search.fetch_total .+ \n
search.open_contexts .+ \n
search.query_current .+ \n
search.query_time .+ \n
search.query_total .+ \n
search.concurrent_query_current .+ \n
search.concurrent_query_time .+ \n
search.concurrent_query_total .+ \n
search.scroll_current .+ \n
search.scroll_time .+ \n
search.scroll_total .+ \n
search.point_in_time_current .+ \n
search.point_in_time_time .+ \n
search.point_in_time_total .+ \n
segments.count .+ \n
segments.memory .+ \n
segments.index_writer_memory .+ \n
segments.version_map_memory .+ \n
segments.fixed_bitset_memory .+ \n
seq_no.max .+ \n
seq_no.local_checkpoint .+ \n
seq_no.global_checkpoint .+ \n
warmer.current .+ \n
warmer.total .+ \n
warmer.total_time .+ \n
path.data .+ \n
path.state .+ \n
$/
---
"Help between 2.4.0 - 2.99.99":
- skip:
version: " - 2.3.99 , 3.0.0 - "
version: " - 2.3.99"
reason: point in time stats were added in 2.4.0
features: node_selector
- do:
cat.shards:
help: true
node_selector:
version: "2.4.0 - 2.99.99"
version: "2.4.0 - "

- match:
$body: |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.opensearch.search.stats;

import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse;
import org.opensearch.action.admin.indices.stats.IndexStats;
import org.opensearch.action.admin.indices.stats.IndicesStatsRequestBuilder;
import org.opensearch.action.admin.indices.stats.IndicesStatsResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -21,22 +23,26 @@
import org.opensearch.script.MockScriptPlugin;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptType;
import org.opensearch.search.SearchService;
import org.opensearch.test.InternalSettingsPlugin;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.function.Function;

import static org.opensearch.index.query.QueryBuilders.scriptQuery;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.lessThan;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.SUITE, numDataNodes = 2, numClientNodes = 0)
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 1, numClientNodes = 0, supportsDedicatedMasters = false)
public class ConcurrentSearchStatsIT extends OpenSearchIntegTestCase {

private final int SEGMENT_SLICE_COUNT = 4;

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(ScriptedDelayedPlugin.class, InternalSettingsPlugin.class);
Expand All @@ -49,6 +55,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
.put(super.nodeSettings(nodeOrdinal))
.put(IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), "1ms")
.put(IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.getKey(), true)
.put(SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_KEY, SEGMENT_SLICE_COUNT)
.build();
}

Expand All @@ -68,17 +75,19 @@ protected Settings featureFlagSettings() {
}

public void testConcurrentQueryCount() throws Exception {
String INDEX_1 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
String INDEX_2 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
int NUM_SHARDS = randomIntBetween(1, 5);
createIndex(
"test1",
INDEX_1,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
createIndex(
"test2",
INDEX_2,
Settings.builder()
.put(indexSettings())
.put("search.concurrent_segment_search.enabled", false)
Expand All @@ -92,24 +101,24 @@ public void testConcurrentQueryCount() throws Exception {
indexRandom(
false,
true,
client().prepareIndex("test1").setId("1").setSource("foo", "bar"),
client().prepareIndex("test1").setId("2").setSource("foo", "baz"),
client().prepareIndex("test2").setId("1").setSource("foo", "bar"),
client().prepareIndex("test2").setId("2").setSource("foo", "baz")
client().prepareIndex(INDEX_1).setId("1").setSource("foo", "bar"),
client().prepareIndex(INDEX_1).setId("2").setSource("foo", "baz"),
client().prepareIndex(INDEX_2).setId("1").setSource("foo", "bar"),
client().prepareIndex(INDEX_2).setId("2").setSource("foo", "baz")
);

refresh();

// Search with custom plugin to ensure that queryTime is significant
client().prepareSearch("_all")
client().prepareSearch(INDEX_1, INDEX_2)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute()
.actionGet();
client().prepareSearch("test1")
client().prepareSearch(INDEX_1)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute()
.actionGet();
client().prepareSearch("test2")
client().prepareSearch(INDEX_2)
.setQuery(scriptQuery(new Script(ScriptType.INLINE, "mockscript", ScriptedDelayedPlugin.SCRIPT_NAME, Collections.emptyMap())))
.execute()
.actionGet();
Expand All @@ -127,6 +136,177 @@ public void testConcurrentQueryCount() throws Exception {
);
}

/**
* Test average concurrency is correctly calculated across indices for the same node
*/
public void testAvgConcurrencyNodeLevel() throws InterruptedException {
int NUM_SHARDS = 1;
String INDEX_1 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
String INDEX_2 = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);

// Create index test1 with 4 segments
createIndex(
INDEX_1,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen();
for (int i = 0; i < 4; i++) {
client().prepareIndex(INDEX_1).setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}

client().prepareSearch(INDEX_1).execute().actionGet();
NodesStatsResponse nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
double expectedConcurrency = SEGMENT_SLICE_COUNT;
assertEquals(
SEGMENT_SLICE_COUNT,
nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(),
0
);

forceMerge();
// Sleep to make sure force merge completes
Thread.sleep(1000);
client().prepareSearch(INDEX_1).execute().actionGet();

nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
expectedConcurrency = (SEGMENT_SLICE_COUNT + 1) / 2.0;
assertEquals(
expectedConcurrency,
nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(),
0
);

// Create second index test2 with 4 segments
createIndex(
INDEX_2,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen();
for (int i = 0; i < 4; i++) {
client().prepareIndex(INDEX_2).setId(Integer.toString(i)).setSource("field", "value" + i).get();
refresh();
}

client().prepareSearch(INDEX_2).execute().actionGet();
nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
expectedConcurrency = (SEGMENT_SLICE_COUNT + 1 + SEGMENT_SLICE_COUNT) / 3.0;
assertEquals(
expectedConcurrency,
nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(),
0
);

forceMerge();
// Sleep to make sure force merge completes
Thread.sleep(1000);
client().prepareSearch(INDEX_2).execute().actionGet();
nodesStatsResponse = client().admin().cluster().prepareNodesStats().execute().actionGet();

assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
expectedConcurrency = (SEGMENT_SLICE_COUNT + 1 + SEGMENT_SLICE_COUNT + 1) / 4.0;
assertEquals(
expectedConcurrency,
nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(),
0
);

// Check that non-concurrent search requests do not affect the average concurrency
client().admin()
.indices()
.prepareUpdateSettings(INDEX_1)
.setSettings(Settings.builder().put("search.concurrent_segment_search.enabled", false))
.execute()
.actionGet();
client().admin()
.indices()
.prepareUpdateSettings(INDEX_2)
.setSettings(Settings.builder().put("search.concurrent_segment_search.enabled", false))
.execute()
.actionGet();
client().prepareSearch(INDEX_1).execute().actionGet();
client().prepareSearch(INDEX_2).execute().actionGet();
assertEquals(1, nodesStatsResponse.getNodes().size(), 0);
assertEquals(
expectedConcurrency,
nodesStatsResponse.getNodes().get(0).getIndices().getSearch().getTotal().getConcurrentAvgSliceCount(),
0
);
}

/**
* Test average concurrency is correctly calculated across shard for the same index
*/
public void testAvgConcurrencyIndexLevel() throws InterruptedException {
int NUM_SHARDS = 2;
String INDEX = "test-" + randomAlphaOfLength(5).toLowerCase(Locale.ROOT);
createIndex(
INDEX,
Settings.builder()
.put(indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, NUM_SHARDS)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.build()
);
ensureGreen();
// Create 4 segments on each shard
for (int i = 0; i < 4; i++) {
client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).setRouting("0").get();
refresh();
}
for (int i = 4; i < 8; i++) {
client().prepareIndex(INDEX).setId(Integer.toString(i)).setSource("field", "value" + i).setRouting("1").get();
refresh();
}
client().prepareSearch(INDEX).execute().actionGet();
IndicesStatsResponse indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet();

IndexStats stats = indicesStatsResponse.getIndices().get(INDEX);
assertNotNull(stats);
double expectedConcurrency = (SEGMENT_SLICE_COUNT * NUM_SHARDS) / (double) NUM_SHARDS;
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);

forceMerge();
// Sleep to make sure force merge completes
Thread.sleep(1000);
client().prepareSearch(INDEX).execute().actionGet();

indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet();
stats = indicesStatsResponse.getIndices().get(INDEX);
assertNotNull(stats);
expectedConcurrency = (SEGMENT_SLICE_COUNT * NUM_SHARDS + 1 * NUM_SHARDS) / (NUM_SHARDS * 2.0);
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);

// Check that non-concurrent search requests do not affect the average concurrency
client().admin()
.indices()
.prepareUpdateSettings(INDEX)
.setSettings(Settings.builder().put("search.concurrent_segment_search.enabled", false))
.execute()
.actionGet();

client().prepareSearch(INDEX).execute().actionGet();

indicesStatsResponse = client().admin().indices().prepareStats().execute().actionGet();
stats = indicesStatsResponse.getIndices().get(INDEX);
assertNotNull(stats);
assertEquals(expectedConcurrency, stats.getTotal().getSearch().getTotal().getConcurrentAvgSliceCount(), 0);
}

public static class ScriptedDelayedPlugin extends MockScriptPlugin {
static final String SCRIPT_NAME = "search_timeout";

Expand Down

0 comments on commit 079c3e9

Please sign in to comment.