diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index aac65908..cc71f002 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -15,10 +15,12 @@ package com.amazon.opendistroforelasticsearch.ad; +import static org.apache.http.entity.ContentType.APPLICATION_JSON; import static org.elasticsearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.elasticsearch.test.ESTestCase.randomAlphaOfLength; +import static org.elasticsearch.test.ESTestCase.randomBoolean; import static org.elasticsearch.test.ESTestCase.randomDouble; import static org.elasticsearch.test.ESTestCase.randomInt; import static org.elasticsearch.test.ESTestCase.randomIntBetween; @@ -44,7 +46,7 @@ import org.apache.http.Header; import org.apache.http.HttpEntity; -import org.apache.http.entity.ContentType; +import org.apache.http.entity.StringEntity; import org.apache.http.nio.entity.NStringEntity; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -53,11 +55,13 @@ import org.elasticsearch.Version; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse; +import org.elasticsearch.action.admin.indices.create.CreateIndexRequest; import org.elasticsearch.action.admin.indices.create.CreateIndexResponse; import org.elasticsearch.action.admin.indices.mapping.get.GetFieldMappingsResponse.FieldMappingMetadata; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.client.AdminClient; import org.elasticsearch.client.Client; import org.elasticsearch.client.Request; import org.elasticsearch.client.RequestOptions; @@ -109,7 +113,6 @@ import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; import com.amazon.opendistroforelasticsearch.ad.constant.CommonValue; - import com.amazon.opendistroforelasticsearch.ad.feature.Features; import com.amazon.opendistroforelasticsearch.ad.ml.ThresholdingResult; import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; @@ -144,7 +147,7 @@ public static Response makeRequest( String jsonEntity, List
headers ) throws IOException { - HttpEntity httpEntity = Strings.isBlank(jsonEntity) ? null : new NStringEntity(jsonEntity, ContentType.APPLICATION_JSON); + HttpEntity httpEntity = Strings.isBlank(jsonEntity) ? null : new NStringEntity(jsonEntity, APPLICATION_JSON); return makeRequest(client, method, endpoint, params, httpEntity, headers); } @@ -220,35 +223,12 @@ public static AnomalyDetector randomAnomalyDetector(Map uiMetada public static AnomalyDetector randomAnomalyDetector(List features, Map uiMetadata, Instant lastUpdateTime) throws IOException { - return randomAnomalyDetector(features, uiMetadata, lastUpdateTime, null, null); - } - - public static AnomalyDetector randomAnomalyDetector( - List features, - Map uiMetadata, - Instant lastUpdateTime, - String detectorType, - DetectionDateRange dateRange - ) throws IOException { - return randomAnomalyDetector(features, uiMetadata, lastUpdateTime, detectorType, dateRange, true); - } - - public static AnomalyDetector randomAnomalyDetector( - List features, - Map uiMetadata, - Instant lastUpdateTime, - String detectorType, - DetectionDateRange dateRange, - boolean withUser - ) throws IOException { return randomAnomalyDetector( ImmutableList.of(randomAlphaOfLength(10).toLowerCase()), features, uiMetadata, lastUpdateTime, - detectorType, - dateRange, - withUser + randomBoolean() ); } @@ -257,8 +237,6 @@ public static AnomalyDetector randomAnomalyDetector( List features, Map uiMetadata, Instant lastUpdateTime, - String detectorType, - DetectionDateRange dateRange, boolean withUser ) throws IOException { User user = withUser ? randomUser() : null; @@ -685,6 +663,11 @@ public static void createIndex(RestClient client, String indexName, HttpEntity d ); } + public static CreateIndexResponse createIndex(AdminClient adminClient, String indexName, String indexMapping) { + CreateIndexRequest request = new CreateIndexRequest(indexName).mapping(AnomalyDetector.TYPE, indexMapping, XContentType.JSON); + return adminClient.indices().create(request).actionGet(5_000); + } + public static GetResponse createGetResponse(ToXContentObject o, String id, String indexName) throws IOException { XContentBuilder content = o.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); return new GetResponse( diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java deleted file mode 100644 index b5ba3681..00000000 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ /dev/null @@ -1,381 +0,0 @@ -/* - * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"). - * You may not use this file except in compliance with the License. - * A copy of the License is located at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * or in the "license" file accompanying this file. This file is distributed - * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either - * express or implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package com.amazon.opendistroforelasticsearch.ad.transport; - -import static com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages.DETECTOR_IS_RUNNING; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; -import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR; -import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.PROFILE; -import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.START_JOB; -import static com.amazon.opendistroforelasticsearch.ad.util.RestHandlerUtils.STOP_JOB; -import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM; -import static org.elasticsearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; - -import java.io.IOException; -import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.List; -import java.util.Set; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.elasticsearch.ElasticsearchStatusException; -import org.elasticsearch.action.ActionListener; -import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; -import org.elasticsearch.common.lucene.uid.Versions; -import org.elasticsearch.common.settings.Settings; -import org.elasticsearch.index.IndexNotFoundException; -import org.elasticsearch.test.ESIntegTestCase; -import org.junit.After; -import org.junit.Before; - -import com.amazon.opendistroforelasticsearch.ad.HistoricalDetectorIntegTestCase; -import com.amazon.opendistroforelasticsearch.ad.TestHelpers; -import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; -import com.amazon.opendistroforelasticsearch.ad.mock.transport.MockAnomalyDetectorJobAction; -import com.amazon.opendistroforelasticsearch.ad.model.ADTask; -import com.amazon.opendistroforelasticsearch.ad.model.ADTaskProfile; -import com.amazon.opendistroforelasticsearch.ad.model.ADTaskState; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; -import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; -import com.amazon.opendistroforelasticsearch.ad.model.DetectionDateRange; -import com.amazon.opendistroforelasticsearch.ad.stats.StatNames; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; - -@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST, numDataNodes = 2) -public class AnomalyDetectorJobTransportActionTests extends HistoricalDetectorIntegTestCase { - private Instant startTime; - private Instant endTime; - private String type = "error"; - private int maxOldAdTaskDocsPerDetector = 2; - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - startTime = Instant.now().minus(10, ChronoUnit.DAYS); - endTime = Instant.now(); - ingestTestData(testIndex, startTime, detectionIntervalInMinutes, type, 2000); - createDetectorIndex(); - } - - @Override - protected Settings nodeSettings(int nodeOrdinal) { - return Settings - .builder() - .put(super.nodeSettings(nodeOrdinal)) - .put(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 1) - .put(MAX_BATCH_TASK_PER_NODE.getKey(), 1) - .put(MAX_OLD_AD_TASK_DOCS_PER_DETECTOR.getKey(), maxOldAdTaskDocsPerDetector) - .build(); - } - - public void testDetectorIndexNotFound() { - deleteDetectorIndex(); - String detectorId = randomAlphaOfLength(5); - AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId); - IndexNotFoundException exception = expectThrows( - IndexNotFoundException.class, - () -> client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(3000) - ); - assertTrue(exception.getMessage().contains("no such index [.opendistro-anomaly-detectors]")); - } - - public void testDetectorNotFound() { - String detectorId = randomAlphaOfLength(5); - AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId); - ElasticsearchStatusException exception = expectThrows( - ElasticsearchStatusException.class, - () -> client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000) - ); - assertTrue(exception.getMessage().contains("AnomalyDetector is not found")); - } - - public void testValidHistoricalDetector() throws IOException, InterruptedException { - ADTask adTask = startHistoricalDetector(startTime, endTime); - Thread.sleep(10000); - ADTask finishedTask = getADTask(adTask.getTaskId()); - assertEquals(ADTaskState.FINISHED.name(), finishedTask.getState()); - } - - public void testStartHistoricalDetectorWithUser() throws IOException { - DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); - AnomalyDetector detector = TestHelpers - .randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); - String detectorId = createDetector(detector); - AnomalyDetectorJobRequest request = new AnomalyDetectorJobRequest( - detectorId, - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - START_JOB - ); - Client nodeClient = getDataNodeClient(); - if (nodeClient != null) { - AnomalyDetectorJobResponse response = nodeClient.execute(MockAnomalyDetectorJobAction.INSTANCE, request).actionGet(100000); - ADTask adTask = getADTask(response.getId()); - assertNotNull(adTask.getStartedBy()); - assertNotNull(adTask.getUser()); - } - } - - public void testRunMultipleTasksForHistoricalDetector() throws IOException, InterruptedException { - DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); - AnomalyDetector detector = TestHelpers - .randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); - String detectorId = createDetector(detector); - AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); - assertNotNull(response.getId()); - ElasticsearchStatusException exception = expectThrows( - ElasticsearchStatusException.class, - () -> client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000) - ); - assertTrue(exception.getMessage().contains(DETECTOR_IS_RUNNING)); - assertEquals(DETECTOR_IS_RUNNING, exception.getMessage()); - Thread.sleep(10000); - List adTasks = searchADTasks(detectorId, null, 100); - assertEquals(1, adTasks.size()); - assertEquals(ADTaskState.FINISHED.name(), adTasks.get(0).getState()); - } - - public void testRaceConditionByStartingMultipleTasks() throws IOException, InterruptedException { - DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); - AnomalyDetector detector = TestHelpers - .randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); - String detectorId = createDetector(detector); - AnomalyDetectorJobRequest request = new AnomalyDetectorJobRequest( - detectorId, - UNASSIGNED_SEQ_NO, - UNASSIGNED_PRIMARY_TERM, - START_JOB - ); - client().execute(AnomalyDetectorJobAction.INSTANCE, request); - client().execute(AnomalyDetectorJobAction.INSTANCE, request); - - Thread.sleep(5000); - List adTasks = searchADTasks(detectorId, null, 100); - - assertEquals(1, adTasks.size()); - assertTrue(adTasks.get(0).getLatest()); - assertNotEquals(ADTaskState.FAILED.name(), adTasks.get(0).getState()); - } - - public void testCleanOldTaskDocs() throws InterruptedException, IOException { - DetectionDateRange dateRange = new DetectionDateRange(startTime, endTime); - AnomalyDetector detector = TestHelpers - .randomDetector(dateRange, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); - String detectorId = createDetector(detector); - - createDetectionStateIndex(); - List states = ImmutableList.of(ADTaskState.FAILED, ADTaskState.FINISHED, ADTaskState.STOPPED); - for (ADTaskState state : states) { - ADTask task = randomADTask(randomAlphaOfLength(5), detector, detectorId, state); - createADTask(task); - } - long count = countDocs(CommonName.DETECTION_STATE_INDEX); - assertEquals(states.size(), count); - - AnomalyDetectorJobRequest request = new AnomalyDetectorJobRequest(detectorId, randomLong(), randomLong(), START_JOB); - - AtomicReference response = new AtomicReference<>(); - CountDownLatch latch = new CountDownLatch(1); - client().execute(AnomalyDetectorJobAction.INSTANCE, request, ActionListener.wrap(r -> { - latch.countDown(); - response.set(r); - }, e -> { latch.countDown(); })); - latch.await(); - Thread.sleep(10000); - count = countDocs(CommonName.DETECTION_STATE_INDEX); - // we have one latest task, so total count should add 1 - assertEquals(maxOldAdTaskDocsPerDetector + 1, count); - } - - @After - @Override - public void tearDown() throws Exception { - super.tearDown(); - // delete index will clear search context, this can avoid in-flight contexts error - deleteIndexIfExists(AnomalyDetector.ANOMALY_DETECTORS_INDEX); - deleteIndexIfExists(CommonName.DETECTION_STATE_INDEX); - } - - public void testStartRealtimeDetector() throws IOException { - String detectorId = startRealtimeDetector(); - GetResponse doc = getDoc(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId); - AnomalyDetectorJob job = toADJob(doc); - assertTrue(job.isEnabled()); - assertEquals(detectorId, job.getName()); - } - - private String startRealtimeDetector() throws IOException { - AnomalyDetector detector = TestHelpers - .randomDetector(null, ImmutableList.of(maxValueFeature()), testIndex, detectionIntervalInMinutes, timeField); - String detectorId = createDetector(detector); - AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId); - AnomalyDetectorJobResponse response = client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); - assertEquals(detectorId, response.getId()); - return response.getId(); - } - - public void testRealtimeDetectorWithoutFeature() throws IOException { - AnomalyDetector detector = TestHelpers.randomDetector(null, ImmutableList.of(), testIndex, detectionIntervalInMinutes, timeField); - testInvalidDetector(detector, "Can't start detector job as no features configured"); - } - - public void testHistoricalDetectorWithoutFeature() throws IOException { - AnomalyDetector detector = TestHelpers - .randomDetector( - new DetectionDateRange(startTime, endTime), - ImmutableList.of(), - testIndex, - detectionIntervalInMinutes, - timeField - ); - testInvalidDetector(detector, "Can't start detector job as no features configured"); - } - - public void testRealtimeDetectorWithoutEnabledFeature() throws IOException { - AnomalyDetector detector = TestHelpers - .randomDetector(null, ImmutableList.of(TestHelpers.randomFeature(false)), testIndex, detectionIntervalInMinutes, timeField); - testInvalidDetector(detector, "Can't start detector job as no enabled features configured"); - } - - public void testHistoricalDetectorWithoutEnabledFeature() throws IOException { - AnomalyDetector detector = TestHelpers - .randomDetector( - new DetectionDateRange(startTime, endTime), - ImmutableList.of(TestHelpers.randomFeature(false)), - testIndex, - detectionIntervalInMinutes, - timeField - ); - testInvalidDetector(detector, "Can't start detector job as no enabled features configured"); - } - - private void testInvalidDetector(AnomalyDetector detector, String error) throws IOException { - String detectorId = createDetector(detector); - AnomalyDetectorJobRequest request = startDetectorJobRequest(detectorId); - ElasticsearchStatusException exception = expectThrows( - ElasticsearchStatusException.class, - () -> client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000) - ); - assertEquals(error, exception.getMessage()); - } - - private AnomalyDetectorJobRequest startDetectorJobRequest(String detectorId) { - return new AnomalyDetectorJobRequest(detectorId, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, START_JOB); - } - - private AnomalyDetectorJobRequest stopDetectorJobRequest(String detectorId) { - return new AnomalyDetectorJobRequest(detectorId, UNASSIGNED_SEQ_NO, UNASSIGNED_PRIMARY_TERM, STOP_JOB); - } - - public void testStopRealtimeDetector() throws IOException { - String detectorId = startRealtimeDetector(); - AnomalyDetectorJobRequest request = stopDetectorJobRequest(detectorId); - client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); - GetResponse doc = getDoc(AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX, detectorId); - AnomalyDetectorJob job = toADJob(doc); - assertFalse(job.isEnabled()); - assertEquals(detectorId, job.getName()); - } - - public void testStopHistoricalDetector() throws IOException, InterruptedException { - updateTransientSettings(ImmutableMap.of(BATCH_TASK_PIECE_INTERVAL_SECONDS.getKey(), 5)); - ADTask adTask = startHistoricalDetector(startTime, endTime); - assertEquals(ADTaskState.INIT.name(), adTask.getState()); - assertNull(adTask.getStartedBy()); - assertNull(adTask.getUser()); - AnomalyDetectorJobRequest request = stopDetectorJobRequest(adTask.getDetectorId()); - client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000); - waitUntil(() -> { - try { - ADTask task = getADTask(adTask.getTaskId()); - return !TestHelpers.historicalDetectorRunningStats.contains(task.getState()); - } catch (IOException e) { - return false; - } - }, 20, TimeUnit.SECONDS); - ADTask stoppedTask = getADTask(adTask.getTaskId()); - assertEquals(ADTaskState.STOPPED.name(), stoppedTask.getState()); - assertEquals(0, getExecutingADTask()); - } - - public void testProfileHistoricalDetector() throws IOException, InterruptedException { - ADTask adTask = startHistoricalDetector(startTime, endTime); - GetAnomalyDetectorRequest request = taskProfileRequest(adTask.getDetectorId()); - GetAnomalyDetectorResponse response = client().execute(GetAnomalyDetectorAction.INSTANCE, request).actionGet(10000); - assertNotNull(response.getDetectorProfile().getAdTaskProfile()); - - ADTask finishedTask = getADTask(adTask.getTaskId()); - int i = 0; - while (TestHelpers.historicalDetectorRunningStats.contains(finishedTask.getState()) && i < 10) { - finishedTask = getADTask(adTask.getTaskId()); - Thread.sleep(2000); - i++; - } - assertEquals(ADTaskState.FINISHED.name(), finishedTask.getState()); - - response = client().execute(GetAnomalyDetectorAction.INSTANCE, request).actionGet(10000); - assertNull(response.getDetectorProfile().getAdTaskProfile().getNodeId()); - ADTask profileAdTask = response.getDetectorProfile().getAdTaskProfile().getAdTask(); - assertEquals(finishedTask.getTaskId(), profileAdTask.getTaskId()); - assertEquals(finishedTask.getDetectorId(), profileAdTask.getDetectorId()); - assertEquals(finishedTask.getDetector(), profileAdTask.getDetector()); - assertEquals(finishedTask.getState(), profileAdTask.getState()); - } - - public void testProfileWithMultipleRunningTask() throws IOException { - ADTask adTask1 = startHistoricalDetector(startTime, endTime); - ADTask adTask2 = startHistoricalDetector(startTime, endTime); - - GetAnomalyDetectorRequest request1 = taskProfileRequest(adTask1.getDetectorId()); - GetAnomalyDetectorRequest request2 = taskProfileRequest(adTask2.getDetectorId()); - GetAnomalyDetectorResponse response1 = client().execute(GetAnomalyDetectorAction.INSTANCE, request1).actionGet(10000); - GetAnomalyDetectorResponse response2 = client().execute(GetAnomalyDetectorAction.INSTANCE, request2).actionGet(10000); - ADTaskProfile taskProfile1 = response1.getDetectorProfile().getAdTaskProfile(); - ADTaskProfile taskProfile2 = response2.getDetectorProfile().getAdTaskProfile(); - assertNotNull(taskProfile1.getNodeId()); - assertNotNull(taskProfile2.getNodeId()); - assertNotEquals(taskProfile1.getNodeId(), taskProfile2.getNodeId()); - } - - private GetAnomalyDetectorRequest taskProfileRequest(String detectorId) throws IOException { - return new GetAnomalyDetectorRequest(detectorId, Versions.MATCH_ANY, false, false, "", PROFILE, true, null); - } - - private long getExecutingADTask() { - ADStatsRequest adStatsRequest = new ADStatsRequest(getDataNodesArray()); - Set validStats = ImmutableSet.of(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName()); - adStatsRequest.addAll(validStats); - StatsAnomalyDetectorResponse statsResponse = client().execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest).actionGet(5000); - AtomicLong totalExecutingTask = new AtomicLong(0); - statsResponse - .getAdStatsResponse() - .getADStatsNodesResponse() - .getNodes() - .forEach( - node -> { totalExecutingTask.getAndAdd((Long) node.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())); } - ); - return totalExecutingTask.get(); - } -}