diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index 81fc4be38..bdf67ee50 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -289,6 +289,10 @@ public List> getSettings() { SecurityAnalyticsSettings.FINDING_HISTORY_INDEX_MAX_AGE, SecurityAnalyticsSettings.FINDING_HISTORY_ROLLOVER_PERIOD, SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD, + SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS, + SecurityAnalyticsSettings.CORRELATION_HISTORY_INDEX_MAX_AGE, + SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD, + SecurityAnalyticsSettings.CORRELATION_HISTORY_RETENTION_PERIOD, SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING, SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW, SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA, diff --git a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java index bcfc7405e..9a423f6fb 100644 --- a/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java +++ b/src/main/java/org/opensearch/securityanalytics/correlation/VectorEmbeddingsEngine.java @@ -73,7 +73,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin searchSourceBuilder.fetchSource(true); searchSourceBuilder.size(1); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); @@ -103,7 +103,7 @@ public void onResponse(SearchResponse response) { searchSourceBuilder.fetchSource(true); searchSourceBuilder.size(10000); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); @@ -135,14 +135,14 @@ public void onResponse(MultiSearchResponse items) { String correlatedFinding = hitSource.get("finding1").toString(); try { - float[] corrVector = new float[101]; + float[] corrVector = new float[3]; if (counter != prevCounter) { - for (int i = 0; i < 100; ++i) { + for (int i = 0; i < 2; ++i) { corrVector[i] = ((float) counter) - 50.0f; } - corrVector[Integer.parseInt(correlationId)] = (float) counter; - corrVector[100] = timestampFeature; + corrVector[0] = (float) counter; + corrVector[2] = timestampFeature; XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); builder.field("root", false); @@ -156,19 +156,19 @@ public void onResponse(MultiSearchResponse items) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) .source(builder) .timeout(indexTimeout); bulkRequest.add(indexRequest); } - corrVector = new float[101]; - for (int i = 0; i < 100; ++i) { + corrVector = new float[3]; + for (int i = 0; i < 2; ++i) { corrVector[i] = ((float) counter) - 50.0f; } - corrVector[Integer.parseInt(correlationId)] = (2.0f * ((float) counter) - 50.0f) / 2.0f; - corrVector[Integer.parseInt(correlationId)] = (2.0f * ((float) neighborCounter) - 50.0f) / 2.0f; - corrVector[100] = timestampFeature; + corrVector[0] = (2.0f * ((float) counter) - 50.0f) / 2.0f; + corrVector[1] = (2.0f * ((float) neighborCounter) - 50.0f) / 2.0f; + corrVector[2] = timestampFeature; XContentBuilder corrBuilder = XContentFactory.jsonBuilder().startObject(); corrBuilder.field("root", false); @@ -183,7 +183,7 @@ public void onResponse(MultiSearchResponse items) { corrBuilder.field("corrRules", correlationRules); corrBuilder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) .source(corrBuilder) .timeout(indexTimeout); bulkRequest.add(indexRequest); @@ -241,7 +241,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim searchSourceBuilder.fetchSource(true); searchSourceBuilder.size(1); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); @@ -268,7 +268,7 @@ public void onResponse(SearchResponse response) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) .id(id) .source(builder) .timeout(indexTimeout) @@ -279,9 +279,9 @@ public void onResponse(SearchResponse response) { public void onResponse(IndexResponse response) { if (response.status().equals(RestStatus.OK)) { try { - float[] corrVector = new float[101]; - corrVector[Integer.parseInt(correlationId)] = 50.0f; - corrVector[100] = timestampFeature; + float[] corrVector = new float[3]; + corrVector[0] = 50.0f; + corrVector[2] = timestampFeature; XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); builder.field("root", false); @@ -295,7 +295,7 @@ public void onResponse(IndexResponse response) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) .source(builder) .timeout(indexTimeout) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -338,7 +338,7 @@ public void onFailure(Exception e) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) .id(id) .source(builder) .timeout(indexTimeout) @@ -350,9 +350,9 @@ public void onResponse(IndexResponse response) { if (response.status().equals(RestStatus.OK)) { correlateFindingAction.onOperation(); try { - float[] corrVector = new float[101]; - corrVector[Integer.parseInt(logTypes.get(detectorType).getTags().get("correlation_id").toString())] = 50.0f; - corrVector[100] = timestampFeature; + float[] corrVector = new float[3]; + corrVector[0] = 50.0f; + corrVector[2] = timestampFeature; XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); builder.field("root", false); @@ -366,7 +366,7 @@ public void onResponse(IndexResponse response) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) .source(builder) .timeout(indexTimeout) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -398,11 +398,11 @@ public void onFailure(Exception e) { } }); } else { - float[] query = new float[101]; - for (int i = 0; i < 100; ++i) { + float[] query = new float[3]; + for (int i = 0; i < 2; ++i) { query[i] = (2.0f * ((float) counter) - 50.0f) / 2.0f; } - query[100] = timestampFeature; + query[2] = timestampFeature; CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, 100, QueryBuilders.boolQuery() .mustNot(QueryBuilders.matchQuery( @@ -417,7 +417,7 @@ public void onFailure(Exception e) { searchSourceBuilder.fetchSource(true); searchSourceBuilder.size(1); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); @@ -439,12 +439,12 @@ public void onResponse(SearchResponse response) { if (totalHits == 0L || existCounter != ((long) (2.0f * ((float) counter) - 50.0f) / 2.0f)) { try { - float[] corrVector = new float[101]; - for (int i = 0; i < 100; ++i) { + float[] corrVector = new float[3]; + for (int i = 0; i < 2; ++i) { corrVector[i] = ((float) counter) - 50.0f; } - corrVector[Integer.parseInt(logTypes.get(detectorType).getTags().get("correlation_id").toString())] = (float) counter; - corrVector[100] = timestampFeature; + corrVector[0] = (float) counter; + corrVector[2] = timestampFeature; XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); builder.field("root", false); @@ -458,7 +458,7 @@ public void onResponse(SearchResponse response) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) .source(builder) .timeout(indexTimeout) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); @@ -493,7 +493,7 @@ public void onFailure(Exception e) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) .id(id) .source(builder) .timeout(indexTimeout) @@ -504,12 +504,12 @@ public void onFailure(Exception e) { public void onResponse(IndexResponse response) { if (response.status().equals(RestStatus.OK)) { try { - float[] corrVector = new float[101]; - for (int i = 0; i < 100; ++i) { + float[] corrVector = new float[3]; + for (int i = 0; i < 2; ++i) { corrVector[i] = (float) counter; } - corrVector[Integer.parseInt(logTypes.get(detectorType).getTags().get("correlation_id").toString())] = counter + 50.0f; - corrVector[100] = timestampFeature; + corrVector[0] = counter + 50.0f; + corrVector[2] = timestampFeature; XContentBuilder builder = XContentFactory.jsonBuilder().startObject(); builder.field("root", false); @@ -523,7 +523,7 @@ public void onResponse(IndexResponse response) { builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) .source(builder) .timeout(indexTimeout) .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); diff --git a/src/main/java/org/opensearch/securityanalytics/indexmanagment/DetectorIndexManagementService.java b/src/main/java/org/opensearch/securityanalytics/indexmanagment/DetectorIndexManagementService.java index a828280a2..f6630499f 100644 --- a/src/main/java/org/opensearch/securityanalytics/indexmanagment/DetectorIndexManagementService.java +++ b/src/main/java/org/opensearch/securityanalytics/indexmanagment/DetectorIndexManagementService.java @@ -35,19 +35,11 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.securityanalytics.config.monitors.DetectorMonitorConfig; import org.opensearch.securityanalytics.logtype.LogTypeService; +import org.opensearch.securityanalytics.util.CorrelationIndices; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ALERT_HISTORY_ENABLED; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ALERT_HISTORY_INDEX_MAX_AGE; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ALERT_HISTORY_MAX_DOCS; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ALERT_HISTORY_RETENTION_PERIOD; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.ALERT_HISTORY_ROLLOVER_PERIOD; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.FINDING_HISTORY_ENABLED; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.FINDING_HISTORY_INDEX_MAX_AGE; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.FINDING_HISTORY_MAX_DOCS; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD; -import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.FINDING_HISTORY_ROLLOVER_PERIOD; +import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.*; public class DetectorIndexManagementService extends AbstractLifecycleComponent implements ClusterStateListener { @@ -65,23 +57,35 @@ public class DetectorIndexManagementService extends AbstractLifecycleComponent i private volatile Long alertHistoryMaxDocs; private volatile Long findingHistoryMaxDocs; + private volatile Long correlationHistoryMaxDocs; + private volatile TimeValue alertHistoryMaxAge; private volatile TimeValue findingHistoryMaxAge; + private volatile TimeValue correlationHistoryMaxAge; + private volatile TimeValue alertHistoryRolloverPeriod; private volatile TimeValue findingHistoryRolloverPeriod; + private volatile TimeValue correlationHistoryRolloverPeriod; + private volatile TimeValue alertHistoryRetentionPeriod; private volatile TimeValue findingHistoryRetentionPeriod; + private volatile TimeValue correlationHistoryRetentionPeriod; + private volatile boolean isClusterManager = false; private Scheduler.Cancellable scheduledAlertsRollover = null; private Scheduler.Cancellable scheduledFindingsRollover = null; + private Scheduler.Cancellable scheduledCorrelationHistoryRollover = null; + List alertHistoryIndices = new ArrayList<>(); List findingHistoryIndices = new ArrayList<>(); + HistoryIndexInfo correlationHistoryIndex = null; + @Inject public DetectorIndexManagementService( Settings settings, @@ -136,6 +140,27 @@ public DetectorIndexManagementService( }); clusterService.getClusterSettings().addSettingsUpdateConsumer(FINDING_HISTORY_RETENTION_PERIOD, this::setFindingHistoryRetentionPeriod); + clusterService.getClusterSettings().addSettingsUpdateConsumer(CORRELATION_HISTORY_MAX_DOCS, maxDocs -> { + setCorrelationHistoryMaxDocs(maxDocs); + if (correlationHistoryIndex != null) { + correlationHistoryIndex.maxDocs = maxDocs; + } + }); + + clusterService.getClusterSettings().addSettingsUpdateConsumer(CORRELATION_HISTORY_INDEX_MAX_AGE, maxAge -> { + setCorrelationHistoryMaxAge(maxAge); + if (correlationHistoryIndex != null) { + correlationHistoryIndex.maxAge = maxAge; + } + }); + + clusterService.getClusterSettings().addSettingsUpdateConsumer(CORRELATION_HISTORY_ROLLOVER_PERIOD, timeValue -> { + DetectorIndexManagementService.this.correlationHistoryRolloverPeriod = timeValue; + rescheduleCorrelationHistoryRollover(); + }); + + clusterService.getClusterSettings().addSettingsUpdateConsumer(CORRELATION_HISTORY_RETENTION_PERIOD, this::setCorrelationHistoryRetentionPeriod); + initFromClusterSettings(); } @@ -178,12 +203,16 @@ private void initFromClusterSettings() { findingHistoryEnabled = FINDING_HISTORY_ENABLED.get(settings); alertHistoryMaxDocs = ALERT_HISTORY_MAX_DOCS.get(settings); findingHistoryMaxDocs = FINDING_HISTORY_MAX_DOCS.get(settings); + correlationHistoryMaxDocs = CORRELATION_HISTORY_MAX_DOCS.get(settings); alertHistoryMaxAge = ALERT_HISTORY_INDEX_MAX_AGE.get(settings); findingHistoryMaxAge = FINDING_HISTORY_INDEX_MAX_AGE.get(settings); + correlationHistoryMaxAge = CORRELATION_HISTORY_INDEX_MAX_AGE.get(settings); alertHistoryRolloverPeriod = ALERT_HISTORY_ROLLOVER_PERIOD.get(settings); findingHistoryRolloverPeriod = FINDING_HISTORY_ROLLOVER_PERIOD.get(settings); + correlationHistoryRolloverPeriod = CORRELATION_HISTORY_ROLLOVER_PERIOD.get(settings); alertHistoryRetentionPeriod = ALERT_HISTORY_RETENTION_PERIOD.get(settings); findingHistoryRetentionPeriod = FINDING_HISTORY_RETENTION_PERIOD.get(settings); + correlationHistoryRetentionPeriod = CORRELATION_HISTORY_RETENTION_PERIOD.get(settings); } @Override @@ -205,6 +234,10 @@ public void clusterChanged(ClusterChangedEvent event) { for (HistoryIndexInfo h : findingHistoryIndices) { h.isInitialized = event.state().metadata().hasAlias(h.indexAlias); } + + if (correlationHistoryIndex != null && correlationHistoryIndex.indexAlias != null) { + correlationHistoryIndex.isInitialized = event.state().metadata().hasAlias(correlationHistoryIndex.indexAlias); + } } private void onMaster() { @@ -213,17 +246,20 @@ private void onMaster() { threadPool.schedule(() -> { rolloverAndDeleteAlertHistoryIndices(); rolloverAndDeleteFindingHistoryIndices(); + rolloverAndDeleteCorrelationHistoryIndices(); }, TimeValue.timeValueSeconds(1), executorName()); // schedule the next rollover for approx MAX_AGE later scheduledAlertsRollover = threadPool .scheduleWithFixedDelay(() -> rolloverAndDeleteAlertHistoryIndices(), alertHistoryRolloverPeriod, executorName()); scheduledFindingsRollover = threadPool .scheduleWithFixedDelay(() -> rolloverAndDeleteFindingHistoryIndices(), findingHistoryRolloverPeriod, executorName()); + scheduledCorrelationHistoryRollover = threadPool + .scheduleWithFixedDelay(() -> rolloverAndDeleteCorrelationHistoryIndices(), correlationHistoryRolloverPeriod, executorName()); } catch (Exception e) { // This should be run on cluster startup logger.error( - "Error creating alert/finding indices. " + - "Alerts/Findings can't be recorded until master node is restarted.", + "Error creating alert/finding/correlation indices. " + + "Alerts/Findings/Correlations can't be recorded until master node is restarted.", e ); } @@ -236,6 +272,9 @@ private void offMaster() { if (scheduledFindingsRollover != null) { scheduledFindingsRollover.cancel(); } + if (scheduledCorrelationHistoryRollover != null) { + scheduledCorrelationHistoryRollover.cancel(); + } } private String executorName() { @@ -284,6 +323,10 @@ private List getIndicesToDelete(ClusterStateResponse clusterStateRespons if (indexToDelete != null) { indicesToDelete.add(indexToDelete); } + indexToDelete = getHistoryIndexToDelete(indexMetaData, correlationHistoryRetentionPeriod.millis(), correlationHistoryIndex != null? List.of(correlationHistoryIndex): List.of(), true); + if (indexToDelete != null) { + indicesToDelete.add(indexToDelete); + } } return indicesToDelete; } @@ -328,7 +371,7 @@ private void deleteAllOldHistoryIndices(List indicesToDelete) { public void onResponse(AcknowledgedResponse deleteIndicesResponse) { if (!deleteIndicesResponse.isAcknowledged()) { logger.error( - "Could not delete one or more Alerting/Finding history indices: [" + indicesToDelete + "]. Retrying one by one." + "Could not delete one or more Alerting/Finding/Correlation history indices: [" + indicesToDelete + "]. Retrying one by one." ); deleteOldHistoryIndex(indicesToDelete); } else { @@ -338,7 +381,7 @@ public void onResponse(AcknowledgedResponse deleteIndicesResponse) { @Override public void onFailure(Exception e) { - logger.error("Delete for Alerting/Finding History Indices failed: [" + indicesToDelete + "]. Retrying one By one."); + logger.error("Delete for Alerting/Finding/Correlation History Indices failed: [" + indicesToDelete + "]. Retrying one By one."); deleteOldHistoryIndex(indicesToDelete); } } @@ -356,7 +399,7 @@ private void deleteOldHistoryIndex(List indicesToDelete) { @Override public void onResponse(AcknowledgedResponse acknowledgedResponse) { if (!acknowledgedResponse.isAcknowledged()) { - logger.error("Could not delete one or more Alerting/Finding history indices: " + index); + logger.error("Could not delete one or more Alerting/Finding/Correlation history indices: " + index); } } @@ -395,6 +438,23 @@ private void rolloverAndDeleteFindingHistoryIndices() { }, e -> {})); } + private void rolloverAndDeleteCorrelationHistoryIndices() { + try { + correlationHistoryIndex = new HistoryIndexInfo( + CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX, + CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN, + CorrelationIndices.correlationMappings(), + correlationHistoryMaxDocs, + correlationHistoryMaxAge, + clusterService.state().metadata().hasAlias(CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX) + ); + rolloverCorrelationHistoryIndices(); + deleteOldIndices("Correlation", CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); + } catch (Exception ex) { + logger.error("failed to construct correlation history index info"); + } + } + private List getAllAlertsIndicesPatternForAllTypes(List logTypes) { return logTypes .stream() @@ -416,7 +476,8 @@ private void rolloverIndex( String pattern, String map, Long docsCondition, - TimeValue ageCondition + TimeValue ageCondition, + Boolean isCorrelation ) { if (!initialized) { return; @@ -426,7 +487,10 @@ private void rolloverIndex( RolloverRequest request = new RolloverRequest(index, null); request.getCreateIndexRequest().index(pattern) .mapping(map) - .settings(Settings.builder().put("index.hidden", true).build()); + .settings(isCorrelation? + Settings.builder().put("index.hidden", true).put("index.correlation", true).build(): + Settings.builder().put("index.hidden", true).build() + ); request.addMaxIndexDocsCondition(docsCondition); request.addMaxIndexAgeCondition(ageCondition); client.admin().indices().rolloverIndex( @@ -452,7 +516,7 @@ private void rolloverAlertHistoryIndices() { rolloverIndex( h.isInitialized, h.indexAlias, h.indexPattern, h.indexMappings, - h.maxDocs, h.maxAge + h.maxDocs, h.maxAge, false ); } } @@ -461,13 +525,27 @@ private void rolloverFindingHistoryIndices() { rolloverIndex( h.isInitialized, h.indexAlias, h.indexPattern, h.indexMappings, - h.maxDocs, h.maxAge + h.maxDocs, h.maxAge, false + ); + } + } + + private void rolloverCorrelationHistoryIndices() { + if (correlationHistoryIndex != null) { + rolloverIndex( + correlationHistoryIndex.isInitialized, + correlationHistoryIndex.indexAlias, + correlationHistoryIndex.indexPattern, + correlationHistoryIndex.indexMappings, + correlationHistoryIndex.maxDocs, + correlationHistoryIndex.maxAge, + true ); } } private void rescheduleAlertRollover() { - if (clusterService.state().getNodes().isLocalNodeElectedMaster()) { + if (clusterService.state().getNodes().isLocalNodeElectedClusterManager()) { if (scheduledAlertsRollover != null) { scheduledAlertsRollover.cancel(); } @@ -477,7 +555,7 @@ private void rescheduleAlertRollover() { } private void rescheduleFindingRollover() { - if (clusterService.state().getNodes().isLocalNodeElectedMaster()) { + if (clusterService.state().getNodes().isLocalNodeElectedClusterManager()) { if (scheduledFindingsRollover != null) { scheduledFindingsRollover.cancel(); } @@ -486,6 +564,16 @@ private void rescheduleFindingRollover() { } } + private void rescheduleCorrelationHistoryRollover() { + if (clusterService.state().getNodes().isLocalNodeElectedClusterManager()) { + if (scheduledCorrelationHistoryRollover != null) { + scheduledCorrelationHistoryRollover.cancel(); + } + scheduledCorrelationHistoryRollover = threadPool + .scheduleWithFixedDelay(() -> rolloverAndDeleteCorrelationHistoryIndices(), correlationHistoryRolloverPeriod, executorName()); + } + } + private String alertMapping() { String alertMapping = null; try ( @@ -528,6 +616,10 @@ public void setFindingHistoryMaxDocs(Long findingHistoryMaxDocs) { this.findingHistoryMaxDocs = findingHistoryMaxDocs; } + public void setCorrelationHistoryMaxDocs(Long correlationHistoryMaxDocs) { + this.correlationHistoryMaxDocs = correlationHistoryMaxDocs; + } + public void setAlertHistoryMaxAge(TimeValue alertHistoryMaxAge) { this.alertHistoryMaxAge = alertHistoryMaxAge; } @@ -536,6 +628,10 @@ public void setFindingHistoryMaxAge(TimeValue findingHistoryMaxAge) { this.findingHistoryMaxAge = findingHistoryMaxAge; } + public void setCorrelationHistoryMaxAge(TimeValue correlationHistoryMaxAge) { + this.correlationHistoryMaxAge = correlationHistoryMaxAge; + } + public void setAlertHistoryRolloverPeriod(TimeValue alertHistoryRolloverPeriod) { this.alertHistoryRolloverPeriod = alertHistoryRolloverPeriod; } @@ -544,6 +640,10 @@ public void setFindingHistoryRolloverPeriod(TimeValue findingHistoryRolloverPeri this.findingHistoryRolloverPeriod = findingHistoryRolloverPeriod; } + public void setCorrelationHistoryRolloverPeriod(TimeValue correlationHistoryRolloverPeriod) { + this.correlationHistoryRolloverPeriod = correlationHistoryRolloverPeriod; + } + public void setAlertHistoryRetentionPeriod(TimeValue alertHistoryRetentionPeriod) { this.alertHistoryRetentionPeriod = alertHistoryRetentionPeriod; } @@ -552,6 +652,10 @@ public void setFindingHistoryRetentionPeriod(TimeValue findingHistoryRetentionPe this.findingHistoryRetentionPeriod = findingHistoryRetentionPeriod; } + public void setCorrelationHistoryRetentionPeriod(TimeValue correlationHistoryRetentionPeriod) { + this.correlationHistoryRetentionPeriod = correlationHistoryRetentionPeriod; + } + public void setClusterManager(boolean clusterManager) { isClusterManager = clusterManager; } @@ -569,6 +673,9 @@ protected void doStop() { if (scheduledFindingsRollover != null) { scheduledFindingsRollover.cancel(); } + if (scheduledCorrelationHistoryRollover != null) { + scheduledCorrelationHistoryRollover.cancel(); + } } @Override @@ -579,6 +686,9 @@ protected void doClose() { if (scheduledFindingsRollover != null) { scheduledFindingsRollover.cancel(); } + if (scheduledCorrelationHistoryRollover != null) { + scheduledCorrelationHistoryRollover.cancel(); + } } private static class HistoryIndexInfo { diff --git a/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java b/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java index f8942e70e..b55548069 100644 --- a/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java +++ b/src/main/java/org/opensearch/securityanalytics/settings/SecurityAnalyticsSettings.java @@ -43,6 +43,12 @@ public class SecurityAnalyticsSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ); + public static final Setting CORRELATION_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting( + "plugins.security_analytics.correlation_history_rollover_period", + TimeValue.timeValueHours(12), + Setting.Property.NodeScope, Setting.Property.Dynamic + ); + public static final Setting ALERT_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( "plugins.security_analytics.alert_history_max_age", new TimeValue(30, TimeUnit.DAYS), @@ -55,6 +61,12 @@ public class SecurityAnalyticsSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ); + public static final Setting CORRELATION_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( + "plugins.security_analytics.correlation_history_max_age", + new TimeValue(30, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ); + public static final Setting ALERT_HISTORY_MAX_DOCS = Setting.longSetting( "plugins.security_analytics.alert_history_max_docs", 1000L, @@ -69,6 +81,13 @@ public class SecurityAnalyticsSettings { Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated ); + public static final Setting CORRELATION_HISTORY_MAX_DOCS = Setting.longSetting( + "plugins.security_analytics.correlation_history_max_docs", + 1000L, + 0L, + Setting.Property.NodeScope, Setting.Property.Dynamic + ); + public static final Setting ALERT_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( "plugins.security_analytics.alert_history_retention_period", new TimeValue(60, TimeUnit.DAYS), @@ -81,6 +100,12 @@ public class SecurityAnalyticsSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ); + public static final Setting CORRELATION_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( + "plugins.security_analytics.correlation_history_retention_period", + new TimeValue(60, TimeUnit.DAYS), + Setting.Property.NodeScope, Setting.Property.Dynamic + ); + public static final Setting REQUEST_TIMEOUT = Setting.positiveTimeSetting( "plugins.security_analytics.request_timeout", TimeValue.timeValueSeconds(10), diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java index ecbff7655..db2b0287d 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportCorrelateFindingAction.java @@ -135,22 +135,52 @@ protected void doExecute(Task task, ActionRequest request, ActionListener() { - @Override - public void onResponse(BulkResponse response) { - if (response.hasFailures()) { - log.error(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR)); - } + if (IndexUtils.correlationIndexUpdated) { + IndexUtils.lastUpdatedCorrelationHistoryIndex = IndexUtils.getIndexNameWithAlias( + clusterService.state(), + CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX + ); + } - AsyncCorrelateFindingAction correlateFindingAction = new AsyncCorrelateFindingAction(task, transformedRequest, actionListener); - correlateFindingAction.start(); - } + if (!correlationIndices.correlationMetadataIndexExists()) { + try { + correlationIndices.initCorrelationMetadataIndex(new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse response) { + if (response.isAcknowledged()) { + IndexUtils.correlationMetadataIndexUpdated(); + + correlationIndices.setupCorrelationIndex(indexTimeout, setupTimestamp, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + if (response.hasFailures()) { + log.error(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + } + + AsyncCorrelateFindingAction correlateFindingAction = new AsyncCorrelateFindingAction(task, transformedRequest, actionListener); + correlateFindingAction.start(); + } + + @Override + public void onFailure(Exception e) { + log.error(e); + } + }); + } else { + log.error(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR)); + } + } - @Override - public void onFailure(Exception e) { - log.error(e); + @Override + public void onFailure(Exception e) { + + } + }); + } catch (Exception ex) { + onFailure(ex); } - }); + + } } else { log.error(new OpenSearchStatusException("Failed to create correlation Index", RestStatus.INTERNAL_SERVER_ERROR)); } @@ -259,7 +289,7 @@ public void initCorrelationIndex(String detectorType, Map> try { if (!IndexUtils.correlationIndexUpdated) { IndexUtils.updateIndexMapping( - CorrelationIndices.CORRELATION_INDEX, + CorrelationIndices.CORRELATION_HISTORY_WRITE_INDEX, CorrelationIndices.correlationMappings(), clusterService.state(), client.admin().indices(), new ActionListener<>() { @Override @@ -276,7 +306,8 @@ public void onResponse(AcknowledgedResponse response) { public void onFailure(Exception e) { onFailures(e); } - } + }, + true ); } else { getTimestampFeature(detectorType, correlatedFindings, null, correlationRules); @@ -287,84 +318,311 @@ public void onFailure(Exception e) { } public void getTimestampFeature(String detectorType, Map> correlatedFindings, Finding orphanFinding, List correlationRules) { - long findingTimestamp = this.request.getFinding().getTimestamp().toEpochMilli(); - BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() - .mustNot(QueryBuilders.termQuery("scoreTimestamp", 0L)); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(queryBuilder); - searchSourceBuilder.fetchSource(true); - searchSourceBuilder.size(1); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); - searchRequest.source(searchSourceBuilder); - searchRequest.preference(Preference.PRIMARY_FIRST.type()); - - client.search(searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse response) { - String id = response.getHits().getHits()[0].getId(); - Map hitSource = response.getHits().getHits()[0].getSourceAsMap(); - long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); - - if (findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL > scoreTimestamp) { - try { - XContentBuilder scoreBuilder = XContentFactory.jsonBuilder().startObject(); - scoreBuilder.field("scoreTimestamp", findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL); - scoreBuilder.field("root", false); - scoreBuilder.endObject(); - - IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) - .id(id) - .source(scoreBuilder) - .timeout(indexTimeout) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); - - client.index(scoreIndexRequest, new ActionListener<>() { - @Override - public void onResponse(IndexResponse response) { - BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() - .must(QueryBuilders.existsQuery("source")); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(queryBuilder); - searchSourceBuilder.fetchSource(true); - searchSourceBuilder.size(10000); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); - searchRequest.source(searchSourceBuilder); - - client.search(searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse response) { - if (response.isTimedOut()) { - onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); + if (!correlationIndices.correlationMetadataIndexExists()) { + try { + correlationIndices.initCorrelationMetadataIndex(new ActionListener<>() { + @Override + public void onResponse(CreateIndexResponse response) { + if (response.isAcknowledged()) { + IndexUtils.correlationMetadataIndexUpdated(); + + correlationIndices.setupCorrelationIndex(indexTimeout, setupTimestamp, new ActionListener<>() { + @Override + public void onResponse(BulkResponse response) { + if (response.hasFailures()) { + log.error(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR)); + } + + long findingTimestamp = request.getFinding().getTimestamp().toEpochMilli(); + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("scoreTimestamp", 0L)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); + searchRequest.source(searchSourceBuilder); + searchRequest.preference(Preference.PRIMARY_FIRST.type()); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + String id = response.getHits().getHits()[0].getId(); + Map hitSource = response.getHits().getHits()[0].getSourceAsMap(); + long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); + + if (findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL > scoreTimestamp) { + try { + XContentBuilder scoreBuilder = XContentFactory.jsonBuilder().startObject(); + scoreBuilder.field("scoreTimestamp", findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL); + scoreBuilder.field("root", false); + scoreBuilder.endObject(); + + IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) + .id(id) + .source(scoreBuilder) + .timeout(indexTimeout) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + client.index(scoreIndexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("source")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(10000); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); + } + + SearchHit[] hits = response.getHits().getHits(); + Map logTypes = new HashMap<>(); + for (SearchHit hit : hits) { + Map sourceMap = hit.getSourceAsMap(); + logTypes.put(sourceMap.get("name").toString(), + new CustomLogType(sourceMap)); + } + + if (correlatedFindings != null) { + if (correlatedFindings.isEmpty()) { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); + } + for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { + vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), + Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); + } + } else { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } catch (Exception ex) { + onFailures(ex); + } + } else { + float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); + + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("source")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(10000); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); + } + + SearchHit[] hits = response.getHits().getHits(); + Map logTypes = new HashMap<>(); + for (SearchHit hit : hits) { + Map sourceMap = hit.getSourceAsMap(); + logTypes.put(sourceMap.get("name").toString(), + new CustomLogType(sourceMap)); + } + + if (correlatedFindings != null) { + if (correlatedFindings.isEmpty()) { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), timestampFeature, logTypes); + } + for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { + vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), + timestampFeature, correlationRules, logTypes); + } + } else { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, timestampFeature, logTypes); + } + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } } - SearchHit[] hits = response.getHits().getHits(); - Map logTypes = new HashMap<>(); - for (SearchHit hit : hits) { - Map sourceMap = hit.getSourceAsMap(); - logTypes.put(sourceMap.get("name").toString(), - new CustomLogType(sourceMap)); + @Override + public void onFailure(Exception e) { + onFailures(e); } + }); + } + + @Override + public void onFailure(Exception e) { + log.error(e); + } + }); + } else { + log.error(new OpenSearchStatusException("Failed to create correlation metadata Index", RestStatus.INTERNAL_SERVER_ERROR)); + } + } + + @Override + public void onFailure(Exception e) { + + } + }); + } catch (Exception ex) { + onFailures(ex); + } + } else { + long findingTimestamp = this.request.getFinding().getTimestamp().toEpochMilli(); + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .mustNot(QueryBuilders.termQuery("scoreTimestamp", 0L)); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(1); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); + searchRequest.source(searchSourceBuilder); + searchRequest.preference(Preference.PRIMARY_FIRST.type()); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + String id = response.getHits().getHits()[0].getId(); + Map hitSource = response.getHits().getHits()[0].getSourceAsMap(); + long scoreTimestamp = (long) hitSource.get("scoreTimestamp"); + + if (findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL > scoreTimestamp) { + try { + XContentBuilder scoreBuilder = XContentFactory.jsonBuilder().startObject(); + scoreBuilder.field("scoreTimestamp", findingTimestamp - CorrelationIndices.FIXED_HISTORICAL_INTERVAL); + scoreBuilder.field("root", false); + scoreBuilder.endObject(); + + IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_METADATA_INDEX) + .id(id) + .source(scoreBuilder) + .timeout(indexTimeout) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE); + + client.index(scoreIndexRequest, new ActionListener<>() { + @Override + public void onResponse(IndexResponse response) { + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("source")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(10000); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); + } - if (correlatedFindings != null) { - if (correlatedFindings.isEmpty()) { - vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); + SearchHit[] hits = response.getHits().getHits(); + Map logTypes = new HashMap<>(); + for (SearchHit hit : hits) { + Map sourceMap = hit.getSourceAsMap(); + logTypes.put(sourceMap.get("name").toString(), + new CustomLogType(sourceMap)); } - for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { - vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), - Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); + + if (correlatedFindings != null) { + if (correlatedFindings.isEmpty()) { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); + } + for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { + vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), + Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), correlationRules, logTypes); + } + } else { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); } - } else { - vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, Long.valueOf(CorrelationIndices.FIXED_HISTORICAL_INTERVAL / 1000L).floatValue(), logTypes); } - } - @Override - public void onFailure(Exception e) { - onFailures(e); + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } + + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } catch (Exception ex) { + onFailures(ex); + } + } else { + float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); + + BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("source")); + SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); + searchSourceBuilder.query(queryBuilder); + searchSourceBuilder.fetchSource(true); + searchSourceBuilder.size(10000); + SearchRequest searchRequest = new SearchRequest(); + searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); + searchRequest.source(searchSourceBuilder); + + client.search(searchRequest, new ActionListener<>() { + @Override + public void onResponse(SearchResponse response) { + if (response.isTimedOut()) { + onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); + } + + SearchHit[] hits = response.getHits().getHits(); + Map logTypes = new HashMap<>(); + for (SearchHit hit : hits) { + Map sourceMap = hit.getSourceAsMap(); + logTypes.put(sourceMap.get("name").toString(), + new CustomLogType(sourceMap)); + } + + if (correlatedFindings != null) { + if (correlatedFindings.isEmpty()) { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), timestampFeature, logTypes); + } + for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { + vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), + timestampFeature, correlationRules, logTypes); } - }); + } else { + vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, timestampFeature, logTypes); + } } @Override @@ -372,63 +630,15 @@ public void onFailure(Exception e) { onFailures(e); } }); - } catch (Exception ex) { - onFailures(ex); } - } else { - float timestampFeature = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); - - BoolQueryBuilder queryBuilder = QueryBuilders.boolQuery() - .must(QueryBuilders.existsQuery("source")); - SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); - searchSourceBuilder.query(queryBuilder); - searchSourceBuilder.fetchSource(true); - searchSourceBuilder.size(10000); - SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(LogTypeService.LOG_TYPE_INDEX); - searchRequest.source(searchSourceBuilder); - - client.search(searchRequest, new ActionListener<>() { - @Override - public void onResponse(SearchResponse response) { - if (response.isTimedOut()) { - onFailures(new OpenSearchStatusException("Search request timed out", RestStatus.REQUEST_TIMEOUT)); - } - - SearchHit[] hits = response.getHits().getHits(); - Map logTypes = new HashMap<>(); - for (SearchHit hit : hits) { - Map sourceMap = hit.getSourceAsMap(); - logTypes.put(sourceMap.get("name").toString(), - new CustomLogType(sourceMap)); - } - - if (correlatedFindings != null) { - if (correlatedFindings.isEmpty()) { - vectorEmbeddingsEngine.insertOrphanFindings(detectorType, request.getFinding(), timestampFeature, logTypes); - } - for (Map.Entry> correlatedFinding : correlatedFindings.entrySet()) { - vectorEmbeddingsEngine.insertCorrelatedFindings(detectorType, request.getFinding(), correlatedFinding.getKey(), correlatedFinding.getValue(), - timestampFeature, correlationRules, logTypes); - } - } else { - vectorEmbeddingsEngine.insertOrphanFindings(detectorType, orphanFinding, timestampFeature, logTypes); - } - } - - @Override - public void onFailure(Exception e) { - onFailures(e); - } - }); } - } - @Override - public void onFailure(Exception e) { - onFailures(e); - } - }); + @Override + public void onFailure(Exception e) { + onFailures(e); + } + }); + } } public void onOperation() { diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCorrelationRuleAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCorrelationRuleAction.java index 1bb8f6f73..9d056f9f5 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCorrelationRuleAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCorrelationRuleAction.java @@ -123,7 +123,8 @@ public void onResponse(AcknowledgedResponse response) { public void onFailure(Exception e) { onFailures(e); } - } + }, + false ); } else { indexCorrelationRule(); diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCustomLogTypeAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCustomLogTypeAction.java index cf296a96c..137c05243 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCustomLogTypeAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexCustomLogTypeAction.java @@ -197,7 +197,8 @@ public void onResponse(AcknowledgedResponse response) { public void onFailure(Exception e) { onFailures(e); } - }); + }, false + ); } else { prepareCustomLogTypeIndexing(); } diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index 3b05e6a40..e6dea9947 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -1041,7 +1041,8 @@ public void onResponse(AcknowledgedResponse response) { public void onFailure(Exception e) { onFailures(e); } - } + }, + false ); } else { prepareDetectorIndexing(); diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexRuleAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexRuleAction.java index b1fc12825..32fc4a6d6 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexRuleAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexRuleAction.java @@ -172,7 +172,8 @@ public void onResponse(AcknowledgedResponse response) { public void onFailure(Exception e) { onFailures(e); } - } + }, + false ); } else { prepareRuleIndexing(); diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportListCorrelationAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportListCorrelationAction.java index da2d7b5bb..61b010828 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportListCorrelationAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportListCorrelationAction.java @@ -111,7 +111,7 @@ void start() { searchSourceBuilder.fetchSource(true); searchSourceBuilder.size(10000); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java index 596f16c2a..738371eb0 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchCorrelationAction.java @@ -133,7 +133,7 @@ public void onResponse(SearchResponse response) { scoreSearchSourceBuilder.fetchSource(true); scoreSearchSourceBuilder.size(1); SearchRequest scoreSearchRequest = new SearchRequest(); - scoreSearchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + scoreSearchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX); scoreSearchRequest.source(scoreSearchSourceBuilder); scoreSearchRequest.preference(Preference.PRIMARY_FIRST.type()); @@ -156,7 +156,7 @@ public void onResponse(SearchResponse response) { searchSourceBuilder.fetchField("counter"); searchSourceBuilder.size(1); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); @@ -168,11 +168,11 @@ public void onResponse(SearchResponse response) { for (SearchHit hit: hits) { long counter = hit.getFields().get("counter").getValue(); - float[] query = new float[101]; - for (int i = 0; i < 100; ++i) { + float[] query = new float[3]; + for (int i = 0; i < 2; ++i) { query[i] = (2.0f * ((float) counter) - 50.0f) / 2.0f; } - query[100] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); + query[2] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue(); CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, noOfNearbyFindings, QueryBuilders.boolQuery() .mustNot(QueryBuilders.matchQuery( @@ -188,7 +188,7 @@ public void onResponse(SearchResponse response) { searchSourceBuilder.fetchSource(true); searchSourceBuilder.size(noOfNearbyFindings); SearchRequest searchRequest = new SearchRequest(); - searchRequest.indices(CorrelationIndices.CORRELATION_INDEX); + searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP); searchRequest.source(searchSourceBuilder); searchRequest.preference(Preference.PRIMARY_FIRST.type()); diff --git a/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java b/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java index efb96a45f..02229a57c 100644 --- a/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java +++ b/src/main/java/org/opensearch/securityanalytics/util/CorrelationIndices.java @@ -7,6 +7,7 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.core.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; @@ -33,7 +34,13 @@ public class CorrelationIndices { private static final Logger log = LogManager.getLogger(CorrelationIndices.class); - public static final String CORRELATION_INDEX = ".opensearch-sap-correlation-history"; + + public static final String CORRELATION_METADATA_INDEX = ".opensearch-sap-correlation-metadata"; + public static final String CORRELATION_HISTORY_INDEX_PATTERN = "<.opensearch-sap-correlation-history-{now/d}-1>"; + + public static final String CORRELATION_HISTORY_INDEX_PATTERN_REGEXP = ".opensearch-sap-correlation-history*"; + + public static final String CORRELATION_HISTORY_WRITE_INDEX = ".opensearch-sap-correlation-history-write"; public static final long FIXED_HISTORICAL_INTERVAL = 24L * 60L * 60L * 20L * 1000L; private final Client client; @@ -51,16 +58,35 @@ public static String correlationMappings() throws IOException { public void initCorrelationIndex(ActionListener actionListener) throws IOException { if (!correlationIndexExists()) { - CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_INDEX) + CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_HISTORY_INDEX_PATTERN) + .mapping(correlationMappings()) + .settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build()); + indexRequest.alias(new Alias(CORRELATION_HISTORY_WRITE_INDEX)); + client.admin().indices().create(indexRequest, actionListener); + } else { + actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_HISTORY_INDEX_PATTERN)); + } + } + + public void initCorrelationMetadataIndex(ActionListener actionListener) throws IOException { + if (!correlationMetadataIndexExists()) { + CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_METADATA_INDEX) .mapping(correlationMappings()) .settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build()); client.admin().indices().create(indexRequest, actionListener); + } else { + actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_METADATA_INDEX)); } } public boolean correlationIndexExists() { ClusterState clusterState = clusterService.state(); - return clusterState.getRoutingTable().hasIndex(CORRELATION_INDEX); + return clusterState.metadata().hasAlias(CORRELATION_HISTORY_WRITE_INDEX); + } + + public boolean correlationMetadataIndexExists() { + ClusterState clusterState = clusterService.state(); + return clusterState.metadata().hasIndex(CORRELATION_METADATA_INDEX); } public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, ActionListener listener) { @@ -76,7 +102,7 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A builder.field("scoreTimestamp", 0L); builder.endObject(); - IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest indexRequest = new IndexRequest(CORRELATION_METADATA_INDEX) .source(builder) .timeout(indexTimeout); @@ -85,7 +111,7 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A scoreBuilder.field("root", false); scoreBuilder.endObject(); - IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX) + IndexRequest scoreIndexRequest = new IndexRequest(CORRELATION_METADATA_INDEX) .source(scoreBuilder) .timeout(indexTimeout); @@ -100,16 +126,4 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A log.error(ex); } } - - public ClusterIndexHealth correlationIndexHealth() { - ClusterIndexHealth indexHealth = null; - - if (correlationIndexExists()) { - IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(CORRELATION_INDEX); - IndexMetadata indexMetadata = clusterService.state().metadata().index(CORRELATION_INDEX); - - indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable); - } - return indexHealth; - } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java b/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java index 1632b2188..ce358591e 100644 --- a/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java +++ b/src/main/java/org/opensearch/securityanalytics/util/IndexUtils.java @@ -4,7 +4,10 @@ */ package org.opensearch.securityanalytics.util; +import java.util.Optional; import java.util.SortedMap; + +import org.opensearch.cluster.metadata.Metadata; import org.opensearch.core.action.ActionListener; import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest; import org.opensearch.action.support.IndicesOptions; @@ -24,6 +27,7 @@ import java.util.Locale; import java.util.Map; import java.util.Objects; +import java.util.function.Predicate; public class IndexUtils { @@ -35,6 +39,10 @@ public class IndexUtils { public static Boolean customRuleIndexUpdated = false; public static Boolean prePackagedRuleIndexUpdated = false; public static Boolean correlationIndexUpdated = false; + + public static Boolean correlationMetadataIndexUpdated = false; + + public static String lastUpdatedCorrelationHistoryIndex = null; public static Boolean correlationRuleIndexUpdated = false; public static Boolean customLogTypeIndexUpdated = false; @@ -53,6 +61,10 @@ public static void prePackagedRuleIndexUpdated() { public static void correlationIndexUpdated() { correlationIndexUpdated = true; } + public static void correlationMetadataIndexUpdated() { + correlationMetadataIndexUpdated = true; + } + public static void correlationRuleIndexUpdated() { correlationRuleIndexUpdated = true; } @@ -112,11 +124,20 @@ public static void updateIndexMapping( String mapping, ClusterState clusterState, IndicesAdminClient client, - ActionListener actionListener + ActionListener actionListener, + boolean alias ) throws IOException { - if (clusterState.metadata().indices().containsKey(index)) { - if (shouldUpdateIndex(clusterState.metadata().index(index), mapping)) { - PutMappingRequest putMappingRequest = new PutMappingRequest(index).source(mapping, XContentType.JSON); + String targetIndex = index; + if (alias) { + targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index); + } + if (targetIndex.equals(IndexUtils.lastUpdatedCorrelationHistoryIndex)) { + return; + } + + if (clusterState.metadata().indices().containsKey(targetIndex)) { + if (shouldUpdateIndex(clusterState.metadata().index(targetIndex), mapping)) { + PutMappingRequest putMappingRequest = new PutMappingRequest(targetIndex).source(mapping, XContentType.JSON); client.putMapping(putMappingRequest, actionListener); } else { actionListener.onResponse(new AcknowledgedResponse(true)); @@ -176,4 +197,11 @@ public static String getNewIndexByCreationDate(ClusterState state, IndexNameExpr return getNewestIndexByCreationDate(strings, state); } + public static String getIndexNameWithAlias(ClusterState clusterState, String alias) { + Optional> entry = clusterState.metadata().indices().entrySet().stream().filter( + stringIndexMetadataEntry -> stringIndexMetadataEntry.getValue().getAliases().containsKey(alias) + ).findFirst(); + return entry.map(Map.Entry::getKey).orElse(null); + } + } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/util/RuleIndices.java b/src/main/java/org/opensearch/securityanalytics/util/RuleIndices.java index 65762c57f..0cb57e97c 100644 --- a/src/main/java/org/opensearch/securityanalytics/util/RuleIndices.java +++ b/src/main/java/org/opensearch/securityanalytics/util/RuleIndices.java @@ -174,7 +174,8 @@ public void initPrepackagedRulesIndex(ActionListener create IndexUtils.updateIndexMapping( Rule.PRE_PACKAGED_RULES_INDEX, RuleIndices.ruleMappings(), clusterService.state(), client.admin().indices(), - updateListener + updateListener, + false ); } else { countRules(searchListener); diff --git a/src/main/resources/mappings/correlation.json b/src/main/resources/mappings/correlation.json index 9ba3292db..5d7dd8867 100644 --- a/src/main/resources/mappings/correlation.json +++ b/src/main/resources/mappings/correlation.json @@ -1,6 +1,6 @@ { "_meta" : { - "schema_version": 1 + "schema_version": 2 }, "properties": { "root": { @@ -17,7 +17,7 @@ }, "corr_vector": { "type": "sa_vector", - "dimension": 101, + "dimension": 3, "correlation_ctx": { "similarityFunction": "EUCLIDEAN", "parameters": { diff --git a/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsRestTestCase.java b/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsRestTestCase.java index 1c8770677..9a4e43704 100644 --- a/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsRestTestCase.java +++ b/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsRestTestCase.java @@ -65,6 +65,7 @@ import org.opensearch.securityanalytics.model.Detector; import org.opensearch.securityanalytics.model.Rule; import org.opensearch.securityanalytics.model.ThreatIntelFeedData; +import org.opensearch.securityanalytics.util.CorrelationIndices; import org.opensearch.test.rest.OpenSearchRestTestCase; @@ -1488,6 +1489,24 @@ public List getFindingIndices(String detectorType) throws IOException { return indices; } + public List getCorrelationHistoryIndices() throws IOException { + Response response = client().performRequest(new Request("GET", "/_cat/indices/" + CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP + "?format=json")); + XContentParser xcp = createParser(XContentType.JSON.xContent(), response.getEntity().getContent()); + List responseList = xcp.list(); + List indices = new ArrayList<>(); + for (Object o : responseList) { + if (o instanceof Map) { + ((Map) o).forEach((BiConsumer) + (o1, o2) -> { + if (o1.equals("index")) { + indices.add((String) o2); + } + }); + } + } + return indices; + } + public void updateClusterSetting(String setting, String value) throws IOException { String settingJson = "{\n" + " \"persistent\" : {" + diff --git a/src/test/java/org/opensearch/securityanalytics/alerts/AlertsIT.java b/src/test/java/org/opensearch/securityanalytics/alerts/AlertsIT.java index 04f17d867..fbd091595 100644 --- a/src/test/java/org/opensearch/securityanalytics/alerts/AlertsIT.java +++ b/src/test/java/org/opensearch/securityanalytics/alerts/AlertsIT.java @@ -593,7 +593,6 @@ public void testGetAlerts_byDetectorType_multipleDetectors_success() throws IOEx } - @Ignore public void testAlertHistoryRollover_maxAge() throws IOException, InterruptedException { updateClusterSetting(ALERT_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); updateClusterSetting(ALERT_HISTORY_MAX_DOCS.getKey(), "1000"); @@ -664,7 +663,6 @@ public void testAlertHistoryRollover_maxAge() throws IOException, InterruptedExc restoreAlertsFindingsIMSettings(); } - @Ignore public void testAlertHistoryRollover_maxAge_low_retention() throws IOException, InterruptedException { updateClusterSetting(ALERT_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); updateClusterSetting(ALERT_HISTORY_MAX_DOCS.getKey(), "1000"); @@ -745,7 +743,6 @@ public void testAlertHistoryRollover_maxAge_low_retention() throws IOException, restoreAlertsFindingsIMSettings(); } - @Ignore public void testAlertHistoryRollover_maxDocs() throws IOException, InterruptedException { updateClusterSetting(ALERT_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); updateClusterSetting(ALERT_HISTORY_MAX_DOCS.getKey(), "1"); @@ -829,7 +826,6 @@ public void testAlertHistoryRollover_maxDocs() throws IOException, InterruptedEx restoreAlertsFindingsIMSettings(); } - @Ignore public void testGetAlertsFromAllIndices() throws IOException, InterruptedException { updateClusterSetting(ALERT_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); updateClusterSetting(ALERT_HISTORY_MAX_DOCS.getKey(), "1"); diff --git a/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java b/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java index 225cebb8c..b511bb9c1 100644 --- a/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/correlation/CorrelationEngineRestApiIT.java @@ -17,6 +17,8 @@ import org.opensearch.securityanalytics.model.DetectorInput; import org.opensearch.securityanalytics.model.DetectorRule; import org.opensearch.securityanalytics.model.DetectorTrigger; +import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import org.opensearch.securityanalytics.util.CorrelationIndices; import java.io.IOException; import java.util.Collections; @@ -30,7 +32,7 @@ public class CorrelationEngineRestApiIT extends SecurityAnalyticsRestTestCase { @SuppressWarnings("unchecked") - public void testBasicCorrelationEngineWorkflow() throws IOException { + public void testBasicCorrelationEngineWorkflow() throws IOException, InterruptedException { LogIndices indices = createIndices(); String vpcFlowMonitorId = createVpcFlowDetector(indices.vpcFlowsIndex); @@ -79,14 +81,31 @@ public void testBasicCorrelationEngineWorkflow() throws IOException { Map getFindingsBody = entityAsMap(getFindingsResponse); String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); - List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); - Assert.assertEquals(2, correlatedFindings.size()); - Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); - - for (var correlatedFinding: correlatedFindings) { - if (correlatedFinding.get("detector_type").equals("network")) { - Assert.assertEquals(1, ((List) correlatedFinding.get("rules")).size()); - Assert.assertTrue(((List) correlatedFinding.get("rules")).contains(ruleId)); + int count = 0; + while (true) { + try { + List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); + if (correlatedFindings.size() == 2) { + Assert.assertTrue(true); + + Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); + + for (var correlatedFinding: correlatedFindings) { + if (correlatedFinding.get("detector_type").equals("network")) { + Assert.assertEquals(1, ((List) correlatedFinding.get("rules")).size()); + Assert.assertTrue(((List) correlatedFinding.get("rules")).contains(ruleId)); + } + } + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; } } } @@ -116,19 +135,198 @@ public void testListCorrelationsWorkflow() throws IOException, InterruptedExcept Assert.assertEquals(1, noOfSigmaRuleMatches); Thread.sleep(5000); - Long endTime = System.currentTimeMillis(); - Request request = new Request("GET", "/_plugins/_security_analytics/correlations?start_timestamp=" + startTime + "&end_timestamp=" + endTime); - Response response = client().performRequest(request); + int count = 0; + while (true) { + try { + Long endTime = System.currentTimeMillis(); + Request request = new Request("GET", "/_plugins/_security_analytics/correlations?start_timestamp=" + startTime + "&end_timestamp=" + endTime); + Response response = client().performRequest(request); + + Map responseMap = entityAsMap(response); + List results = (List) responseMap.get("findings"); + if (results.size() == 1) { + Assert.assertTrue(true); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } + } + + @SuppressWarnings("unchecked") + public void testBasicCorrelationEngineWorkflowWithoutRules() throws IOException, InterruptedException { + LogIndices indices = createIndices(); + + String vpcFlowMonitorId = createVpcFlowDetector(indices.vpcFlowsIndex); + String adLdapMonitorId = createAdLdapDetector(indices.adLdapLogsIndex); + String testWindowsMonitorId = createTestWindowsDetector(indices.windowsIndex); + String appLogsMonitorId = createAppLogsDetector(indices.appLogsIndex); + String s3MonitorId = createS3Detector(indices.s3AccessLogsIndex); + + indexDoc(indices.adLdapLogsIndex, "22", randomAdLdapDoc()); + Response executeResponse = executeAlertingMonitor(adLdapMonitorId, Collections.emptyMap()); + Map executeResults = entityAsMap(executeResponse); + int noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + + indexDoc(indices.windowsIndex, "2", randomDoc()); + executeResponse = executeAlertingMonitor(testWindowsMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(5, noOfSigmaRuleMatches); + + indexDoc(indices.appLogsIndex, "4", randomAppLogDoc()); + executeResponse = executeAlertingMonitor(appLogsMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(0, noOfSigmaRuleMatches); + + indexDoc(indices.s3AccessLogsIndex, "5", randomS3AccessLogDoc()); + executeResponse = executeAlertingMonitor(s3MonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(0, noOfSigmaRuleMatches); + + indexDoc(indices.vpcFlowsIndex, "1", randomVpcFlowDoc()); + executeResponse = executeAlertingMonitor(vpcFlowMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + + // Call GetFindings API + Map params = new HashMap<>(); + params.put("detectorType", "test_windows"); + Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); + Map getFindingsBody = entityAsMap(getFindingsResponse); + String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); - Assert.assertEquals(200, response.getStatusLine().getStatusCode()); - Map responseMap = entityAsMap(response); - List results = (List) responseMap.get("findings"); - Assert.assertEquals(1, results.size()); + int count = 0; + while (true) { + try { + List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); + if (correlatedFindings.size() == 2) { + Assert.assertTrue(true); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } } @SuppressWarnings("unchecked") - public void testBasicCorrelationEngineWorkflowWithoutRules() throws IOException { + public void testBasicCorrelationEngineWorkflowWithRolloverByMaxAge() throws IOException, InterruptedException { + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_INDEX_MAX_AGE.getKey(), "1s"); + + LogIndices indices = createIndices(); + + String vpcFlowMonitorId = createVpcFlowDetector(indices.vpcFlowsIndex); + String adLdapMonitorId = createAdLdapDetector(indices.adLdapLogsIndex); + String testWindowsMonitorId = createTestWindowsDetector(indices.windowsIndex); + String appLogsMonitorId = createAppLogsDetector(indices.appLogsIndex); + String s3MonitorId = createS3Detector(indices.s3AccessLogsIndex); + + String ruleId = createNetworkToAdLdapToWindowsRule(indices); + createWindowsToAppLogsToS3LogsRule(indices); + + indexDoc(indices.adLdapLogsIndex, "22", randomAdLdapDoc()); + Response executeResponse = executeAlertingMonitor(adLdapMonitorId, Collections.emptyMap()); + Map executeResults = entityAsMap(executeResponse); + int noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.windowsIndex, "2", randomDoc()); + executeResponse = executeAlertingMonitor(testWindowsMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(5, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.appLogsIndex, "4", randomAppLogDoc()); + executeResponse = executeAlertingMonitor(appLogsMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(0, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.s3AccessLogsIndex, "5", randomS3AccessLogDoc()); + executeResponse = executeAlertingMonitor(s3MonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(0, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.vpcFlowsIndex, "1", randomVpcFlowDoc()); + executeResponse = executeAlertingMonitor(vpcFlowMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + // Call GetFindings API + Map params = new HashMap<>(); + params.put("detectorType", "test_windows"); + Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); + Map getFindingsBody = entityAsMap(getFindingsResponse); + String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); + Thread.sleep(1000L); + + int count = 0; + while (true) { + try { + List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); + if (correlatedFindings.size() == 2) { + Assert.assertTrue(true); + + Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); + + for (var correlatedFinding: correlatedFindings) { + if (correlatedFinding.get("detector_type").equals("network")) { + Assert.assertEquals(1, ((List) correlatedFinding.get("rules")).size()); + Assert.assertTrue(((List) correlatedFinding.get("rules")).contains(ruleId)); + } + } + + List correlationIndices = getCorrelationHistoryIndices(); + while (correlationIndices.size() < 2) { + correlationIndices = getCorrelationHistoryIndices(); + Thread.sleep(1000); + } + Assert.assertTrue("Did not find more then 2 correlation indices", correlationIndices.size() >= 2); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } + } + + public void testBasicCorrelationEngineWorkflowWithRolloverByMaxDoc() throws IOException, InterruptedException { + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS.getKey(), "1"); + LogIndices indices = createIndices(); String vpcFlowMonitorId = createVpcFlowDetector(indices.vpcFlowsIndex); @@ -137,35 +335,43 @@ public void testBasicCorrelationEngineWorkflowWithoutRules() throws IOException String appLogsMonitorId = createAppLogsDetector(indices.appLogsIndex); String s3MonitorId = createS3Detector(indices.s3AccessLogsIndex); + String ruleId = createNetworkToAdLdapToWindowsRule(indices); + createWindowsToAppLogsToS3LogsRule(indices); + indexDoc(indices.adLdapLogsIndex, "22", randomAdLdapDoc()); Response executeResponse = executeAlertingMonitor(adLdapMonitorId, Collections.emptyMap()); Map executeResults = entityAsMap(executeResponse); int noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(1000L); indexDoc(indices.windowsIndex, "2", randomDoc()); executeResponse = executeAlertingMonitor(testWindowsMonitorId, Collections.emptyMap()); executeResults = entityAsMap(executeResponse); noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); Assert.assertEquals(5, noOfSigmaRuleMatches); + Thread.sleep(1000L); indexDoc(indices.appLogsIndex, "4", randomAppLogDoc()); executeResponse = executeAlertingMonitor(appLogsMonitorId, Collections.emptyMap()); executeResults = entityAsMap(executeResponse); noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); Assert.assertEquals(0, noOfSigmaRuleMatches); + Thread.sleep(1000L); indexDoc(indices.s3AccessLogsIndex, "5", randomS3AccessLogDoc()); executeResponse = executeAlertingMonitor(s3MonitorId, Collections.emptyMap()); executeResults = entityAsMap(executeResponse); noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); Assert.assertEquals(0, noOfSigmaRuleMatches); + Thread.sleep(1000L); indexDoc(indices.vpcFlowsIndex, "1", randomVpcFlowDoc()); executeResponse = executeAlertingMonitor(vpcFlowMonitorId, Collections.emptyMap()); executeResults = entityAsMap(executeResponse); noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(1000L); // Call GetFindings API Map params = new HashMap<>(); @@ -173,9 +379,145 @@ public void testBasicCorrelationEngineWorkflowWithoutRules() throws IOException Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); Map getFindingsBody = entityAsMap(getFindingsResponse); String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); + Thread.sleep(1000L); + + int count = 0; + while (true) { + try { + List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); + if (correlatedFindings.size() == 2) { + Assert.assertTrue(true); + + Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); + + for (var correlatedFinding: correlatedFindings) { + if (correlatedFinding.get("detector_type").equals("network")) { + Assert.assertEquals(1, ((List) correlatedFinding.get("rules")).size()); + Assert.assertTrue(((List) correlatedFinding.get("rules")).contains(ruleId)); + } + } + + List correlationIndices = getCorrelationHistoryIndices(); + while (correlationIndices.size() < 2) { + correlationIndices = getCorrelationHistoryIndices(); + Thread.sleep(1000); + } + Assert.assertTrue("Did not find more then 2 correlation indices", correlationIndices.size() >= 2); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } + } + + public void testBasicCorrelationEngineWorkflowWithRolloverByMaxDocAndShortRetention() throws IOException, InterruptedException { + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS.getKey(), "1"); + + LogIndices indices = createIndices(); + + String vpcFlowMonitorId = createVpcFlowDetector(indices.vpcFlowsIndex); + String adLdapMonitorId = createAdLdapDetector(indices.adLdapLogsIndex); + String testWindowsMonitorId = createTestWindowsDetector(indices.windowsIndex); + String appLogsMonitorId = createAppLogsDetector(indices.appLogsIndex); + String s3MonitorId = createS3Detector(indices.s3AccessLogsIndex); - List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); - Assert.assertEquals(2, correlatedFindings.size()); + String ruleId = createNetworkToAdLdapToWindowsRule(indices); + createWindowsToAppLogsToS3LogsRule(indices); + + indexDoc(indices.adLdapLogsIndex, "22", randomAdLdapDoc()); + Response executeResponse = executeAlertingMonitor(adLdapMonitorId, Collections.emptyMap()); + Map executeResults = entityAsMap(executeResponse); + int noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.windowsIndex, "2", randomDoc()); + executeResponse = executeAlertingMonitor(testWindowsMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(5, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.appLogsIndex, "4", randomAppLogDoc()); + executeResponse = executeAlertingMonitor(appLogsMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(0, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.s3AccessLogsIndex, "5", randomS3AccessLogDoc()); + executeResponse = executeAlertingMonitor(s3MonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(0, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + indexDoc(indices.vpcFlowsIndex, "1", randomVpcFlowDoc()); + executeResponse = executeAlertingMonitor(vpcFlowMonitorId, Collections.emptyMap()); + executeResults = entityAsMap(executeResponse); + noOfSigmaRuleMatches = ((List>) ((Map) executeResults.get("input_results")).get("results")).get(0).size(); + Assert.assertEquals(1, noOfSigmaRuleMatches); + Thread.sleep(1000L); + + // Call GetFindings API + Map params = new HashMap<>(); + params.put("detectorType", "test_windows"); + Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null); + Map getFindingsBody = entityAsMap(getFindingsResponse); + String finding = ((List>) getFindingsBody.get("findings")).get(0).get("id").toString(); + Thread.sleep(1000L); + + int count = 0; + while (true) { + try { + List> correlatedFindings = searchCorrelatedFindings(finding, "test_windows", 300000L, 10); + if (correlatedFindings.size() == 2) { + Assert.assertTrue(true); + + Assert.assertTrue(correlatedFindings.get(0).get("rules") instanceof List); + + for (var correlatedFinding: correlatedFindings) { + if (correlatedFinding.get("detector_type").equals("network")) { + Assert.assertEquals(1, ((List) correlatedFinding.get("rules")).size()); + Assert.assertTrue(((List) correlatedFinding.get("rules")).contains(ruleId)); + } + } + + List correlationIndices = getCorrelationHistoryIndices(); + while (correlationIndices.size() < 2) { + correlationIndices = getCorrelationHistoryIndices(); + Thread.sleep(1000); + } + Assert.assertTrue("Did not find more then 2 correlation indices", correlationIndices.size() >= 2); + + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_RETENTION_PERIOD.getKey(), "1s"); + updateClusterSetting(SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS.getKey(), "1000"); + + while (correlationIndices.size() != 1) { + correlationIndices = getCorrelationHistoryIndices(); + Thread.sleep(1000); + } + Assert.assertTrue("Found more than 1 correlation indices", correlationIndices.size() == 1); + break; + } + } catch (Exception ex) { + // suppress ex + } + ++count; + Thread.sleep(5000); + if (count >= 12) { + Assert.assertTrue(false); + break; + } + } } private LogIndices createIndices() throws IOException { diff --git a/src/test/java/org/opensearch/securityanalytics/findings/FindingIT.java b/src/test/java/org/opensearch/securityanalytics/findings/FindingIT.java index c69bb2e00..3b7ca3c0a 100644 --- a/src/test/java/org/opensearch/securityanalytics/findings/FindingIT.java +++ b/src/test/java/org/opensearch/securityanalytics/findings/FindingIT.java @@ -265,7 +265,6 @@ public void testGetFindings_byDetectorType_success() throws IOException { Assert.assertEquals(1, getFindingsBody.get("total_findings")); } - @Ignore public void testGetFindings_rolloverByMaxAge_success() throws IOException, InterruptedException { updateClusterSetting(FINDING_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); @@ -336,7 +335,6 @@ public void testGetFindings_rolloverByMaxAge_success() throws IOException, Inter restoreAlertsFindingsIMSettings(); } - @Ignore public void testGetFindings_rolloverByMaxDoc_success() throws IOException, InterruptedException { updateClusterSetting(FINDING_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); @@ -402,7 +400,6 @@ public void testGetFindings_rolloverByMaxDoc_success() throws IOException, Inter restoreAlertsFindingsIMSettings(); } - @Ignore public void testGetFindings_rolloverByMaxDoc_short_retention_success() throws IOException, InterruptedException { updateClusterSetting(FINDING_HISTORY_ROLLOVER_PERIOD.getKey(), "1s"); updateClusterSetting(FINDING_HISTORY_MAX_DOCS.getKey(), "1"); diff --git a/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java b/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java index 2059fb191..40a55388b 100644 --- a/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java +++ b/src/test/java/org/opensearch/securityanalytics/resthandler/DetectorRestApiIT.java @@ -169,7 +169,6 @@ public void testCreatingADetector() throws IOException { Assert.assertEquals(5, noOfSigmaRuleMatches); } - @Ignore public void testCreatingADetectorScheduledJobFinding() throws IOException, InterruptedException { String index = createTestIndex(randomIndex(), windowsIndexMapping());