diff --git a/.github/workflows/test_build_multi_platform.yml b/.github/workflows/test_build_multi_platform.yml index 0f5d2f764..801601832 100644 --- a/.github/workflows/test_build_multi_platform.yml +++ b/.github/workflows/test_build_multi_platform.yml @@ -16,7 +16,7 @@ jobs: Build-ad-windows: strategy: matrix: - java: [ 11, 17 ] + java: [ 11, 17, 21 ] name: Build and Test Anomaly Detection Plugin on Windows runs-on: windows-latest env: @@ -49,7 +49,7 @@ jobs: needs: Get-CI-Image-Tag strategy: matrix: - java: [11,17] + java: [11, 17, 21] fail-fast: false name: Build and Test Anomaly detection Plugin runs-on: ubuntu-latest @@ -89,7 +89,7 @@ jobs: Build-ad-macos: strategy: matrix: - java: [11,17,20] + java: [11,17,21] fail-fast: false name: Build and Test Anomaly detection Plugin diff --git a/.github/workflows/test_bwc.yml b/.github/workflows/test_bwc.yml index bc166633b..953f62009 100644 --- a/.github/workflows/test_bwc.yml +++ b/.github/workflows/test_bwc.yml @@ -17,7 +17,7 @@ jobs: needs: Get-CI-Image-Tag strategy: matrix: - java: [11,17] + java: [11,17,21] fail-fast: false name: Test Anomaly detection BWC diff --git a/.github/workflows/test_security.yml b/.github/workflows/test_security.yml index 5c1df108a..352d2568f 100644 --- a/.github/workflows/test_security.yml +++ b/.github/workflows/test_security.yml @@ -11,7 +11,7 @@ jobs: Build-ad: strategy: matrix: - java: [11,17] + java: [11,17,21] fail-fast: false name: Security test workflow for Anomaly Detection diff --git a/build.gradle b/build.gradle index 951d4af60..eb9aaee9e 100644 --- a/build.gradle +++ b/build.gradle @@ -61,11 +61,10 @@ buildscript { } plugins { - id 'nebula.ospackage' version "8.3.0" apply false - id "com.diffplug.gradle.spotless" version "3.26.1" + id 'com.netflix.nebula.ospackage' version "11.6.0" + id "com.diffplug.spotless" version "6.24.0" id 'java-library' - // Gradle 7.6 support was added in test-retry 1.4.0. - id 'org.gradle.test-retry' version '1.4.1' + id 'org.gradle.test-retry' version '1.5.7' } tasks.withType(JavaCompile) { @@ -119,11 +118,16 @@ allprojects { version = "${opensearch_build}" - plugins.withId('java') { - sourceCompatibility = targetCompatibility = JavaVersion.VERSION_11 + plugins.withId('jacoco') { + jacoco.toolVersion = '0.8.11' } } +java { + targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_11 +} + ext { projectSubstitutions = [:] licenseFile = rootProject.file('LICENSE.txt') @@ -146,10 +150,10 @@ configurations.all { force "org.apache.httpcomponents:httpcore:${versions.httpcore}" force "commons-codec:commons-codec:${versions.commonscodec}" - force "org.mockito:mockito-core:2.25.0" - force "org.objenesis:objenesis:3.0.1" - force "net.bytebuddy:byte-buddy:1.9.15" - force "net.bytebuddy:byte-buddy-agent:1.9.15" + force "org.mockito:mockito-core:5.9.0" + force "org.objenesis:objenesis:3.3" + force "net.bytebuddy:byte-buddy:1.14.9" + force "net.bytebuddy:byte-buddy-agent:1.14.9" force "com.google.code.gson:gson:2.8.9" force "junit:junit:4.13.2" } @@ -727,8 +731,8 @@ jacocoTestCoverageVerification { jacocoTestReport { reports { - xml.enabled = true - html.enabled = true + xml.required = true + html.required = true } } @@ -755,8 +759,8 @@ dependencies { implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0' // force Jackson version to avoid version conflict issue - implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1" - implementation "com.fasterxml.jackson.core:jackson-annotations:2.14.1" + implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1" + implementation "com.fasterxml.jackson.core:jackson-annotations:2.16.1" // used for serializing/deserializing rcf models. implementation group: 'io.protostuff', name: 'protostuff-core', version: '1.8.0' @@ -773,31 +777,21 @@ dependencies { } testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1' - testImplementation group: 'org.mockito', name: 'mockito-core', version: '2.25.0' - testImplementation group: 'org.powermock', name: 'powermock-api-mockito2', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-module-junit4', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-module-junit4-common', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-core', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-api-support', version: '2.0.2' - testImplementation group: 'org.powermock', name: 'powermock-reflect', version: '2.0.2' - testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.0.1' - testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.9.15' - testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.9.15' + testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.9.0' + testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.3' + testImplementation group: 'net.bytebuddy', name: 'byte-buddy', version: '1.14.9' + testImplementation group: 'net.bytebuddy', name: 'byte-buddy-agent', version: '1.14.9' testCompileOnly 'org.apiguardian:apiguardian-api:1.1.0' - testImplementation 'org.junit.jupiter:junit-jupiter-api:5.7.2' - testImplementation 'org.junit.jupiter:junit-jupiter-params:5.7.2' - testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.7.2' - testRuntimeOnly 'org.junit.vintage:junit-vintage-engine:5.7.2' + testImplementation 'org.junit.jupiter:junit-jupiter-api:5.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter-params:5.8.2' + testImplementation 'org.junit.jupiter:junit-jupiter-engine:5.8.2' + testRuntimeOnly 'org.junit.vintage:junit-vintage-engine:5.8.2' testCompileOnly 'junit:junit:4.13.2' } compileJava.options.compilerArgs << "-Xlint:-deprecation,-rawtypes,-serial,-try,-unchecked" -test { - useJUnitPlatform() -} - -apply plugin: 'nebula.ospackage' +apply plugin: 'com.netflix.nebula.ospackage' // This is afterEvaluate because the bundlePlugin ZIP task is updated afterEvaluate and changes the ZIP name to match the plugin name afterEvaluate { @@ -838,9 +832,8 @@ afterEvaluate { task renameRpm(type: Copy) { from("$buildDir/distributions") into("$buildDir/distributions") - include archiveName - rename archiveName, "${packageName}-${version}.rpm" - doLast { delete file("$buildDir/distributions/$archiveName") } + rename "$archiveFileName", "${packageName}-${archiveVersion}.rpm" + doLast { delete file("$buildDir/distributions/$archiveFileName") } } } @@ -851,9 +844,8 @@ afterEvaluate { task renameDeb(type: Copy) { from("$buildDir/distributions") into("$buildDir/distributions") - include archiveName - rename archiveName, "${packageName}-${version}.deb" - doLast { delete file("$buildDir/distributions/$archiveName") } + rename "$archiveFileName", "${packageName}-${archiveVersion}.deb" + doLast { delete file("$buildDir/distributions/$archiveFileName") } } } @@ -898,4 +890,13 @@ task updateVersion { // Include the required files that needs to be updated with new Version ant.replaceregexp(file:'build.gradle', match: '"opensearch.version", "\\d.*"', replace: '"opensearch.version", "' + newVersion.tokenize('-')[0] + '-SNAPSHOT"', flags:'g', byline:true) } -} \ No newline at end of file +} + +tasks.withType(AbstractPublishToMaven) { + def predicate = provider { + publication.name == "pluginZip" + } + onlyIf("Publishing only ZIP distributions") { + predicate.get() + } +} diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index 943f0cbfa..7f93135c4 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 508322917..3499ded5c 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-7.6.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 65dcd68d6..1aa94a426 100755 --- a/gradlew +++ b/gradlew @@ -83,10 +83,8 @@ done # This is normally unused # shellcheck disable=SC2034 APP_BASE_NAME=${0##*/} -APP_HOME=$( cd "${APP_HOME:-./}" && pwd -P ) || exit - -# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. -DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' +# Discard cd standard output in case $CDPATH is set (https://github.com/gradle/gradle/issues/25036) +APP_HOME=$( cd "${APP_HOME:-./}" > /dev/null && pwd -P ) || exit # Use the maximum available, or set MAX_FD != -1 to use that value. MAX_FD=maximum @@ -133,10 +131,13 @@ location of your Java installation." fi else JAVACMD=java - which java >/dev/null 2>&1 || die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. + if ! command -v java >/dev/null 2>&1 + then + die "ERROR: JAVA_HOME is not set and no 'java' command could be found in your PATH. Please set the JAVA_HOME variable in your environment to match the location of your Java installation." + fi fi # Increase the maximum file descriptors if we can. @@ -144,7 +145,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then case $MAX_FD in #( max*) # In POSIX sh, ulimit -H is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 MAX_FD=$( ulimit -H -n ) || warn "Could not query maximum file descriptor limit" esac @@ -152,7 +153,7 @@ if ! "$cygwin" && ! "$darwin" && ! "$nonstop" ; then '' | soft) :;; #( *) # In POSIX sh, ulimit -n is undefined. That's why the result is checked to see if it worked. - # shellcheck disable=SC3045 + # shellcheck disable=SC2039,SC3045 ulimit -n "$MAX_FD" || warn "Could not set maximum file descriptor limit to $MAX_FD" esac @@ -197,11 +198,15 @@ if "$cygwin" || "$msys" ; then done fi -# Collect all arguments for the java command; -# * $DEFAULT_JVM_OPTS, $JAVA_OPTS, and $GRADLE_OPTS can contain fragments of -# shell script including quotes and variable substitutions, so put them in -# double quotes to make sure that they get re-expanded; and -# * put everything else in single quotes, so that it's not re-expanded. + +# Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script. +DEFAULT_JVM_OPTS='"-Xmx64m" "-Xms64m"' + +# Collect all arguments for the java command: +# * DEFAULT_JVM_OPTS, JAVA_OPTS, JAVA_OPTS, and optsEnvironmentVar are not allowed to contain shell fragments, +# and any embedded shellness will be escaped. +# * For example: A user cannot expect ${Hostname} to be expanded, as it is an environment variable and will be +# treated as '${Hostname}' itself on the command line. set -- \ "-Dorg.gradle.appname=$APP_BASE_NAME" \ diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java index 2fe51f2d3..4e62548e5 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java @@ -304,38 +304,11 @@ private void runAnomalyDetectionJob( detectionStartTime.toEpochMilli(), executionStartTime.toEpochMilli() ); - client - .execute( - AnomalyResultAction.INSTANCE, - request, - ActionListener - .wrap( - response -> { - indexAnomalyResult( - jobParameter, - lockService, - lock, - detectionStartTime, - executionStartTime, - response, - recorder, - detector - ); - }, - exception -> { - handleAdException( - jobParameter, - lockService, - lock, - detectionStartTime, - executionStartTime, - exception, - recorder, - detector - ); - } - ) - ); + client.execute(AnomalyResultAction.INSTANCE, request, ActionListener.wrap(response -> { + indexAnomalyResult(jobParameter, lockService, lock, detectionStartTime, executionStartTime, response, recorder, detector); + }, exception -> { + handleAdException(jobParameter, lockService, lock, detectionStartTime, executionStartTime, exception, recorder, detector); + })); } catch (Exception e) { indexAnomalyResultException( jobParameter, @@ -669,11 +642,9 @@ private void releaseLock(AnomalyDetectorJob jobParameter, LockService lockServic lockService .release( lock, - ActionListener - .wrap( - released -> { log.info("Released lock for AD job {}", jobParameter.getName()); }, - exception -> { log.error("Failed to release lock for AD job: " + jobParameter.getName(), exception); } - ) + ActionListener.wrap(released -> { log.info("Released lock for AD job {}", jobParameter.getName()); }, exception -> { + log.error("Failed to release lock for AD job: " + jobParameter.getName(), exception); + }) ); } } diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java index 6828f479a..22d3e4850 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java @@ -83,11 +83,9 @@ public void executeDetector( listener.onResponse(Collections.emptyList()); return; } - ActionListener entityAnomalyResultListener = ActionListener - .wrap( - entityAnomalyResult -> { listener.onResponse(entityAnomalyResult.getAnomalyResults()); }, - e -> onFailure(e, listener, detector.getDetectorId()) - ); + ActionListener entityAnomalyResultListener = ActionListener.wrap(entityAnomalyResult -> { + listener.onResponse(entityAnomalyResult.getAnomalyResults()); + }, e -> onFailure(e, listener, detector.getDetectorId())); MultiResponsesDelegateActionListener multiEntitiesResponseListener = new MultiResponsesDelegateActionListener( entityAnomalyResultListener, diff --git a/src/main/java/org/opensearch/ad/caching/PriorityCache.java b/src/main/java/org/opensearch/ad/caching/PriorityCache.java index 0a5d3abf7..b64fc6d0e 100644 --- a/src/main/java/org/opensearch/ad/caching/PriorityCache.java +++ b/src/main/java/org/opensearch/ad/caching/PriorityCache.java @@ -133,12 +133,9 @@ public PriorityCache( Duration inactiveEntityTtl = DateUtils.toDuration(checkpointTtl.get(settings)); this.inActiveEntities = createInactiveCache(inactiveEntityTtl, maxInactiveStates); - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - checkpointTtl, - it -> { this.inActiveEntities = createInactiveCache(DateUtils.toDuration(it), maxInactiveStates); } - ); + clusterService.getClusterSettings().addSettingsUpdateConsumer(checkpointTtl, it -> { + this.inActiveEntities = createInactiveCache(DateUtils.toDuration(it), maxInactiveStates); + }); this.threadPool = threadPool; this.random = new Random(42); @@ -163,19 +160,15 @@ public ModelState get(String modelId, AnomalyDetector detector) { // during maintenance period, stop putting new entries if (!maintenanceLock.isLocked() && modelState == null) { if (EnabledSetting.isDoorKeeperInCacheEnabled()) { - DoorKeeper doorKeeper = doorKeepers - .computeIfAbsent( - detectorId, - id -> { - // reset every 60 intervals - return new DoorKeeper( - AnomalyDetectorSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, - AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, - detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), - clock - ); - } + DoorKeeper doorKeeper = doorKeepers.computeIfAbsent(detectorId, id -> { + // reset every 60 intervals + return new DoorKeeper( + AnomalyDetectorSettings.DOOR_KEEPER_FOR_CACHE_MAX_INSERTION, + AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, + detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), + clock ); + }); // first hit, ignore // since door keeper may get reset during maintenance, it is possible diff --git a/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java b/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java index 3db598c7a..60bb274ab 100644 --- a/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java +++ b/src/main/java/org/opensearch/ad/client/AnomalyDetectionNodeClient.java @@ -27,22 +27,16 @@ public AnomalyDetectionNodeClient(Client client) { @Override public void searchAnomalyDetectors(SearchRequest searchRequest, ActionListener listener) { - this.client - .execute( - SearchAnomalyDetectorAction.INSTANCE, - searchRequest, - ActionListener.wrap(searchResponse -> { listener.onResponse(searchResponse); }, listener::onFailure) - ); + this.client.execute(SearchAnomalyDetectorAction.INSTANCE, searchRequest, ActionListener.wrap(searchResponse -> { + listener.onResponse(searchResponse); + }, listener::onFailure)); } @Override public void searchAnomalyResults(SearchRequest searchRequest, ActionListener listener) { - this.client - .execute( - SearchAnomalyResultAction.INSTANCE, - searchRequest, - ActionListener.wrap(searchResponse -> { listener.onResponse(searchResponse); }, listener::onFailure) - ); + this.client.execute(SearchAnomalyResultAction.INSTANCE, searchRequest, ActionListener.wrap(searchResponse -> { + listener.onResponse(searchResponse); + }, listener::onFailure)); } @Override @@ -55,8 +49,9 @@ public void getDetectorProfile(GetAnomalyDetectorRequest profileRequest, ActionL private ActionListener getAnomalyDetectorResponseActionListener( ActionListener listener ) { - ActionListener internalListener = ActionListener - .wrap(getAnomalyDetectorResponse -> { listener.onResponse(getAnomalyDetectorResponse); }, listener::onFailure); + ActionListener internalListener = ActionListener.wrap(getAnomalyDetectorResponse -> { + listener.onResponse(getAnomalyDetectorResponse); + }, listener::onFailure); ActionListener actionListener = wrapActionListener(internalListener, actionResponse -> { GetAnomalyDetectorResponse response = GetAnomalyDetectorResponse.fromActionResponse(actionResponse); return response; diff --git a/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java b/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java index 91594cf0f..4f629c7bb 100644 --- a/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java +++ b/src/main/java/org/opensearch/ad/cluster/ADClusterEventListener.java @@ -72,19 +72,9 @@ public void clusterChanged(ClusterChangedEvent event) { if (delta.removed() || delta.added()) { LOG.info(NODE_CHANGED_MSG + ", node removed: {}, node added: {}", delta.removed(), delta.added()); hashRing.addNodeChangeEvent(); - hashRing - .buildCircles( - delta, - ActionListener - .runAfter( - ActionListener - .wrap( - hasRingBuildDone -> { LOG.info("Hash ring build result: {}", hasRingBuildDone); }, - e -> { LOG.error("Failed updating AD version hash ring", e); } - ), - () -> inProgress.release() - ) - ); + hashRing.buildCircles(delta, ActionListener.runAfter(ActionListener.wrap(hasRingBuildDone -> { + LOG.info("Hash ring build result: {}", hasRingBuildDone); + }, e -> { LOG.error("Failed updating AD version hash ring", e); }), () -> inProgress.release())); } else { inProgress.release(); } diff --git a/src/main/java/org/opensearch/ad/cluster/DailyCron.java b/src/main/java/org/opensearch/ad/cluster/DailyCron.java index a5e11f3e4..a7f380dbb 100644 --- a/src/main/java/org/opensearch/ad/cluster/DailyCron.java +++ b/src/main/java/org/opensearch/ad/cluster/DailyCron.java @@ -58,26 +58,17 @@ public void run() { ) ) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN); - clientUtil - .execute( - DeleteByQueryAction.INSTANCE, - deleteRequest, - ActionListener - .wrap( - response -> { - // if 0 docs get deleted, it means our query cannot find any matching doc - LOG.info("{} " + CHECKPOINT_DELETED_MSG, response.getDeleted()); - }, - exception -> { - if (exception instanceof IndexNotFoundException) { - LOG.info(CHECKPOINT_NOT_EXIST_MSG); - } else { - // Gonna eventually delete in maintenance window. - LOG.error(CANNOT_DELETE_OLD_CHECKPOINT_MSG, exception); - } - } - ) - ); + clientUtil.execute(DeleteByQueryAction.INSTANCE, deleteRequest, ActionListener.wrap(response -> { + // if 0 docs get deleted, it means our query cannot find any matching doc + LOG.info("{} " + CHECKPOINT_DELETED_MSG, response.getDeleted()); + }, exception -> { + if (exception instanceof IndexNotFoundException) { + LOG.info(CHECKPOINT_NOT_EXIST_MSG); + } else { + // Gonna eventually delete in maintenance window. + LOG.error(CANNOT_DELETE_OLD_CHECKPOINT_MSG, exception); + } + })); } } diff --git a/src/main/java/org/opensearch/ad/cluster/HashRing.java b/src/main/java/org/opensearch/ad/cluster/HashRing.java index e268f4645..8d1813ea6 100644 --- a/src/main/java/org/opensearch/ad/cluster/HashRing.java +++ b/src/main/java/org/opensearch/ad/cluster/HashRing.java @@ -178,13 +178,9 @@ public void buildCirclesForRealtimeAD() { if (nodeChangeEvents.isEmpty()) { return; } - buildCircles( - ActionListener - .wrap( - r -> { LOG.debug("build circles on AD versions successfully"); }, - e -> { LOG.error("Failed to build circles on AD versions", e); } - ) - ); + buildCircles(ActionListener.wrap(r -> { LOG.debug("build circles on AD versions successfully"); }, e -> { + LOG.error("Failed to build circles on AD versions", e); + })); } /** diff --git a/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java b/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java index afac0f48f..0f56e0805 100644 --- a/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java +++ b/src/main/java/org/opensearch/ad/cluster/diskcleanup/ModelCheckpointIndexRetention.java @@ -66,12 +66,12 @@ public void run() { .lte(clock.millis() - defaultCheckpointTtl.toMillis()) .format(CommonName.EPOCH_MILLIS_FORMAT) ), - ActionListener - .wrap( - response -> { cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); }, - // The docs will be deleted in next scheduled windows. No need for retrying. - exception -> LOG.error("delete docs by query fails for checkpoint index", exception) - ) + ActionListener.wrap(response -> { + cleanupBasedOnShardSize(defaultCheckpointTtl.minusDays(1)); + }, + // The docs will be deleted in next scheduled windows. No need for retrying. + exception -> LOG.error("delete docs by query fails for checkpoint index", exception) + ) ); } diff --git a/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java b/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java index 21f6d8e43..6218fa748 100644 --- a/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java @@ -488,8 +488,9 @@ public void getEntityMinDataTime(AnomalyDetector detector, Entity entity, Action .trackTotalHits(false) .size(0); SearchRequest searchRequest = new SearchRequest().indices(detector.getIndices().toArray(new String[0])).source(searchSourceBuilder); - final ActionListener searchResponseListener = ActionListener - .wrap(response -> { listener.onResponse(parseMinDataTime(response)); }, listener::onFailure); + final ActionListener searchResponseListener = ActionListener.wrap(response -> { + listener.onResponse(parseMinDataTime(response)); + }, listener::onFailure); // inject user role while searching. clientUtil .asyncRequestWithInjectedSecurity( @@ -545,11 +546,9 @@ public void getFeaturesForPeriodByBatch( logger.debug("Batch query for detector {}: {} ", detector.getDetectorId(), searchSourceBuilder); SearchRequest searchRequest = new SearchRequest(detector.getIndices().toArray(new String[0])).source(searchSourceBuilder); - final ActionListener searchResponseListener = ActionListener - .wrap( - response -> { listener.onResponse(parseBucketAggregationResponse(response, detector.getEnabledFeatureIds())); }, - listener::onFailure - ); + final ActionListener searchResponseListener = ActionListener.wrap(response -> { + listener.onResponse(parseBucketAggregationResponse(response, detector.getEnabledFeatureIds())); + }, listener::onFailure); // inject user role while searching. clientUtil .asyncRequestWithInjectedSecurity( diff --git a/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java b/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java index cea72c215..64b242834 100644 --- a/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java +++ b/src/main/java/org/opensearch/ad/indices/AnomalyDetectionIndices.java @@ -202,9 +202,9 @@ public AnomalyDetectionIndices( historyRolloverPeriod = it; rescheduleRollover(); }); - this.clusterService - .getClusterSettings() - .addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { historyRetentionPeriod = it; }); + this.clusterService.getClusterSettings().addSettingsUpdateConsumer(AD_RESULT_HISTORY_RETENTION_PERIOD, it -> { + historyRetentionPeriod = it; + }); this.clusterService.getClusterSettings().addSettingsUpdateConsumer(MAX_PRIMARY_SHARDS, it -> maxPrimaryShards = it); diff --git a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java index f85c8ce33..78c406032 100644 --- a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java +++ b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java @@ -621,14 +621,9 @@ private void deserializeTRCFModel( } String thresholdingModelId = SingleStreamModelIdMapper.getThresholdModelIdFromRCFModelId(rcfModelId); // query for threshold model and combinne rcf and threshold model into a ThresholdedRandomCutForest - getThresholdModel( - thresholdingModelId, - ActionListener - .wrap( - thresholdingModel -> { listener.onResponse(convertToTRCF(forest, thresholdingModel)); }, - listener::onFailure - ) - ); + getThresholdModel(thresholdingModelId, ActionListener.wrap(thresholdingModel -> { + listener.onResponse(convertToTRCF(forest, thresholdingModel)); + }, listener::onFailure)); } } catch (Exception e) { logger.error(new ParameterizedMessage("Unexpected error when deserializing [{}]", rcfModelId), e); @@ -645,12 +640,9 @@ private void deserializeTRCFModel( * @param listener Listener to return a pair of entity model and its last checkpoint time */ public void deserializeModelCheckpoint(String modelId, ActionListener>> listener) { - clientUtil - .asyncRequest( - new GetRequest(indexName, modelId), - client::get, - ActionListener.wrap(response -> { listener.onResponse(processGetResponse(response, modelId)); }, listener::onFailure) - ); + clientUtil.asyncRequest(new GetRequest(indexName, modelId), client::get, ActionListener.wrap(response -> { + listener.onResponse(processGetResponse(response, modelId)); + }, listener::onFailure)); } /** @@ -679,18 +671,14 @@ public void getTRCFModel(String modelId, ActionListenerasyncRequest( new GetRequest(indexName, modelId), client::get, - ActionListener - .wrap( - response -> deserializeTRCFModel(response, modelId, listener), - exception -> { - // expected exception, don't print stack trace - if (exception instanceof IndexNotFoundException) { - listener.onResponse(Optional.empty()); - } else { - listener.onFailure(exception); - } - } - ) + ActionListener.wrap(response -> deserializeTRCFModel(response, modelId, listener), exception -> { + // expected exception, don't print stack trace + if (exception instanceof IndexNotFoundException) { + listener.onResponse(Optional.empty()); + } else { + listener.onFailure(exception); + } + }) ); } @@ -715,16 +703,14 @@ public void getThresholdModel(String modelId, ActionListener { - // expected exception, don't print stack trace - if (exception instanceof IndexNotFoundException) { - listener.onResponse(Optional.empty()); - } else { - listener.onFailure(exception); - } + }, exception -> { + // expected exception, don't print stack trace + if (exception instanceof IndexNotFoundException) { + listener.onResponse(Optional.empty()); + } else { + listener.onFailure(exception); } - )); + })); } private Optional processThresholdModelCheckpoint(GetResponse response) { diff --git a/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java b/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java index 491d86ecb..67b750833 100644 --- a/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java +++ b/src/main/java/org/opensearch/ad/ml/EntityColdStarter.java @@ -246,19 +246,15 @@ private void coldStart( boolean earlyExit = true; try { - DoorKeeper doorKeeper = doorKeepers - .computeIfAbsent( - detectorId, - id -> { - // reset every 60 intervals - return new DoorKeeper( - AnomalyDetectorSettings.DOOR_KEEPER_FOR_COLD_STARTER_MAX_INSERTION, - AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, - detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), - clock - ); - } + DoorKeeper doorKeeper = doorKeepers.computeIfAbsent(detectorId, id -> { + // reset every 60 intervals + return new DoorKeeper( + AnomalyDetectorSettings.DOOR_KEEPER_FOR_COLD_STARTER_MAX_INSERTION, + AnomalyDetectorSettings.DOOR_KEEPER_FAULSE_POSITIVE_RATE, + detector.getDetectionIntervalDuration().multipliedBy(AnomalyDetectorSettings.DOOR_KEEPER_MAINTENANCE_FREQ), + clock ); + }); // Won't retry cold start within 60 intervals for an entity if (doorKeeper.mightContain(modelId)) { diff --git a/src/main/java/org/opensearch/ad/ml/ModelManager.java b/src/main/java/org/opensearch/ad/ml/ModelManager.java index 91f95cb0b..24349eb66 100644 --- a/src/main/java/org/opensearch/ad/ml/ModelManager.java +++ b/src/main/java/org/opensearch/ad/ml/ModelManager.java @@ -660,12 +660,16 @@ public Map getModelSize(String detectorId) { .entrySet() .stream() .filter(entry -> SingleStreamModelIdMapper.getDetectorIdForModelId(entry.getKey()).equals(detectorId)) - .forEach(entry -> { res.put(entry.getKey(), memoryTracker.estimateTRCFModelSize(entry.getValue().getModel())); }); + .forEach(entry -> { + res.put(entry.getKey(), memoryTracker.estimateTRCFModelSize(entry.getValue().getModel())); + }); thresholds .entrySet() .stream() .filter(entry -> SingleStreamModelIdMapper.getDetectorIdForModelId(entry.getKey()).equals(detectorId)) - .forEach(entry -> { res.put(entry.getKey(), (long) memoryTracker.getThresholdModelBytes()); }); + .forEach(entry -> { + res.put(entry.getKey(), (long) memoryTracker.getThresholdModelBytes()); + }); return res; } diff --git a/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java index 1263296c7..b920e671d 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java @@ -911,17 +911,18 @@ protected void validateAnomalyDetectorFeatures(String detectorId, boolean indexi return; } // checking runtime error from feature query - ActionListener>> validateFeatureQueriesListener = ActionListener - .wrap(response -> { checkADNameExists(detectorId, indexingDryRun); }, exception -> { - listener - .onFailure( - new ADValidationException( - exception.getMessage(), - DetectorValidationIssueType.FEATURE_ATTRIBUTES, - ValidationAspect.DETECTOR - ) - ); - }); + ActionListener>> validateFeatureQueriesListener = ActionListener.wrap(response -> { + checkADNameExists(detectorId, indexingDryRun); + }, exception -> { + listener + .onFailure( + new ADValidationException( + exception.getMessage(), + DetectorValidationIssueType.FEATURE_ATTRIBUTES, + ValidationAspect.DETECTOR + ) + ); + }); MultiResponsesDelegateActionListener>> multiFeatureQueriesResponseListener = new MultiResponsesDelegateActionListener>>( validateFeatureQueriesListener, diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index d6b873ef2..47c9b2dbc 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -255,21 +255,12 @@ private void onGetAnomalyDetectorJobForWrite( ); // Get latest realtime task and check its state before index job. Will reset running realtime task // as STOPPED first if job disabled, then start new job and create new realtime task. - adTaskManager - .startDetector( - detector, - null, - job.getUser(), - transportService, - ActionListener - .wrap( - r -> { indexAnomalyDetectorJob(newJob, null, listener); }, - e -> { - // Have logged error message in ADTaskManager#startDetector - listener.onFailure(e); - } - ) - ); + adTaskManager.startDetector(detector, null, job.getUser(), transportService, ActionListener.wrap(r -> { + indexAnomalyDetectorJob(newJob, null, listener); + }, e -> { + // Have logged error message in ADTaskManager#startDetector + listener.onFailure(e); + })); } } catch (IOException e) { String message = "Failed to parse anomaly detector job " + job.getName(); @@ -277,14 +268,9 @@ private void onGetAnomalyDetectorJobForWrite( listener.onFailure(new OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR)); } } else { - adTaskManager - .startDetector( - detector, - null, - job.getUser(), - transportService, - ActionListener.wrap(r -> { indexAnomalyDetectorJob(job, null, listener); }, e -> listener.onFailure(e)) - ); + adTaskManager.startDetector(detector, null, job.getUser(), transportService, ActionListener.wrap(r -> { + indexAnomalyDetectorJob(job, null, listener); + }, e -> listener.onFailure(e))); } } diff --git a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java index ee6d52a69..4cd77942d 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/ModelValidationActionHandler.java @@ -738,17 +738,18 @@ private void processTopEntityResults(SearchResponse response, long latestTime) { } private void checkFeatureQueryDelegate(long latestTime) throws IOException { - ActionListener> validateFeatureQueriesListener = ActionListener - .wrap(response -> { windowDelayRecommendation(latestTime); }, exception -> { - listener - .onFailure( - new ADValidationException( - exception.getMessage(), - DetectorValidationIssueType.FEATURE_ATTRIBUTES, - ValidationAspect.MODEL - ) - ); - }); + ActionListener> validateFeatureQueriesListener = ActionListener.wrap(response -> { + windowDelayRecommendation(latestTime); + }, exception -> { + listener + .onFailure( + new ADValidationException( + exception.getMessage(), + DetectorValidationIssueType.FEATURE_ATTRIBUTES, + ValidationAspect.MODEL + ) + ); + }); MultiResponsesDelegateActionListener> multiFeatureQueriesResponseListener = new MultiResponsesDelegateActionListener<>( validateFeatureQueriesListener, diff --git a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java index e8a166566..18844f860 100644 --- a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java +++ b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java @@ -605,11 +605,9 @@ public void forwardOrExecuteADTask( .updateADTask( adTask.getTaskId(), updatedFields, - ActionListener - .wrap( - r -> forwardOrExecuteEntityTask(adTask, transportService, workerNodeResponseListener), - e -> { workerNodeResponseListener.onFailure(e); } - ) + ActionListener.wrap(r -> forwardOrExecuteEntityTask(adTask, transportService, workerNodeResponseListener), e -> { + workerNodeResponseListener.onFailure(e); + }) ); } } catch (Exception e) { diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index ce0a0b838..7a10dd738 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -254,18 +254,9 @@ public ADTaskManager( .withType(TransportRequestOptions.Type.REG) .withTimeout(REQUEST_TIMEOUT.get(settings)) .build(); - clusterService - .getClusterSettings() - .addSettingsUpdateConsumer( - REQUEST_TIMEOUT, - it -> { - transportRequestOptions = TransportRequestOptions - .builder() - .withType(TransportRequestOptions.Type.REG) - .withTimeout(it) - .build(); - } - ); + clusterService.getClusterSettings().addSettingsUpdateConsumer(REQUEST_TIMEOUT, it -> { + transportRequestOptions = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.REG).withTimeout(it).build(); + }); this.threadPool = threadPool; this.checkingTaskSlot = new Semaphore(1); this.scaleEntityTaskLane = new Semaphore(1); @@ -1249,16 +1240,12 @@ private void stopHistoricalAnalysis( String userName = user == null ? null : user.getName(); ADCancelTaskRequest cancelTaskRequest = new ADCancelTaskRequest(detectorId, taskId, userName, dataNodes); - client - .execute( - ADCancelTaskAction.INSTANCE, - cancelTaskRequest, - ActionListener - .wrap(response -> { listener.onResponse(new AnomalyDetectorJobResponse(taskId, 0, 0, 0, RestStatus.OK)); }, e -> { - logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, e); - listener.onFailure(e); - }) - ); + client.execute(ADCancelTaskAction.INSTANCE, cancelTaskRequest, ActionListener.wrap(response -> { + listener.onResponse(new AnomalyDetectorJobResponse(taskId, 0, 0, 0, RestStatus.OK)); + }, e -> { + logger.error("Failed to cancel AD task " + taskId + ", detector id: " + detectorId, e); + listener.onFailure(e); + })); } private boolean lastUpdateTimeOfHistoricalTaskExpired(ADTask adTask) { @@ -1367,16 +1354,9 @@ public void cleanDetectorCache( protected void cleanDetectorCache(ADTask adTask, TransportService transportService, AnomalyDetectorFunction function) { String detectorId = adTask.getDetectorId(); String taskId = adTask.getTaskId(); - cleanDetectorCache( - adTask, - transportService, - function, - ActionListener - .wrap( - r -> { logger.debug("Successfully cleaned cache for detector {}, task {}", detectorId, taskId); }, - e -> { logger.error("Failed to clean cache for detector " + detectorId + ", task " + taskId, e); } - ) - ); + cleanDetectorCache(adTask, transportService, function, ActionListener.wrap(r -> { + logger.debug("Successfully cleaned cache for detector {}, task {}", detectorId, taskId); + }, e -> { logger.error("Failed to clean cache for detector " + detectorId + ", task " + taskId, e); })); } /** @@ -1848,14 +1828,9 @@ public void updateADTask(String taskId, Map updatedFields, Actio * @param taskId AD task id */ public void deleteADTask(String taskId) { - deleteADTask( - taskId, - ActionListener - .wrap( - r -> { logger.info("Deleted AD task {} with status: {}", taskId, r.status()); }, - e -> { logger.error("Failed to delete AD task " + taskId, e); } - ) - ); + deleteADTask(taskId, ActionListener.wrap(r -> { logger.info("Deleted AD task {} with status: {}", taskId, r.status()); }, e -> { + logger.error("Failed to delete AD task " + taskId, e); + })); } /** @@ -1933,16 +1908,12 @@ private void deleteADResultOfDetector(String detectorId) { logger.info("Start to delete AD results of detector {}", detectorId); DeleteByQueryRequest deleteADResultsRequest = new DeleteByQueryRequest(ALL_AD_RESULTS_INDEX_PATTERN); deleteADResultsRequest.setQuery(new TermQueryBuilder(DETECTOR_ID_FIELD, detectorId)); - client - .execute( - DeleteByQueryAction.INSTANCE, - deleteADResultsRequest, - ActionListener - .wrap(response -> { logger.debug("Successfully deleted AD results of detector " + detectorId); }, exception -> { - logger.error("Failed to delete AD results of detector " + detectorId, exception); - adTaskCacheManager.addDeletedDetector(detectorId); - }) - ); + client.execute(DeleteByQueryAction.INSTANCE, deleteADResultsRequest, ActionListener.wrap(response -> { + logger.debug("Successfully deleted AD results of detector " + detectorId); + }, exception -> { + logger.error("Failed to delete AD results of detector " + detectorId, exception); + adTaskCacheManager.addDeletedDetector(detectorId); + })); } /** @@ -2505,15 +2476,9 @@ public void runNextEntityForHCADHistorical( ActionListener listener ) { String detectorId = adTask.getDetectorId(); - int scaleDelta = scaleTaskSlots( - adTask, - transportService, - ActionListener - .wrap( - r -> { logger.debug("Scale up task slots done for detector {}, task {}", detectorId, adTask.getTaskId()); }, - e -> { logger.error("Failed to scale up task slots for task " + adTask.getTaskId(), e); } - ) - ); + int scaleDelta = scaleTaskSlots(adTask, transportService, ActionListener.wrap(r -> { + logger.debug("Scale up task slots done for detector {}, task {}", detectorId, adTask.getTaskId()); + }, e -> { logger.error("Failed to scale up task slots for task " + adTask.getTaskId(), e); })); if (scaleDelta < 0) { logger .warn( @@ -2740,16 +2705,12 @@ public ADTaskProfile getLocalADTaskProfilesByDetectorId(String detectorId) { detectorTaskProfile.setDetectorTaskSlots(1); } } - threadPool - .executor(AD_BATCH_TASK_THREAD_POOL_NAME) - .execute( - () -> { - // Clean expired HC batch task run states as it may exists after HC historical analysis done if user cancel - // before querying top entities done. We will clean it in hourly cron, check "maintainRunningHistoricalTasks" - // method. Clean it up here when get task profile to release memory earlier. - adTaskCacheManager.cleanExpiredHCBatchTaskRunStates(); - } - ); + threadPool.executor(AD_BATCH_TASK_THREAD_POOL_NAME).execute(() -> { + // Clean expired HC batch task run states as it may exists after HC historical analysis done if user cancel + // before querying top entities done. We will clean it in hourly cron, check "maintainRunningHistoricalTasks" + // method. Clean it up here when get task profile to release memory earlier. + adTaskCacheManager.cleanExpiredHCBatchTaskRunStates(); + }); logger.debug("Local AD task profile of detector {}: {}", detectorId, detectorTaskProfile); return detectorTaskProfile; } @@ -3056,21 +3017,9 @@ private void maintainRunningHistoricalTask(ConcurrentLinkedQueue taskQue resetHistoricalDetectorTaskState(ImmutableList.of(adTask), () -> { logger.debug("Finished maintaining running historical task {}", adTask.getTaskId()); maintainRunningHistoricalTask(taskQueue, transportService); - }, - transportService, - ActionListener - .wrap( - r -> { - logger - .debug( - "Reset historical task state done for task {}, detector {}", - adTask.getTaskId(), - adTask.getDetectorId() - ); - }, - e -> { logger.error("Failed to reset historical task state for task " + adTask.getTaskId(), e); } - ) - ); + }, transportService, ActionListener.wrap(r -> { + logger.debug("Reset historical task state done for task {}, detector {}", adTask.getTaskId(), adTask.getDetectorId()); + }, e -> { logger.error("Failed to reset historical task state for task " + adTask.getTaskId(), e); })); }, TimeValue.timeValueSeconds(DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), AD_BATCH_TASK_THREAD_POOL_NAME); } diff --git a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java index bc70ebae8..e3d489e72 100644 --- a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java @@ -154,25 +154,17 @@ private void deleteAnomalyDetectorJobDoc(String detectorId, ActionListener listener) { LOG.info("Delete detector info {}", detectorId); DeleteRequest deleteRequest = new DeleteRequest(CommonName.DETECTION_STATE_INDEX, detectorId); - client - .delete( - deleteRequest, - ActionListener - .wrap( - response -> { - // whether deleted state doc or not, continue as state doc may not exist - deleteAnomalyDetectorDoc(detectorId, listener); - }, - exception -> { - if (exception instanceof IndexNotFoundException) { - deleteAnomalyDetectorDoc(detectorId, listener); - } else { - LOG.error("Failed to delete detector state", exception); - listener.onFailure(exception); - } - } - ) - ); + client.delete(deleteRequest, ActionListener.wrap(response -> { + // whether deleted state doc or not, continue as state doc may not exist + deleteAnomalyDetectorDoc(detectorId, listener); + }, exception -> { + if (exception instanceof IndexNotFoundException) { + deleteAnomalyDetectorDoc(detectorId, listener); + } else { + LOG.error("Failed to delete detector state", exception); + listener.onFailure(exception); + } + })); } private void deleteAnomalyDetectorDoc(String detectorId, ActionListener listener) { diff --git a/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java index 7d6ee8f76..1786904c6 100644 --- a/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java @@ -171,37 +171,27 @@ protected void getExecute(GetAnomalyDetectorRequest request, ActionListener { - listener - .onResponse( - new GetAnomalyDetectorResponse( - 0, - null, - 0, - 0, - null, - null, - false, - null, - null, - false, - null, - null, - profile, - true - ) - ); - }, - e -> listener.onFailure(e) + profileRunner.profile(detectorID, entity, entityProfilesToCollect, ActionListener.wrap(profile -> { + listener + .onResponse( + new GetAnomalyDetectorResponse( + 0, + null, + 0, + 0, + null, + null, + false, + null, + null, + false, + null, + null, + profile, + true ) - ); + ); + }, e -> listener.onFailure(e))); } else { Set profilesToCollect = getProfilesToCollect(typesStr, all); AnomalyDetectorProfileRunner profileRunner = new AnomalyDetectorProfileRunner( diff --git a/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java index 1a6557e1d..f1a115741 100644 --- a/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/SearchAnomalyResultTransportAction.java @@ -184,20 +184,12 @@ void multiSearch( context.restore(); // Send multiple search to check which index a user has permission to read. If search all indices directly, // search request will throw exception if user has no permission to search any index. - client - .multiSearch( - multiSearchRequest, - ActionListener - .wrap( - multiSearchResponse -> { - processMultiSearchResponse(multiSearchResponse, targetIndices, readableIndices, request, listener); - }, - multiSearchException -> { - logger.error("Failed to search custom AD result indices", multiSearchException); - listener.onFailure(multiSearchException); - } - ) - ); + client.multiSearch(multiSearchRequest, ActionListener.wrap(multiSearchResponse -> { + processMultiSearchResponse(multiSearchResponse, targetIndices, readableIndices, request, listener); + }, multiSearchException -> { + logger.error("Failed to search custom AD result indices", multiSearchException); + listener.onFailure(multiSearchException); + })); } @VisibleForTesting diff --git a/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java b/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java index eda604ee5..6948be72a 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/AnomalyIndexHandler.java @@ -193,41 +193,34 @@ protected void save(T toSave, String detectorId, String indexName) { } void saveIteration(IndexRequest indexRequest, String detectorId, Iterator backoff) { - clientUtil - .asyncRequest( - indexRequest, - client::index, - ActionListener - .wrap( - response -> { LOG.debug(String.format(Locale.ROOT, SUCCESS_SAVING_MSG, detectorId)); }, - exception -> { - // OpenSearch has a thread pool and a queue for write per node. A thread - // pool will have N number of workers ready to handle the requests. When a - // request comes and if a worker is free , this is handled by the worker. Now by - // default the number of workers is equal to the number of cores on that CPU. - // When the workers are full and there are more write requests, the request - // will go to queue. The size of queue is also limited. If by default size is, - // say, 200 and if there happens more parallel requests than this, then those - // requests would be rejected as you can see OpenSearchRejectedExecutionException. - // So OpenSearchRejectedExecutionException is the way that OpenSearch tells us that - // it cannot keep up with the current indexing rate. - // When it happens, we should pause indexing a bit before trying again, ideally - // with randomized exponential backoff. - Throwable cause = ExceptionsHelper.unwrapCause(exception); - if (!(cause instanceof OpenSearchRejectedExecutionException) || !backoff.hasNext()) { - LOG.error(String.format(Locale.ROOT, FAIL_TO_SAVE_ERR_MSG, detectorId), cause); - } else { - TimeValue nextDelay = backoff.next(); - LOG.warn(String.format(Locale.ROOT, RETRY_SAVING_ERR_MSG, detectorId), cause); - threadPool - .schedule( - () -> saveIteration(BulkUtil.cloneIndexRequest(indexRequest), detectorId, backoff), - nextDelay, - ThreadPool.Names.SAME - ); - } - } - ) - ); + clientUtil.asyncRequest(indexRequest, client::index, ActionListener.wrap(response -> { + LOG.debug(String.format(Locale.ROOT, SUCCESS_SAVING_MSG, detectorId)); + }, exception -> { + // OpenSearch has a thread pool and a queue for write per node. A thread + // pool will have N number of workers ready to handle the requests. When a + // request comes and if a worker is free , this is handled by the worker. Now by + // default the number of workers is equal to the number of cores on that CPU. + // When the workers are full and there are more write requests, the request + // will go to queue. The size of queue is also limited. If by default size is, + // say, 200 and if there happens more parallel requests than this, then those + // requests would be rejected as you can see OpenSearchRejectedExecutionException. + // So OpenSearchRejectedExecutionException is the way that OpenSearch tells us that + // it cannot keep up with the current indexing rate. + // When it happens, we should pause indexing a bit before trying again, ideally + // with randomized exponential backoff. + Throwable cause = ExceptionsHelper.unwrapCause(exception); + if (!(cause instanceof OpenSearchRejectedExecutionException) || !backoff.hasNext()) { + LOG.error(String.format(Locale.ROOT, FAIL_TO_SAVE_ERR_MSG, detectorId), cause); + } else { + TimeValue nextDelay = backoff.next(); + LOG.warn(String.format(Locale.ROOT, RETRY_SAVING_ERR_MSG, detectorId), cause); + threadPool + .schedule( + () -> saveIteration(BulkUtil.cloneIndexRequest(indexRequest), detectorId, backoff), + nextDelay, + ThreadPool.Names.SAME + ); + } + })); } } diff --git a/src/main/java/org/opensearch/ad/util/ClientUtil.java b/src/main/java/org/opensearch/ad/util/ClientUtil.java index 58b23c968..f3f6ef68d 100644 --- a/src/main/java/org/opensearch/ad/util/ClientUtil.java +++ b/src/main/java/org/opensearch/ad/util/ClientUtil.java @@ -89,14 +89,9 @@ public Optional consumer .accept( request, - new LatchedActionListener( - ActionListener - .wrap( - response -> { respReference.set(response); }, - exception -> { LOG.error("Cannot get response for request {}, error: {}", request, exception); } - ), - latch - ) + new LatchedActionListener(ActionListener.wrap(response -> { respReference.set(response); }, exception -> { + LOG.error("Cannot get response for request {}, error: {}", request, exception); + }), latch) ); if (!latch.await(requestTimeout.getSeconds(), TimeUnit.SECONDS)) { @@ -142,12 +137,9 @@ public void exe Request request, ActionListener listener ) { - client - .execute( - action, - request, - ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { listener.onFailure(exception); }) - ); + client.execute(action, request, ActionListener.wrap(response -> { listener.onResponse(response); }, exception -> { + listener.onFailure(exception); + })); } /** @@ -249,15 +241,12 @@ public boolean hasRunningQuery(AnomalyDetector detector) { private void cancelRunningQuery(Client client, String detectorId, Logger LOG) { ListTasksRequest listTasksRequest = new ListTasksRequest(); listTasksRequest.setActions("*search*"); - client - .execute( - ListTasksAction.INSTANCE, - listTasksRequest, - ActionListener.wrap(response -> { onListTaskResponse(response, detectorId, LOG); }, exception -> { - LOG.error("List Tasks failed.", exception); - throw new InternalFailure(detectorId, "Failed to list current tasks", exception); - }) - ); + client.execute(ListTasksAction.INSTANCE, listTasksRequest, ActionListener.wrap(response -> { + onListTaskResponse(response, detectorId, LOG); + }, exception -> { + LOG.error("List Tasks failed.", exception); + throw new InternalFailure(detectorId, "Failed to list current tasks", exception); + })); } /** @@ -300,15 +289,12 @@ private void onListTaskResponse(ListTasksResponse listTasksResponse, String dete LOG.info("Start to cancel task for taskId: {}", matchedSingleTaskId.toString()); } - client - .execute( - CancelTasksAction.INSTANCE, - cancelTaskRequest, - ActionListener.wrap(response -> { onCancelTaskResponse(response, detectorId, LOG); }, exception -> { - LOG.error("Failed to cancel task for detectorId: " + detectorId, exception); - throw new InternalFailure(detectorId, "Failed to cancel current tasks", exception); - }) - ); + client.execute(CancelTasksAction.INSTANCE, cancelTaskRequest, ActionListener.wrap(response -> { + onCancelTaskResponse(response, detectorId, LOG); + }, exception -> { + LOG.error("Failed to cancel task for detectorId: " + detectorId, exception); + throw new InternalFailure(detectorId, "Failed to cancel current tasks", exception); + })); } /** diff --git a/src/main/java/org/opensearch/ad/util/ExceptionUtil.java b/src/main/java/org/opensearch/ad/util/ExceptionUtil.java index 1c2bd68b0..36f22ffa4 100644 --- a/src/main/java/org/opensearch/ad/util/ExceptionUtil.java +++ b/src/main/java/org/opensearch/ad/util/ExceptionUtil.java @@ -150,11 +150,9 @@ public static boolean isRetryAble(RestStatus status) { * @return the wrapped listener */ public static ActionListener wrapListener(ActionListener original, Exception exceptionToReturn, String detectorId) { - return ActionListener - .wrap( - r -> { original.onFailure(exceptionToReturn); }, - e -> { original.onFailure(selectHigherPriorityException(exceptionToReturn, e)); } - ); + return ActionListener.wrap(r -> { original.onFailure(exceptionToReturn); }, e -> { + original.onFailure(selectHigherPriorityException(exceptionToReturn, e)); + }); } /** diff --git a/src/test/java/org/opensearch/ad/NodeStateManagerTests.java b/src/test/java/org/opensearch/ad/NodeStateManagerTests.java index 8e0152096..1924e7cfe 100644 --- a/src/test/java/org/opensearch/ad/NodeStateManagerTests.java +++ b/src/test/java/org/opensearch/ad/NodeStateManagerTests.java @@ -11,12 +11,12 @@ package org.opensearch.ad; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyZeroInteractions; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.opensearch.ad.settings.AnomalyDetectorSettings.BACKOFF_MINUTES; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_RETRY_FOR_UNRESPONSIVE_NODE; @@ -232,7 +232,7 @@ public void testShouldMute() { public void testMaintenanceDoNothing() { stateManager.maintenance(); - verifyZeroInteractions(clock); + verifyNoInteractions(clock); } public void testHasRunningQuery() throws IOException { diff --git a/src/test/java/org/opensearch/ad/TestHelpers.java b/src/test/java/org/opensearch/ad/TestHelpers.java index 3c3acbfcb..ea0109e68 100644 --- a/src/test/java/org/opensearch/ad/TestHelpers.java +++ b/src/test/java/org/opensearch/ad/TestHelpers.java @@ -12,14 +12,14 @@ package org.opensearch.ad; import static org.apache.http.entity.ContentType.APPLICATION_JSON; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder; import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO; import static org.opensearch.test.OpenSearchTestCase.*; -import static org.powermock.api.mockito.PowerMockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java b/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java index 1a51cb9be..6f30ed118 100644 --- a/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java +++ b/src/test/java/org/opensearch/ad/cluster/ADClusterEventListenerTests.java @@ -13,7 +13,7 @@ import static java.util.Collections.emptyMap; import static java.util.Collections.emptySet; -import static org.mockito.Matchers.any; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES; diff --git a/src/test/java/org/opensearch/ad/cluster/HashRingTests.java b/src/test/java/org/opensearch/ad/cluster/HashRingTests.java index 7c9a82896..3f3949558 100644 --- a/src/test/java/org/opensearch/ad/cluster/HashRingTests.java +++ b/src/test/java/org/opensearch/ad/cluster/HashRingTests.java @@ -207,7 +207,9 @@ public void testBuildAndGetOwningNodeWithSameLocalAdVersion() { .buildAndGetOwningNodeWithSameLocalAdVersion( "testModelId", node -> { assertTrue(node.isPresent()); }, - ActionListener.wrap(r -> {}, e -> { assertFalse("Failed to build hash ring", true); }) + ActionListener.wrap(r -> {}, e -> { + assertFalse("Failed to build hash ring", true); + }) ); } diff --git a/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java b/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java index 949dab85b..656881eeb 100644 --- a/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java +++ b/src/test/java/org/opensearch/ad/cluster/diskcleanup/IndexCleanupTests.java @@ -107,13 +107,9 @@ public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsTrue() throws Excep public void testDeleteDocsBasedOnShardSizeWithCleanupNeededAsFalse() throws Exception { long maxShardSize = 1000; when(storeStats.getSizeInBytes()).thenReturn(maxShardSize - 1); - indexCleanup - .deleteDocsBasedOnShardSize( - "indexname", - maxShardSize, - null, - ActionListener.wrap(Assert::assertFalse, exception -> { throw new RuntimeException(exception); }) - ); + indexCleanup.deleteDocsBasedOnShardSize("indexname", maxShardSize, null, ActionListener.wrap(Assert::assertFalse, exception -> { + throw new RuntimeException(exception); + })); } public void testDeleteDocsBasedOnShardSizeIndexNotExisted() throws Exception { diff --git a/src/test/java/org/opensearch/ad/dataprocessor/IntegerSensitiveSingleFeatureLinearUniformInterpolatorTests.java b/src/test/java/org/opensearch/ad/dataprocessor/IntegerSensitiveSingleFeatureLinearUniformInterpolatorTests.java index 045c05297..64324a0e4 100644 --- a/src/test/java/org/opensearch/ad/dataprocessor/IntegerSensitiveSingleFeatureLinearUniformInterpolatorTests.java +++ b/src/test/java/org/opensearch/ad/dataprocessor/IntegerSensitiveSingleFeatureLinearUniformInterpolatorTests.java @@ -15,13 +15,13 @@ import java.util.Arrays; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class IntegerSensitiveSingleFeatureLinearUniformInterpolatorTests { diff --git a/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java b/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java index a46e4b897..8a3795dcd 100644 --- a/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java +++ b/src/test/java/org/opensearch/ad/feature/FeatureManagerTests.java @@ -19,8 +19,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.Matchers.argThat; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -46,9 +46,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.atomic.AtomicBoolean; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -67,6 +64,9 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.threadpool.ThreadPool; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) @SuppressWarnings("unchecked") public class FeatureManagerTests { diff --git a/src/test/java/org/opensearch/ad/feature/FeaturesTests.java b/src/test/java/org/opensearch/ad/feature/FeaturesTests.java index 447bdd6c4..7a6b3b8e1 100644 --- a/src/test/java/org/opensearch/ad/feature/FeaturesTests.java +++ b/src/test/java/org/opensearch/ad/feature/FeaturesTests.java @@ -18,12 +18,12 @@ import java.util.List; import java.util.Map.Entry; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class FeaturesTests { diff --git a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java index 9d90435aa..bb2b97bee 100644 --- a/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java +++ b/src/test/java/org/opensearch/ad/feature/NoPowermockSearchFeatureDaoTests.java @@ -341,8 +341,9 @@ public void testGetHighestCountEntitiesExhaustedPages() throws InterruptedExcept when(emptyComposite.getName()).thenReturn(SearchFeatureDao.AGG_NAME_TOP); when(emptyComposite.afterKey()).thenReturn(null); // empty bucket - when(emptyComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return new ArrayList(); }); + when(emptyComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return new ArrayList(); + }); Aggregations emptyAggs = new Aggregations(Collections.singletonList(emptyComposite)); SearchResponseSections emptySections = new SearchResponseSections(SearchHits.empty(), emptyAggs, null, false, null, null, 1); SearchResponse emptyResponse = new SearchResponse(emptySections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); diff --git a/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoParamTests.java b/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoParamTests.java index 67d706159..0974be907 100644 --- a/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoParamTests.java +++ b/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoParamTests.java @@ -35,9 +35,6 @@ import java.util.Optional; import java.util.concurrent.ExecutorService; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.junit.Test; @@ -59,7 +56,6 @@ import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.settings.AnomalyDetectorSettings; -import org.opensearch.ad.util.ParseUtils; import org.opensearch.ad.util.SecurityClientUtil; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -82,13 +78,9 @@ import org.opensearch.search.aggregations.metrics.Percentile; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.threadpool.ThreadPool; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; -import org.powermock.modules.junit4.PowerMockRunnerDelegate; -import com.google.gson.Gson; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; /** * Due to https://tinyurl.com/2y265s2w, tests with and without @Parameters annotation @@ -96,10 +88,7 @@ * while SearchFeatureDaoTests do not use @Parameters. * */ -@PowerMockIgnore("javax.management.*") -@RunWith(PowerMockRunner.class) -@PowerMockRunnerDelegate(JUnitParamsRunner.class) -@PrepareForTest({ ParseUtils.class, Gson.class }) +@RunWith(JUnitParamsRunner.class) public class SearchFeatureDaoParamTests { private SearchFeatureDao searchFeatureDao; @@ -156,7 +145,6 @@ public class SearchFeatureDaoParamTests { @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); - PowerMockito.mockStatic(ParseUtils.class); interpolator = new LinearUniformInterpolator(new SingleFeatureLinearUniformInterpolator()); @@ -233,14 +221,13 @@ public void getFeaturesForPeriod_returnExpectedToListener(List aggs long start = 100L; long end = 200L; - when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); when(searchResponse.getAggregations()).thenReturn(new Aggregations(aggs)); when(detector.getEnabledFeatureIds()).thenReturn(featureIds); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; - }).when(client).search(eq(searchRequest), any(ActionListener.class)); + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); diff --git a/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoTests.java b/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoTests.java index a85f9cbcd..a4142cf42 100644 --- a/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoTests.java +++ b/src/test/java/org/opensearch/ad/feature/SearchFeatureDaoTests.java @@ -12,6 +12,7 @@ package org.opensearch.ad.feature; import static java.util.Arrays.asList; +import static java.util.Collections.emptyMap; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.AnyOf.anyOf; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -44,7 +45,6 @@ import org.apache.lucene.search.TotalHits; import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -65,7 +65,6 @@ import org.opensearch.ad.model.Entity; import org.opensearch.ad.model.IntervalTimeConfiguration; import org.opensearch.ad.settings.AnomalyDetectorSettings; -import org.opensearch.ad.util.ParseUtils; import org.opensearch.ad.util.SecurityClientUtil; import org.opensearch.client.Client; import org.opensearch.common.action.ActionFuture; @@ -90,6 +89,7 @@ import org.opensearch.search.aggregations.AggregatorFactories; import org.opensearch.search.aggregations.InternalAggregations; import org.opensearch.search.aggregations.bucket.MultiBucketsAggregation; +import org.opensearch.search.aggregations.metrics.InternalMax; import org.opensearch.search.aggregations.metrics.InternalMin; import org.opensearch.search.aggregations.metrics.InternalTDigestPercentiles; import org.opensearch.search.aggregations.metrics.Max; @@ -98,14 +98,7 @@ import org.opensearch.search.aggregations.metrics.Percentile; import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.threadpool.ThreadPool; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PowerMockIgnore; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; - -@PowerMockIgnore("javax.management.*") -@RunWith(PowerMockRunner.class) -@PrepareForTest({ ParseUtils.class }) + public class SearchFeatureDaoTests { private SearchFeatureDao searchFeatureDao; @@ -159,7 +152,6 @@ public class SearchFeatureDaoTests { @Before public void setup() throws Exception { MockitoAnnotations.initMocks(this); - PowerMockito.mockStatic(ParseUtils.class); interpolator = new LinearUniformInterpolator(new SingleFeatureLinearUniformInterpolator()); @@ -265,7 +257,10 @@ public void getLatestDataTime_returnExpectedToListener() { return null; }).when(client).search(eq(searchRequest), any(ActionListener.class)); - when(ParseUtils.getLatestDataTime(eq(searchResponse))).thenReturn(Optional.of(epochTime)); + InternalMax maxAgg = new InternalMax(CommonName.AGG_NAME_MAX_TIME, epochTime, DocValueFormat.RAW, emptyMap()); + InternalAggregations internalAggregations = InternalAggregations.from(Collections.singletonList(maxAgg)); + when(searchResponse.getAggregations()).thenReturn(internalAggregations); + ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getLatestDataTime(detector, listener); @@ -296,13 +291,12 @@ public void getFeaturesForPeriod_throwToListener_whenResponseParsingFails() thro long start = 100L; long end = 200L; - when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); when(detector.getEnabledFeatureIds()).thenReturn(null); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onResponse(searchResponse); return null; - }).when(client).search(eq(searchRequest), any(ActionListener.class)); + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); @@ -316,12 +310,11 @@ public void getFeaturesForPeriod_throwToListener_whenSearchFails() throws Except long start = 100L; long end = 200L; - when(ParseUtils.generateInternalFeatureQuery(eq(detector), eq(start), eq(end), eq(xContent))).thenReturn(searchSourceBuilder); doAnswer(invocation -> { ActionListener listener = invocation.getArgument(1); listener.onFailure(new RuntimeException()); return null; - }).when(client).search(eq(searchRequest), any(ActionListener.class)); + }).when(client).search(any(SearchRequest.class), any(ActionListener.class)); ActionListener> listener = mock(ActionListener.class); searchFeatureDao.getFeaturesForPeriod(detector, start, end, listener); diff --git a/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java index 5607a9b94..1272bdc20 100644 --- a/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -82,28 +82,14 @@ public void testAnomalyDetectorIndexExists() throws IOException { } public void testAnomalyDetectorIndexExistsAndNotRecreate() throws IOException { - indices - .initAnomalyDetectorIndexIfAbsent( - TestHelpers - .createActionListener( - response -> response.isAcknowledged(), - failure -> { throw new RuntimeException("should not recreate index"); } - ) - ); + indices.initAnomalyDetectorIndexIfAbsent(TestHelpers.createActionListener(response -> response.isAcknowledged(), failure -> { + throw new RuntimeException("should not recreate index"); + })); TestHelpers.waitForIndexCreationToComplete(client(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); if (client().admin().indices().prepareExists(AnomalyDetector.ANOMALY_DETECTORS_INDEX).get().isExists()) { - indices - .initAnomalyDetectorIndexIfAbsent( - TestHelpers - .createActionListener( - response -> { - throw new RuntimeException("should not recreate index " + AnomalyDetector.ANOMALY_DETECTORS_INDEX); - }, - failure -> { - throw new RuntimeException("should not recreate index " + AnomalyDetector.ANOMALY_DETECTORS_INDEX); - } - ) - ); + indices.initAnomalyDetectorIndexIfAbsent(TestHelpers.createActionListener(response -> { + throw new RuntimeException("should not recreate index " + AnomalyDetector.ANOMALY_DETECTORS_INDEX); + }, failure -> { throw new RuntimeException("should not recreate index " + AnomalyDetector.ANOMALY_DETECTORS_INDEX); })); } } @@ -123,26 +109,15 @@ public void testAnomalyResultIndexExists() throws IOException { public void testAnomalyResultIndexExistsAndNotRecreate() throws IOException { indices .initDefaultAnomalyResultIndexIfAbsent( - TestHelpers - .createActionListener( - response -> logger.info("Acknowledged: " + response.isAcknowledged()), - failure -> { throw new RuntimeException("should not recreate index"); } - ) + TestHelpers.createActionListener(response -> logger.info("Acknowledged: " + response.isAcknowledged()), failure -> { + throw new RuntimeException("should not recreate index"); + }) ); TestHelpers.waitForIndexCreationToComplete(client(), CommonName.ANOMALY_RESULT_INDEX_ALIAS); if (client().admin().indices().prepareExists(CommonName.ANOMALY_RESULT_INDEX_ALIAS).get().isExists()) { - indices - .initDefaultAnomalyResultIndexIfAbsent( - TestHelpers - .createActionListener( - response -> { - throw new RuntimeException("should not recreate index " + CommonName.ANOMALY_RESULT_INDEX_ALIAS); - }, - failure -> { - throw new RuntimeException("should not recreate index " + CommonName.ANOMALY_RESULT_INDEX_ALIAS, failure); - } - ) - ); + indices.initDefaultAnomalyResultIndexIfAbsent(TestHelpers.createActionListener(response -> { + throw new RuntimeException("should not recreate index " + CommonName.ANOMALY_RESULT_INDEX_ALIAS); + }, failure -> { throw new RuntimeException("should not recreate index " + CommonName.ANOMALY_RESULT_INDEX_ALIAS, failure); })); } } diff --git a/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java b/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java index c41b94e10..ab6716b6f 100644 --- a/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java +++ b/src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java @@ -11,8 +11,8 @@ package org.opensearch.ad.ml; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -106,10 +106,6 @@ import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.test.OpenSearchTestCase; -import test.org.opensearch.ad.util.JsonDeserializer; -import test.org.opensearch.ad.util.MLUtil; -import test.org.opensearch.ad.util.RandomModelStateConfig; - import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.config.Precision; import com.amazon.randomcutforest.config.TransformMethod; @@ -125,6 +121,9 @@ import io.protostuff.LinkedBuffer; import io.protostuff.Schema; import io.protostuff.runtime.RuntimeSchema; +import test.org.opensearch.ad.util.JsonDeserializer; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; public class CheckpointDaoTests extends OpenSearchTestCase { private static final Logger logger = LogManager.getLogger(CheckpointDaoTests.class); diff --git a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java index 8ac6fe989..abbdb5c86 100644 --- a/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java +++ b/src/test/java/org/opensearch/ad/ml/EntityColdStarterTests.java @@ -59,16 +59,16 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException; -import test.org.opensearch.ad.util.LabelledAnomalyGenerator; -import test.org.opensearch.ad.util.MLUtil; -import test.org.opensearch.ad.util.MultiDimDataWithTime; - import com.amazon.randomcutforest.config.Precision; import com.amazon.randomcutforest.config.TransformMethod; import com.amazon.randomcutforest.parkservices.AnomalyDescriptor; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; import com.google.common.collect.ImmutableList; +import test.org.opensearch.ad.util.LabelledAnomalyGenerator; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.MultiDimDataWithTime; + public class EntityColdStarterTests extends AbstractCosineDataTest { @BeforeClass diff --git a/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java b/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java index 534c5f756..a5a7dc287 100644 --- a/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java +++ b/src/test/java/org/opensearch/ad/ml/HCADModelPerfTests.java @@ -48,12 +48,12 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.test.ClusterServiceUtils; -import test.org.opensearch.ad.util.LabelledAnomalyGenerator; -import test.org.opensearch.ad.util.MultiDimDataWithTime; - import com.carrotsearch.randomizedtesting.annotations.TimeoutSuite; import com.google.common.collect.ImmutableList; +import test.org.opensearch.ad.util.LabelledAnomalyGenerator; +import test.org.opensearch.ad.util.MultiDimDataWithTime; + @TimeoutSuite(millis = 60 * TimeUnits.MINUTE) // rcf may be slow due to bounding box cache disabled public class HCADModelPerfTests extends AbstractCosineDataTest { diff --git a/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java b/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java index be8ff9525..5e6ffdb9b 100644 --- a/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java +++ b/src/test/java/org/opensearch/ad/ml/HybridThresholdingModelTests.java @@ -17,13 +17,13 @@ import java.util.Arrays; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.apache.commons.math3.distribution.NormalDistribution; import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class HybridThresholdingModelTests { diff --git a/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java b/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java index d16776fe2..8cf80a89d 100644 --- a/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java +++ b/src/test/java/org/opensearch/ad/ml/ModelManagerTests.java @@ -17,7 +17,7 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Matchers.eq; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; @@ -44,9 +44,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -80,14 +77,16 @@ import org.opensearch.monitor.jvm.JvmService; import org.opensearch.threadpool.ThreadPool; -import test.org.opensearch.ad.util.MLUtil; -import test.org.opensearch.ad.util.RandomModelStateConfig; - import com.amazon.randomcutforest.RandomCutForest; import com.amazon.randomcutforest.parkservices.AnomalyDescriptor; import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; import com.amazon.randomcutforest.returntypes.DiVector; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import test.org.opensearch.ad.util.MLUtil; +import test.org.opensearch.ad.util.RandomModelStateConfig; + @RunWith(JUnitParamsRunner.class) @SuppressWarnings("unchecked") public class ModelManagerTests { diff --git a/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java b/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java index 0daddea0d..1c73dceff 100644 --- a/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java +++ b/src/test/java/org/opensearch/ad/ml/ThresholdingResultTests.java @@ -13,12 +13,12 @@ import static org.junit.Assert.assertEquals; -import junitparams.JUnitParamsRunner; -import junitparams.Parameters; - import org.junit.Test; import org.junit.runner.RunWith; +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; + @RunWith(JUnitParamsRunner.class) public class ThresholdingResultTests { diff --git a/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java b/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java index 96bac0f5b..fed9b47ac 100644 --- a/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java +++ b/src/test/java/org/opensearch/ad/mock/plugin/MockReindexPlugin.java @@ -122,11 +122,9 @@ protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener .execute( BulkAction.INSTANCE, bulkRequest, - ActionListener - .wrap( - res -> { listener.onResponse(mockBulkByScrollResponse(totalHits)); }, - ex -> { listener.onFailure(ex); } - ) + ActionListener.wrap(res -> { listener.onResponse(mockBulkByScrollResponse(totalHits)); }, ex -> { + listener.onFailure(ex); + }) ); }, e -> { listener.onFailure(e); })); diff --git a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java index 5ea4a158f..9512aee98 100644 --- a/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java +++ b/src/test/java/org/opensearch/ad/ratelimit/CheckpointReadWorkerTests.java @@ -77,11 +77,11 @@ import org.opensearch.threadpool.ThreadPoolStats; import org.opensearch.threadpool.ThreadPoolStats.Stats; +import com.fasterxml.jackson.core.JsonParseException; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.fasterxml.jackson.core.JsonParseException; - public class CheckpointReadWorkerTests extends AbstractRateLimitingTest { CheckpointReadWorker worker; diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index 26a9f8f5f..7314144a6 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -1521,33 +1521,27 @@ public void testSearchTopAnomalyResultsWithInvalidInputs() throws IOException { ); // Missing start time - Exception missingStartTimeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getDetectorId(), false, "{\"end_time_ms\":2}", client()); } - ); + Exception missingStartTimeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId(), false, "{\"end_time_ms\":2}", client()); + }); assertTrue(missingStartTimeException.getMessage().contains("Must set both start time and end time with epoch of milliseconds")); // Missing end time - Exception missingEndTimeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":1}", client()); } - ); + Exception missingEndTimeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":1}", client()); + }); assertTrue(missingEndTimeException.getMessage().contains("Must set both start time and end time with epoch of milliseconds")); // Start time > end time - Exception invalidTimeException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":2, \"end_time_ms\":1}", client()); } - ); + Exception invalidTimeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":2, \"end_time_ms\":1}", client()); + }); assertTrue(invalidTimeException.getMessage().contains("Start time should be before end time")); // Invalid detector ID - Exception invalidDetectorIdException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults(detector.getDetectorId() + "-invalid", false, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); - } - ); + Exception invalidDetectorIdException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId() + "-invalid", false, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); + }); assertTrue(invalidDetectorIdException.getMessage().contains("Can't find detector with id")); // Invalid order field @@ -1562,42 +1556,32 @@ public void testSearchTopAnomalyResultsWithInvalidInputs() throws IOException { assertTrue(invalidOrderException.getMessage().contains("Ordering by invalid-order is not a valid option")); // Negative size field - Exception negativeSizeException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":-1}", client()); - } - ); + Exception negativeSizeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":-1}", client()); + }); assertTrue(negativeSizeException.getMessage().contains("Size must be a positive integer")); // Zero size field - Exception zeroSizeException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":0}", client()); - } - ); + Exception zeroSizeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId(), false, "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":0}", client()); + }); assertTrue(zeroSizeException.getMessage().contains("Size must be a positive integer")); // Too large size field - Exception tooLargeSizeException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults( - detector.getDetectorId(), - false, - "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":9999999}", - client() - ); - } - ); + Exception tooLargeSizeException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults( + detector.getDetectorId(), + false, + "{\"start_time_ms\":1, \"end_time_ms\":2, \"size\":9999999}", + client() + ); + }); assertTrue(tooLargeSizeException.getMessage().contains("Size cannot exceed")); // No existing task ID for detector - Exception noTaskIdException = expectThrows( - IOException.class, - () -> { searchTopAnomalyResults(detector.getDetectorId(), true, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); } - ); + Exception noTaskIdException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults(detector.getDetectorId(), true, "{\"start_time_ms\":1, \"end_time_ms\":2}", client()); + }); assertTrue(noTaskIdException.getMessage().contains("No historical tasks found for detector ID " + detector.getDetectorId())); // Invalid category fields @@ -1627,17 +1611,14 @@ public void testSearchTopAnomalyResultsWithInvalidInputs() throws IOException { true, client() ); - Exception noCategoryFieldsException = expectThrows( - IOException.class, - () -> { - searchTopAnomalyResults( - detectorWithNoCategoryFields.getDetectorId(), - false, - "{\"start_time_ms\":1, \"end_time_ms\":2}", - client() - ); - } - ); + Exception noCategoryFieldsException = expectThrows(IOException.class, () -> { + searchTopAnomalyResults( + detectorWithNoCategoryFields.getDetectorId(), + false, + "{\"start_time_ms\":1, \"end_time_ms\":2}", + client() + ); + }); assertTrue( noCategoryFieldsException .getMessage() diff --git a/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java b/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java index 5094a1fdc..850142726 100644 --- a/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java +++ b/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java @@ -339,12 +339,9 @@ public void testStartApiFilterByEnabled() throws IOException { // User Cat has AD full access, but is part of different backend role so Cat should not be able to access // Alice detector Instant now = Instant.now(); - Exception exception = expectThrows( - IOException.class, - () -> { - startAnomalyDetector(aliceDetector.getDetectorId(), new DetectionDateRange(now.minus(10, ChronoUnit.DAYS), now), catClient); - } - ); + Exception exception = expectThrows(IOException.class, () -> { + startAnomalyDetector(aliceDetector.getDetectorId(), new DetectionDateRange(now.minus(10, ChronoUnit.DAYS), now), catClient); + }); Assert .assertTrue( exception.getMessage().contains("User does not have permissions to access detector: " + aliceDetector.getDetectorId()) diff --git a/src/test/java/org/opensearch/ad/stats/ADStatsTests.java b/src/test/java/org/opensearch/ad/stats/ADStatsTests.java index 26846edcf..ee736e0b7 100644 --- a/src/test/java/org/opensearch/ad/stats/ADStatsTests.java +++ b/src/test/java/org/opensearch/ad/stats/ADStatsTests.java @@ -45,11 +45,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; +import com.amazon.randomcutforest.RandomCutForest; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.amazon.randomcutforest.RandomCutForest; - public class ADStatsTests extends OpenSearchTestCase { private Map> statsMap; diff --git a/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java b/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java index c0173593c..fd56ae427 100644 --- a/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java +++ b/src/test/java/org/opensearch/ad/stats/suppliers/ModelsOnNodeSupplierTests.java @@ -41,11 +41,11 @@ import org.opensearch.common.settings.Settings; import org.opensearch.test.OpenSearchTestCase; +import com.amazon.randomcutforest.RandomCutForest; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.amazon.randomcutforest.RandomCutForest; - public class ModelsOnNodeSupplierTests extends OpenSearchTestCase { private RandomCutForest rcf; private HybridThresholdingModel thresholdingModel; diff --git a/src/test/java/org/opensearch/ad/transport/ADStatsTests.java b/src/test/java/org/opensearch/ad/transport/ADStatsTests.java index 9f4459fdc..4d9f4f322 100644 --- a/src/test/java/org/opensearch/ad/transport/ADStatsTests.java +++ b/src/test/java/org/opensearch/ad/transport/ADStatsTests.java @@ -48,11 +48,11 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.test.OpenSearchTestCase; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class ADStatsTests extends OpenSearchTestCase { String node1, nodeName1, clusterName; Map clusterStats; diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index 4a333907d..ba87e5faa 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -508,13 +508,9 @@ private long getExecutingADTask() { adStatsRequest.addAll(validStats); StatsAnomalyDetectorResponse statsResponse = client().execute(StatsAnomalyDetectorAction.INSTANCE, adStatsRequest).actionGet(5000); AtomicLong totalExecutingTask = new AtomicLong(0); - statsResponse - .getAdStatsResponse() - .getADStatsNodesResponse() - .getNodes() - .forEach( - node -> { totalExecutingTask.getAndAdd((Long) node.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())); } - ); + statsResponse.getAdStatsResponse().getADStatsNodesResponse().getNodes().forEach(node -> { + totalExecutingTask.getAndAdd((Long) node.getStatsMap().get(StatNames.AD_EXECUTING_BATCH_TASK_COUNT.getName())); + }); return totalExecutingTask.get(); } } diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java index 3ec561ecf..05916e0ca 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyResultTests.java @@ -125,10 +125,10 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class AnomalyResultTests extends AbstractADTest { private Settings settings; private TransportService transportService; diff --git a/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java index 44736018f..040a7bbb4 100644 --- a/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/CronTransportActionTests.java @@ -45,10 +45,10 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class CronTransportActionTests extends AbstractADTest { private CronTransportAction action; private String localNodeID; diff --git a/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java index 1ed63c279..3866ab962 100644 --- a/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/DeleteModelTransportActionTests.java @@ -48,10 +48,10 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class DeleteModelTransportActionTests extends AbstractADTest { private DeleteModelTransportAction action; private String localNodeID; diff --git a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java index 4a6cf1189..4257ba49e 100644 --- a/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/EntityResultTransportActionTests.java @@ -87,13 +87,13 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.transport.TransportService; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; + import test.org.opensearch.ad.util.JsonDeserializer; import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; - public class EntityResultTransportActionTests extends AbstractADTest { EntityResultTransportAction entityResult; ActionFilters actionFilters; diff --git a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java index 60144c63c..3f51105dc 100644 --- a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorActionTests.java @@ -12,29 +12,43 @@ package org.opensearch.ad.transport; import java.io.IOException; +import java.util.Collection; +import java.util.List; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import org.junit.runner.RunWith; import org.mockito.Mockito; +import org.opensearch.ad.AnomalyDetectorPlugin; +import org.opensearch.ad.TestHelpers; import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorJob; import org.opensearch.ad.model.DetectorProfile; +import org.opensearch.ad.model.Feature; import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.NamedWriteableAwareStreamInput; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.rest.RestStatus; -import org.powermock.api.mockito.PowerMockito; -import org.powermock.core.classloader.annotations.PrepareForTest; -import org.powermock.modules.junit4.PowerMockRunner; +import org.opensearch.plugins.Plugin; +import org.opensearch.test.InternalSettingsPlugin; +import org.opensearch.test.OpenSearchSingleNodeTestCase; -@RunWith(PowerMockRunner.class) -@PrepareForTest(GetAnomalyDetectorResponse.class) -public class GetAnomalyDetectorActionTests { - @Before - public void setUp() throws Exception { +/** + * Need to extend from OpenSearchSingleNodeTestCase and override getPlugins and writeableRegistry + * for testGetResponse. Without it, we will have exception "can't read named writeable from StreamInput" + * when deserializing AnomalyDetector. + * + */ +public class GetAnomalyDetectorActionTests extends OpenSearchSingleNodeTestCase { + @Override + protected Collection> getPlugins() { + return pluginList(InternalSettingsPlugin.class, AnomalyDetectorPlugin.class); + } + @Override + protected NamedWriteableRegistry writableRegistry() { + return getInstanceFromNode(NamedWriteableRegistry.class); } @Test @@ -51,9 +65,9 @@ public void testGetRequest() throws IOException { @Test public void testGetResponse() throws Exception { BytesStreamOutput out = new BytesStreamOutput(); - AnomalyDetector detector = Mockito.mock(AnomalyDetector.class); + Feature feature = TestHelpers.randomFeature(true); + AnomalyDetector detector = TestHelpers.randomAnomalyDetector(List.of(feature)); AnomalyDetectorJob detectorJob = Mockito.mock(AnomalyDetectorJob.class); - Mockito.doNothing().when(detector).writeTo(out); GetAnomalyDetectorResponse response = new GetAnomalyDetectorResponse( 1234, "4567", @@ -71,8 +85,8 @@ public void testGetResponse() throws Exception { false ); response.writeTo(out); - StreamInput input = out.bytes().streamInput(); - PowerMockito.whenNew(AnomalyDetector.class).withAnyArguments().thenReturn(detector); + + NamedWriteableAwareStreamInput input = new NamedWriteableAwareStreamInput(out.bytes().streamInput(), writableRegistry()); GetAnomalyDetectorResponse newResponse = new GetAnomalyDetectorResponse(input); Assert.assertNotNull(newResponse); } diff --git a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java index a2404f72d..7d7e4de47 100644 --- a/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/MultiEntityResultTests.java @@ -15,8 +15,8 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Matchers.argThat; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -134,11 +134,11 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; +import com.google.common.collect.ImmutableList; + import test.org.opensearch.ad.util.MLUtil; import test.org.opensearch.ad.util.RandomModelStateConfig; -import com.google.common.collect.ImmutableList; - public class MultiEntityResultTests extends AbstractADTest { private AnomalyResultTransportAction action; private AnomalyResultRequest request; @@ -584,8 +584,9 @@ private SearchResponse createEmptyResponse() { when(emptyComposite.getName()).thenReturn(CompositeRetriever.AGG_NAME_COMP); when(emptyComposite.afterKey()).thenReturn(null); // empty bucket - when(emptyComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return new ArrayList(); }); + when(emptyComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return new ArrayList(); + }); Aggregations emptyAggs = new Aggregations(Collections.singletonList(emptyComposite)); SearchResponseSections emptySections = new SearchResponseSections(SearchHits.empty(), emptyAggs, null, false, null, null, 1); return new SearchResponse(emptySections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); @@ -1015,8 +1016,9 @@ public void testNullFeatures() throws InterruptedException { when(emptyComposite.getName()).thenReturn(null); when(emptyComposite.afterKey()).thenReturn(null); // empty bucket - when(emptyComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return new ArrayList(); }); + when(emptyComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return new ArrayList(); + }); Aggregations emptyAggs = new Aggregations(Collections.singletonList(emptyComposite)); SearchResponseSections emptySections = new SearchResponseSections(SearchHits.empty(), emptyAggs, null, false, null, null, 1); SearchResponse nullResponse = new SearchResponse(emptySections, null, 1, 1, 0, 0, ShardSearchFailure.EMPTY_ARRAY, Clusters.EMPTY); @@ -1054,8 +1056,9 @@ public void testRetry() throws IOException, InterruptedException { when(emptyNonNullComposite.afterKey()).thenReturn(attrs3); List emptyNonNullCompositeBuckets = new ArrayList<>(); - when(emptyNonNullComposite.getBuckets()) - .thenAnswer((Answer>) invocation -> { return emptyNonNullCompositeBuckets; }); + when(emptyNonNullComposite.getBuckets()).thenAnswer((Answer>) invocation -> { + return emptyNonNullCompositeBuckets; + }); Aggregations emptyNonNullAggs = new Aggregations(Collections.singletonList(emptyNonNullComposite)); diff --git a/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java index f10b5ea49..5a0abbd62 100644 --- a/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java @@ -14,7 +14,6 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyObject; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -185,7 +184,7 @@ public void onFailure(Exception e) { ActionListener listener = responseMock.getArgument(3); listener.onResponse(TestHelpers.randomFeatures()); return null; - }).when(featureManager).getPreviewFeatures(anyObject(), anyLong(), anyLong(), any()); + }).when(featureManager).getPreviewFeatures(any(), anyLong(), anyLong(), any()); action.doExecute(task, request, previewResponse); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } @@ -396,7 +395,7 @@ public void onFailure(Exception e) { ActionListener listener = responseMock.getArgument(3); listener.onResponse(TestHelpers.randomFeatures()); return null; - }).when(featureManager).getPreviewFeatures(anyObject(), anyLong(), anyLong(), any()); + }).when(featureManager).getPreviewFeatures(any(), anyLong(), anyLong(), any()); action.doExecute(task, request, previewResponse); assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); } diff --git a/src/test/java/org/opensearch/ad/transport/ProfileTests.java b/src/test/java/org/opensearch/ad/transport/ProfileTests.java index c6fe1957e..0ece203f6 100644 --- a/src/test/java/org/opensearch/ad/transport/ProfileTests.java +++ b/src/test/java/org/opensearch/ad/transport/ProfileTests.java @@ -42,11 +42,11 @@ import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.test.OpenSearchTestCase; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.JsonArray; import com.google.gson.JsonElement; +import test.org.opensearch.ad.util.JsonDeserializer; + public class ProfileTests extends OpenSearchTestCase { String node1, nodeName1, clusterName; String node2, nodeName2; diff --git a/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java b/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java index 40d9d8e02..df1cc296e 100644 --- a/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java +++ b/src/test/java/org/opensearch/ad/transport/RCFPollingTests.java @@ -55,12 +55,12 @@ import org.opensearch.transport.TransportResponseHandler; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.FakeNode; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import test.org.opensearch.ad.util.FakeNode; +import test.org.opensearch.ad.util.JsonDeserializer; + public class RCFPollingTests extends AbstractADTest { Gson gson = new GsonBuilder().create(); private String detectorId = "jqIG6XIBEyaF3zCMZfcB"; diff --git a/src/test/java/org/opensearch/ad/transport/RCFResultTests.java b/src/test/java/org/opensearch/ad/transport/RCFResultTests.java index 786809ec6..ba6926a75 100644 --- a/src/test/java/org/opensearch/ad/transport/RCFResultTests.java +++ b/src/test/java/org/opensearch/ad/transport/RCFResultTests.java @@ -62,11 +62,11 @@ import org.opensearch.transport.Transport; import org.opensearch.transport.TransportService; -import test.org.opensearch.ad.util.JsonDeserializer; - import com.google.gson.Gson; import com.google.gson.GsonBuilder; +import test.org.opensearch.ad.util.JsonDeserializer; + public class RCFResultTests extends OpenSearchTestCase { Gson gson = new GsonBuilder().create(); diff --git a/src/test/java/org/opensearch/ad/util/ThrottlerTests.java b/src/test/java/org/opensearch/ad/util/ThrottlerTests.java index ed2a6f710..63d060b28 100644 --- a/src/test/java/org/opensearch/ad/util/ThrottlerTests.java +++ b/src/test/java/org/opensearch/ad/util/ThrottlerTests.java @@ -12,7 +12,7 @@ package org.opensearch.ad.util; import static org.mockito.Mockito.mock; -import static org.powermock.api.mockito.PowerMockito.when; +import static org.mockito.Mockito.when; import java.time.Clock;