Skip to content

Commit

Permalink
Use more robust assertion in SimpleThreadPoolIT.testThreadPoolMetrics (
Browse files Browse the repository at this point in the history
…elastic#106624)

Assert using greaterThanOrEqualTo to allow for additional scheduled background threads to 
appear in collected measurements after the thread pool stats have already been pulled, 
e.g. this could be the case for the cluster coordination thread pool.
  • Loading branch information
mosche committed Mar 28, 2024
1 parent aeeb597 commit 54ae1e5
Showing 1 changed file with 9 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import static org.elasticsearch.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.in;
import static org.hamcrest.Matchers.matchesRegex;
Expand Down Expand Up @@ -117,7 +118,6 @@ public void testThreadNames() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/104652")
public void testThreadPoolMetrics() throws Exception {
internalCluster().startNode();

Expand Down Expand Up @@ -148,15 +148,17 @@ public void testThreadPoolMetrics() throws Exception {
assertNoFailures(prepareSearch("idx").setQuery(QueryBuilders.termQuery("str_value", "s" + i)));
assertNoFailures(prepareSearch("idx").setQuery(QueryBuilders.termQuery("l_value", i)));
}

final var tp = internalCluster().getInstance(ThreadPool.class, dataNodeName);
final var tps = new ThreadPoolStats[1];
// wait for all threads to complete so that we get deterministic results
waitUntil(() -> tp.stats().stats().stream().allMatch(s -> s.active() == 0));
ThreadPoolStats tps = tp.stats();
waitUntil(() -> (tps[0] = tp.stats()).stats().stream().allMatch(s -> s.active() == 0));

plugin.collect();
ArrayList<String> registeredMetrics = plugin.getRegisteredMetrics(InstrumentType.LONG_GAUGE);
registeredMetrics.addAll(plugin.getRegisteredMetrics(InstrumentType.LONG_ASYNC_COUNTER));

tps.forEach(stats -> {
tps[0].forEach(stats -> {
Map<String, Long> threadPoolStats = List.of(
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_COMPLETED, stats.completed()),
Map.entry(ThreadPool.THREAD_POOL_METRIC_NAME_ACTIVE, (long) stats.active()),
Expand All @@ -182,7 +184,9 @@ public void testThreadPoolMetrics() throws Exception {
logger.info("Stats of `{}`: {}", stats.name(), threadPoolStats);
logger.info("Measurements of `{}`: {}", stats.name(), measurements);

threadPoolStats.forEach((metric, value) -> assertThat(measurements, hasEntry(equalTo(metric), contains(equalTo(value)))));
threadPoolStats.forEach(
(metric, value) -> assertThat(measurements, hasEntry(equalTo(metric), contains(greaterThanOrEqualTo(value))))
);
});
}

Expand Down

0 comments on commit 54ae1e5

Please sign in to comment.