diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 8d31aa227..27a653d5f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -47,6 +47,7 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.aggregations.metrics.CardinalityAggregationBuilder import org.opensearch.search.aggregations.support.MultiTermsValuesSourceConfig import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase import java.net.URLEncoder import java.time.Instant import java.time.ZonedDateTime @@ -55,6 +56,7 @@ import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.DAYS import java.time.temporal.ChronoUnit.MILLIS import java.time.temporal.ChronoUnit.MINUTES +import java.util.concurrent.TimeUnit class MonitorRunnerServiceIT : AlertingRestTestCase() { @@ -138,7 +140,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { verifyAlert(firstRunAlert, monitor) // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // see lastNotificationTime change. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) executeMonitor(monitor.id) val secondRunAlert = searchAlerts(monitor).single() verifyAlert(secondRunAlert, monitor) @@ -265,7 +269,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let lastNotificationTime change. W/o this sleep the test can result in a false negative. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) val response = executeMonitor(monitor.id) val output = entityAsMap(response) @@ -765,7 +771,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { verifyAlert(activeAlert1.single(), monitor, ACTIVE) val actionResults1 = verifyActionExecutionResultInAlert(activeAlert1[0], mutableMapOf(Pair(actionThrottleEnabled.id, 0))) - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) updateMonitor(monitor.copy(triggers = listOf(trigger.copy(condition = NEVER_RUN)), id = monitor.id)) executeMonitor(monitor.id) val completedAlert = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN).single() @@ -1398,7 +1406,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let lastNotificationTime change. W/o this sleep the test can result in a false negative. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) executeMonitor(monitor.id) // Check that the lastNotification time of the acknowledged Alert wasn't updated and the active Alert's was @@ -1418,7 +1428,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { ) // Execute Monitor and check that both Alerts were updated - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) executeMonitor(monitor.id) currentAlerts = searchAlerts(monitor, AlertIndices.ALL_ALERT_INDEX_PATTERN) val completedAlerts = currentAlerts.filter { it.state == COMPLETED } @@ -1940,7 +1952,9 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // Runner uses ThreadPool.CachedTimeThread thread which only updates once every 200 ms. Wait a bit to // let Action executionTime change. W/o this sleep the test can result in a false negative. - Thread.sleep(200) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 200, TimeUnit.MILLISECONDS) val monitorRunResultThrottled = entityAsMap(executeMonitor(monitor.id)) verifyActionThrottleResultsForBucketLevelMonitor( monitorRunResult = monitorRunResultThrottled, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index 3aedc420d..69a7e0363 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -140,7 +140,9 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(trueMonitor) // Allow for a rollover index. - Thread.sleep(2000) + OpenSearchTestCase.waitUntil({ + return@waitUntil (getAlertIndices().size >= 3) + }, 2, TimeUnit.SECONDS) assertTrue("Did not find 3 alert indices", getAlertIndices().size >= 3) } @@ -157,7 +159,9 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(trueMonitor.id) // Allow for a rollover index. - Thread.sleep(2000) + OpenSearchTestCase.waitUntil({ + return@waitUntil (getFindingIndices().size >= 2) + }, 2, TimeUnit.SECONDS) assertTrue("Did not find 2 alert indices", getFindingIndices().size >= 2) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt index 68abd0548..2c77fd480 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt @@ -16,6 +16,8 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.core.rest.RestStatus import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase +import java.util.concurrent.TimeUnit class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { @@ -69,8 +71,21 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { // the test execution by a lot (might have to wait for Job Scheduler plugin integration first) // Waiting a minute to ensure the Monitor ran again at least once before checking if the job is running // on time - Thread.sleep(60000) - verifyMonitorStats("/_plugins/_alerting") + var passed = false + OpenSearchTestCase.waitUntil({ + try { + // Run verifyMonitorStats until all assertion test passes + verifyMonitorStats("/_plugins/_alerting") + passed = true + return@waitUntil true + } catch (e: AssertionError) { + return@waitUntil false + } + }, 1, TimeUnit.MINUTES) + if (!passed) { + // if it hit the max time (1 minute), run verifyMonitorStats again to make sure all the tests pass + verifyMonitorStats("/_plugins/_alerting") + } } } break diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt index eccea22f5..25e8d319d 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/MonitorRestApiIT.kt @@ -811,7 +811,11 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Delete request not successful", RestStatus.OK, deleteResponse.restStatus()) // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + val alerts = searchAlerts(monitor) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) + return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1) + }, 5, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) @@ -842,7 +846,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus()) // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 5, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) @@ -870,7 +876,11 @@ class MonitorRestApiIT : AlertingRestTestCase() { assertEquals("Update request not successful", RestStatus.OK, updateResponse.restStatus()) // Wait 5 seconds for event to be processed and alerts moved - Thread.sleep(5000) + OpenSearchTestCase.waitUntil({ + val alerts = searchAlerts(monitor) + val historyAlerts = searchAlerts(monitor, AlertIndices.ALERT_HISTORY_WRITE_INDEX) + return@waitUntil (alerts.isEmpty() && historyAlerts.size == 1) + }, 5, TimeUnit.SECONDS) val alerts = searchAlerts(monitor) assertEquals("Active alert was not deleted", 0, alerts.size) @@ -956,10 +966,13 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test monitor stats when disabling and re-enabling scheduled jobs with existing monitor`() { // Enable Monitor jobs + enableScheduledJob() val monitorId = createMonitor(randomQueryLevelMonitor(enabled = true), refresh = true).id - if (isMultiNode) Thread.sleep(2000) + if (isMultiNode) OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.SECONDS) var alertingStats = getAlertingStats() assertAlertingStatsSweeperEnabled(alertingStats, true) assertEquals("Scheduled job index does not exist", true, alertingStats["scheduled_job_index_exists"]) @@ -992,7 +1005,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { enableScheduledJob() // Sleep briefly so sweep can reschedule the Monitor - Thread.sleep(2000) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.SECONDS) alertingStats = getAlertingStats() assertAlertingStatsSweeperEnabled(alertingStats, true) @@ -1015,10 +1030,13 @@ class MonitorRestApiIT : AlertingRestTestCase() { fun `test monitor stats jobs`() { // Enable the Monitor plugin. + enableScheduledJob() createRandomMonitor(refresh = true) - if (isMultiNode) Thread.sleep(2000) + if (isMultiNode) OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.SECONDS) val responseMap = getAlertingStats() assertAlertingStatsSweeperEnabled(responseMap, true) assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"]) @@ -1051,7 +1069,9 @@ class MonitorRestApiIT : AlertingRestTestCase() { enableScheduledJob() createRandomMonitor(refresh = true) - if (isMultiNode) Thread.sleep(2000) + if (isMultiNode) OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.SECONDS) val responseMap = getAlertingStats("/jobs_info") assertAlertingStatsSweeperEnabled(responseMap, true) assertEquals("Scheduled job index does not exist", true, responseMap["scheduled_job_index_exists"]) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 8c073c4b6..cf48720af 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -35,12 +35,14 @@ import org.opensearch.index.query.QueryBuilders import org.opensearch.script.Script import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder +import org.opensearch.test.OpenSearchTestCase import org.opensearch.test.junit.annotations.TestLogging import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Collections import java.util.Locale import java.util.UUID +import java.util.concurrent.TimeUnit @TestLogging("level:DEBUG", reason = "Debug for tests.") @Suppress("UNCHECKED_CAST") @@ -1180,7 +1182,10 @@ class WorkflowRestApiIT : AlertingRestTestCase() { }""" indexDoc(index, "1", testDoc) - Thread.sleep(80000) + OpenSearchTestCase.waitUntil({ + val findings = searchFindings(monitor.copy(id = monitorResponse.id)) + return@waitUntil (findings.size == 1) + }, 80, TimeUnit.SECONDS) val findings = searchFindings(monitor.copy(id = monitorResponse.id)) assertEquals("Findings saved for test monitor", 1, findings.size) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt index f9c40e465..903eedb44 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/util/destinationmigration/DestinationMigrationUtilServiceIT.kt @@ -18,8 +18,10 @@ import org.opensearch.alerting.util.DestinationType import org.opensearch.client.ResponseException import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.core.rest.RestStatus +import org.opensearch.test.OpenSearchTestCase import java.time.Instant import java.util.UUID +import java.util.concurrent.TimeUnit class DestinationMigrationUtilServiceIT : AlertingRestTestCase() { @@ -80,7 +82,9 @@ class DestinationMigrationUtilServiceIT : AlertingRestTestCase() { // Create cluster change event and wait for migration service to complete migrating data over client().updateSettings("indices.recovery.max_bytes_per_sec", "40mb") - Thread.sleep(120000) + OpenSearchTestCase.waitUntil({ + return@waitUntil false + }, 2, TimeUnit.MINUTES) for (id in ids) { val response = client().makeRequest(