Skip to content

Commit

Permalink
Upgrade rcf to 4.0 (#1173)
Browse files Browse the repository at this point in the history
This PR upgrades rcf to 4.0 as it has bug fixes and support for streaming imputation mode.

Testing done:
1. gradle build

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Mar 26, 2024
1 parent 8ca1d0d commit 3034784
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 72 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/backport.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ on:

jobs:
backport:
if: github.event.pull_request.merged == true
runs-on: ubuntu-latest
permissions:
contents: write
Expand All @@ -25,4 +26,5 @@ jobs:
uses: VachaShah/backport@v2.2.0
with:
github_token: ${{ steps.github_app_token.outputs.token }}
branch_name: backport/backport-${{ github.event.number }}
head_template: backport/backport-<%= number %>-to-<%= base %>
failure_labels: backport-failed
2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

# anomaly-detection
- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Build and Run Tests
run: |
Expand Down
8 changes: 4 additions & 4 deletions .github/workflows/test_build_multi_platform.yml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ jobs:

- name: Build and Run Tests
run: |
./gradlew build
./gradlew build -x spotlessJava
- name: Publish to Maven Local
run: |
./gradlew publishToMavenLocal
Expand Down Expand Up @@ -85,13 +85,13 @@ jobs:
java-version: ${{ matrix.java }}

- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Assemble / build / mavenlocal / integTest
run: |
chown -R 1000:1000 `pwd`
su `id -un 1000` -c "./gradlew assemble &&
./gradlew build &&
./gradlew build -x spotlessJava &&
./gradlew publishToMavenLocal &&
./gradlew integTest -PnumNodes=3"
- name: Upload Coverage Report
Expand Down Expand Up @@ -127,7 +127,7 @@ jobs:
./gradlew assemble
- name: Build and Run Tests
run: |
./gradlew build
./gradlew build -x spotlessJava
- name: Publish to Maven Local
run: |
./gradlew publishToMavenLocal
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_bwc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ jobs:

# anomaly-detection
- name: Checkout AD
uses: actions/checkout@v4
uses: actions/checkout@v3

- name: Assemble anomaly-detection
run: |
Expand Down
23 changes: 13 additions & 10 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.10.0"
bwcVersionShort = "2.14.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
Expand Down Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.12.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:4.0.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:4.0.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.16.1"
Expand All @@ -149,6 +149,9 @@ dependencies {
exclude group: 'org.ow2.asm', module: 'asm-tree'
}

// used for output encoding of config descriptions
implementation group: 'org.owasp.encoder' , name: 'encoder', version: '1.2.3'

testImplementation group: 'pl.pragmatists', name: 'JUnitParams', version: '1.1.1'
testImplementation group: 'org.mockito', name: 'mockito-core', version: '5.9.0'
testImplementation group: 'org.objenesis', name: 'objenesis', version: '3.3'
Expand Down Expand Up @@ -538,7 +541,7 @@ List<Provider<RegularFile>> plugins = [

// Creates 2 test clusters with 3 nodes of the old version.
2.times {i ->
task "${baseName}#oldVersionClusterTask$i"(type: StandaloneRestIntegTestTask) {
task "${baseName}#oldVersionClusterTask$i"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}$i"
filter {
includeTestsMatching "org.opensearch.ad.bwc.*IT"
Expand All @@ -554,7 +557,7 @@ List<Provider<RegularFile>> plugins = [
// Upgrades one node of the old cluster to new OpenSearch version with upgraded plugin version
// This results in a mixed cluster with 2 nodes on the old version and 1 upgraded node.
// This is also used as a one third upgraded cluster for a rolling upgrade.
task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#mixedClusterTask"(type: RestIntegTestTask) {
useCluster testClusters."${baseName}0"
dependsOn "${baseName}#oldVersionClusterTask0"
doFirst {
Expand All @@ -573,7 +576,7 @@ task "${baseName}#mixedClusterTask"(type: StandaloneRestIntegTestTask) {
// Upgrades the second node to new OpenSearch version with upgraded plugin version after the first node is upgraded.
// This results in a mixed cluster with 1 node on the old version and 2 upgraded nodes.
// This is used for rolling upgrade.
task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#twoThirdsUpgradedClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#mixedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
Expand All @@ -592,7 +595,7 @@ task "${baseName}#twoThirdsUpgradedClusterTask"(type: StandaloneRestIntegTestTas
// Upgrades the third node to new OpenSearch version with upgraded plugin version after the second node is upgraded.
// This results in a fully upgraded cluster.
// This is used for rolling upgrade.
task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#rollingUpgradeClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#twoThirdsUpgradedClusterTask"
useCluster testClusters."${baseName}0"
doFirst {
Expand All @@ -611,7 +614,7 @@ task "${baseName}#rollingUpgradeClusterTask"(type: StandaloneRestIntegTestTask)

// Upgrades all the nodes of the old cluster to new OpenSearch version with upgraded plugin version
// at the same time resulting in a fully upgraded cluster.
task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
task "${baseName}#fullRestartClusterTask"(type: RestIntegTestTask) {
dependsOn "${baseName}#oldVersionClusterTask1"
useCluster testClusters."${baseName}1"
doFirst {
Expand All @@ -627,7 +630,7 @@ task "${baseName}#fullRestartClusterTask"(type: StandaloneRestIntegTestTask) {
}

// A bwc test suite which runs all the bwc tasks combined.
task bwcTestSuite(type: StandaloneRestIntegTestTask) {
task bwcTestSuite(type: RestIntegTestTask) {
exclude '**/*Test*'
exclude '**/*IT*'
dependsOn tasks.named("${baseName}#mixedClusterTask")
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,8 @@ private Optional<ThresholdedRandomCutForest> convertToTRCF(Optional<RandomCutFor
if (kllThreshold.isPresent()) {
scores = kllThreshold.get().extractScores();
}
return Optional.of(new ThresholdedRandomCutForest(rcf.get(), anomalyRate, scores));
// last parameter is lastShingledInput. Since we don't know it, use all 0 double array
return Optional.of(new ThresholdedRandomCutForest(rcf.get(), anomalyRate, scores, new double[rcf.get().getDimensions()]));
}

/**
Expand Down
26 changes: 13 additions & 13 deletions src/main/java/org/opensearch/ad/task/ADTaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,7 @@
import static org.opensearch.ad.constant.ADCommonMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR;
import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX;
import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN;
import static org.opensearch.ad.model.ADTask.COORDINATING_NODE_FIELD;
import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD;
import static org.opensearch.ad.model.ADTask.ERROR_FIELD;
import static org.opensearch.ad.model.ADTask.ESTIMATED_MINUTES_LEFT_FIELD;
import static org.opensearch.ad.model.ADTask.EXECUTION_END_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.EXECUTION_START_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.INIT_PROGRESS_FIELD;
import static org.opensearch.ad.model.ADTask.IS_LATEST_FIELD;
import static org.opensearch.ad.model.ADTask.LAST_UPDATE_TIME_FIELD;
import static org.opensearch.ad.model.ADTask.PARENT_TASK_ID_FIELD;
import static org.opensearch.ad.model.ADTask.STATE_FIELD;
import static org.opensearch.ad.model.ADTask.STOPPED_BY_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_PROGRESS_FIELD;
import static org.opensearch.ad.model.ADTask.TASK_TYPE_FIELD;
import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES;
import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES;
Expand All @@ -52,6 +39,19 @@
import static org.opensearch.timeseries.constant.CommonName.TASK_ID_FIELD;
import static org.opensearch.timeseries.model.TaskState.NOT_ENDED_STATES;
import static org.opensearch.timeseries.model.TaskType.taskTypeToString;
import static org.opensearch.timeseries.model.TimeSeriesTask.COORDINATING_NODE_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.ERROR_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.ESTIMATED_MINUTES_LEFT_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.EXECUTION_END_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.EXECUTION_START_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.INIT_PROGRESS_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.IS_LATEST_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.LAST_UPDATE_TIME_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.PARENT_TASK_ID_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.STATE_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.STOPPED_BY_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.TASK_PROGRESS_FIELD;
import static org.opensearch.timeseries.model.TimeSeriesTask.TASK_TYPE_FIELD;
import static org.opensearch.timeseries.settings.TimeSeriesSettings.NUM_MIN_SAMPLES;
import static org.opensearch.timeseries.util.ExceptionUtil.getErrorMessage;
import static org.opensearch.timeseries.util.ExceptionUtil.getShardsFailure;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ public void testBackwardsCompatibility() throws Exception {
case MIXED:
// TODO: We have no way to specify whether send request to old node or new node now.
// Add more test later when it's possible to specify request node.
Assert.assertTrue(pluginNames.contains("opensearch-anomaly-detection"));
Assert.assertTrue(pluginNames.contains("opensearch-time-series-analytics"));
Assert.assertTrue(pluginNames.contains("opensearch-job-scheduler"));

// Create single entity detector and start realtime job
Expand Down
59 changes: 28 additions & 31 deletions src/test/java/org/opensearch/ad/ml/CheckpointDaoTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -1067,27 +1067,22 @@ public void testDeserializeTRCFModel() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// This scores were generated with the sample data on RCF4.0. RCF4.0 changed implementation
// and we are seeing different rcf scores between 4.0 and 3.8. This is verified by switching
// rcf version between 3.8 and 4.0 while other code in AD unchanged. But we get different scores.
List<Double> scores = new ArrayList<>();
scores.add(4.814651669367903);
scores.add(5.566968073093689);
scores.add(5.919907610660049);
scores.add(5.770278090352401);
scores.add(5.319779117320102);

List<Double> grade = new ArrayList<>();
grade.add(1.0);
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
grade.add(0.0);
scores.add(5.052069275347555);
scores.add(6.117465704461799);
scores.add(6.6401649744661055);
scores.add(6.918514609476484);
scores.add(6.928318158276434);

// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down Expand Up @@ -1133,21 +1128,22 @@ public void testDeserialize_rcf3_rc3_single_stream_model() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// This scores were generated with the sample data on RCF4.0. RCF4.0 changed implementation
// and we are seeing different rcf scores between 4.0 and 3.8. This is verified by switching
// rcf version between 3.8 and 4.0 while other code in AD unchanged. But we get different scores.
List<Double> scores = new ArrayList<>();
scores.add(3.3830441158587066);
scores.add(2.825961659490065);
scores.add(2.4685871670647384);
scores.add(2.3123460886413647);
scores.add(2.1401987653477135);
scores.add(3.678754481587072);
scores.add(3.6809634269790252);
scores.add(3.683659822587799);
scores.add(3.6852688612219646);
scores.add(3.6859330728661064);

// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down Expand Up @@ -1190,21 +1186,22 @@ public void testDeserialize_rcf3_rc3_hc_model() throws Exception {
coldStartData.add(sample4);
coldStartData.add(sample5);

// This scores were generated with the sample data but on RCF3.0-rc1 and we are comparing them
// to the scores generated by the imported RCF3.0-rc2.1
// This scores were generated with the sample data but on RCF4.0 that changed implementation
// and we are seeing different rcf scores between 4.0 and 3.8. This is verified by switching
// rcf version between 3.8 and 4.0 while other code in AD unchanged. But we get different scores.
List<Double> scores = new ArrayList<>();
scores.add(1.86645896573027);
scores.add(1.8760247712797833);
scores.add(1.6809181763279901);
scores.add(1.7126716645678555);
scores.add(1.323776514074674);
scores.add(2.119532552959117);
scores.add(2.7347456872746325);
scores.add(3.066704948143919);
scores.add(3.2965580521876725);
scores.add(3.1888920146607047);

// rcf 3.8 has a number of improvements on thresholder and predictor corrector.
// We don't expect the results have the same anomaly grade.
for (int i = 0; i < coldStartData.size(); i++) {
forest.process(coldStartData.get(i), 0);
AnomalyDescriptor descriptor = forest.process(coldStartData.get(i), 0);
assertEquals(descriptor.getRCFScore(), scores.get(i), 1e-9);
assertEquals(scores.get(i), descriptor.getRCFScore(), 1e-9);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ public void testAccuracyOneMinuteIntervalNoInterpolation() throws Exception {
clusterService
);

accuracyTemplate(1, 0.6f, 0.6f);
accuracyTemplate(1, 0.5f, 0.5f);
}

private ModelState<EntityModel> createStateForCacheRelease() {
Expand Down
25 changes: 19 additions & 6 deletions src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.ShardSearchFailure;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.ADUnitTestCase;
import org.opensearch.ad.cluster.HashRing;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.mock.model.MockSimpleLog;
Expand All @@ -89,6 +88,7 @@
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.rest.handler.IndexAnomalyDetectorJobActionHandler;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.stats.InternalStatNames;
import org.opensearch.ad.transport.ADStatsNodeResponse;
import org.opensearch.ad.transport.ADStatsNodesResponse;
Expand Down Expand Up @@ -120,6 +120,7 @@
import org.opensearch.search.aggregations.InternalAggregations;
import org.opensearch.search.internal.InternalSearchResponse;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.AbstractTimeSeriesTest;
import org.opensearch.timeseries.TestHelpers;
import org.opensearch.timeseries.common.exception.DuplicateTaskException;
import org.opensearch.timeseries.constant.CommonName;
Expand All @@ -139,7 +140,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

public class ADTaskManagerTests extends ADUnitTestCase {
public class ADTaskManagerTests extends AbstractTimeSeriesTest {

private Settings settings;
private Client client;
Expand Down Expand Up @@ -1447,10 +1448,22 @@ public void testForwardRequestToLeadNodeWithNotExistingNode() throws IOException
@SuppressWarnings("unchecked")
public void testScaleTaskLaneOnCoordinatingNode() {
ADTask adTask = mock(ADTask.class);
when(adTask.getCoordinatingNode()).thenReturn(node1.getId());
when(nodeFilter.getEligibleDataNodes()).thenReturn(new DiscoveryNode[] { node1, node2 });
ActionListener<JobResponse> listener = mock(ActionListener.class);
adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, transportService, listener);
try {
// bring up real transport service as mockito cannot mock final method
// and transportService.sendRequest is called. A lot of null pointer
// exception will be thrown if we use mocked transport service.
setUpThreadPool(ADTaskManagerTests.class.getSimpleName());
setupTestNodes(AnomalyDetectorSettings.AD_MAX_ENTITIES_PER_QUERY, AnomalyDetectorSettings.AD_PAGE_SIZE);
when(adTask.getCoordinatingNode()).thenReturn(testNodes[1].getNodeId());
when(nodeFilter.getEligibleDataNodes())
.thenReturn(new DiscoveryNode[] { testNodes[0].discoveryNode(), testNodes[1].discoveryNode() });
ActionListener<JobResponse> listener = mock(ActionListener.class);

adTaskManager.scaleTaskLaneOnCoordinatingNode(adTask, 2, testNodes[1].transportService, listener);
} finally {
tearDownTestNodes();
tearDownThreadPool();
}
}

@SuppressWarnings("unchecked")
Expand Down

0 comments on commit 3034784

Please sign in to comment.