Skip to content

Commit

Permalink
add rollover & archival mechanism for correlation history indices (#670)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Oct 31, 2023
1 parent 0dd9787 commit 24e94b4
Show file tree
Hide file tree
Showing 20 changed files with 1,009 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ public List<Setting<?>> getSettings() {
SecurityAnalyticsSettings.FINDING_HISTORY_INDEX_MAX_AGE,
SecurityAnalyticsSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
SecurityAnalyticsSettings.FINDING_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.CORRELATION_HISTORY_MAX_DOCS,
SecurityAnalyticsSettings.CORRELATION_HISTORY_INDEX_MAX_AGE,
SecurityAnalyticsSettings.CORRELATION_HISTORY_ROLLOVER_PERIOD,
SecurityAnalyticsSettings.CORRELATION_HISTORY_RETENTION_PERIOD,
SecurityAnalyticsSettings.IS_CORRELATION_INDEX_SETTING,
SecurityAnalyticsSettings.CORRELATION_TIME_WINDOW,
SecurityAnalyticsSettings.DEFAULT_MAPPING_SCHEMA,
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_rollover_period",
TimeValue.timeValueHours(12),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> ALERT_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.security_analytics.alert_history_max_age",
new TimeValue(30, TimeUnit.DAYS),
Expand All @@ -55,6 +61,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_max_age",
new TimeValue(30, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<Long> ALERT_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.security_analytics.alert_history_max_docs",
1000L,
Expand All @@ -69,6 +81,13 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated
);

public static final Setting<Long> CORRELATION_HISTORY_MAX_DOCS = Setting.longSetting(
"plugins.security_analytics.correlation_history_max_docs",
1000L,
0L,
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> ALERT_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.alert_history_retention_period",
new TimeValue(60, TimeUnit.DAYS),
Expand All @@ -81,6 +100,12 @@ public class SecurityAnalyticsSettings {
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> CORRELATION_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting(
"plugins.security_analytics.correlation_history_retention_period",
new TimeValue(60, TimeUnit.DAYS),
Setting.Property.NodeScope, Setting.Property.Dynamic
);

public static final Setting<TimeValue> REQUEST_TIMEOUT = Setting.positiveTimeSetting(
"plugins.security_analytics.request_timeout",
TimeValue.timeValueSeconds(10),
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
indexCorrelationRule();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
});
}, false
);
} else {
prepareCustomLogTypeIndexing();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1041,7 +1041,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
prepareDetectorIndexing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,8 @@ public void onResponse(AcknowledgedResponse response) {
public void onFailure(Exception e) {
onFailures(e);
}
}
},
false
);
} else {
prepareRuleIndexing();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void start() {
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.size(10000);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void onResponse(SearchResponse response) {
scoreSearchSourceBuilder.fetchSource(true);
scoreSearchSourceBuilder.size(1);
SearchRequest scoreSearchRequest = new SearchRequest();
scoreSearchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
scoreSearchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
scoreSearchRequest.source(scoreSearchSourceBuilder);
scoreSearchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand All @@ -156,7 +156,7 @@ public void onResponse(SearchResponse response) {
searchSourceBuilder.fetchField("counter");
searchSourceBuilder.size(1);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand All @@ -168,11 +168,11 @@ public void onResponse(SearchResponse response) {

for (SearchHit hit: hits) {
long counter = hit.getFields().get("counter").<Long>getValue();
float[] query = new float[101];
for (int i = 0; i < 100; ++i) {
float[] query = new float[3];
for (int i = 0; i < 2; ++i) {
query[i] = (2.0f * ((float) counter) - 50.0f) / 2.0f;
}
query[100] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();
query[2] = Long.valueOf((findingTimestamp - scoreTimestamp) / 1000L).floatValue();

CorrelationQueryBuilder correlationQueryBuilder = new CorrelationQueryBuilder("corr_vector", query, noOfNearbyFindings, QueryBuilders.boolQuery()
.mustNot(QueryBuilders.matchQuery(
Expand All @@ -188,7 +188,7 @@ public void onResponse(SearchResponse response) {
searchSourceBuilder.fetchSource(true);
searchSourceBuilder.size(noOfNearbyFindings);
SearchRequest searchRequest = new SearchRequest();
searchRequest.indices(CorrelationIndices.CORRELATION_INDEX);
searchRequest.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.admin.indices.alias.Alias;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
Expand All @@ -33,7 +34,13 @@
public class CorrelationIndices {

private static final Logger log = LogManager.getLogger(CorrelationIndices.class);
public static final String CORRELATION_INDEX = ".opensearch-sap-correlation-history";

public static final String CORRELATION_METADATA_INDEX = ".opensearch-sap-correlation-metadata";
public static final String CORRELATION_HISTORY_INDEX_PATTERN = "<.opensearch-sap-correlation-history-{now/d}-1>";

public static final String CORRELATION_HISTORY_INDEX_PATTERN_REGEXP = ".opensearch-sap-correlation-history*";

public static final String CORRELATION_HISTORY_WRITE_INDEX = ".opensearch-sap-correlation-history-write";
public static final long FIXED_HISTORICAL_INTERVAL = 24L * 60L * 60L * 20L * 1000L;

private final Client client;
Expand All @@ -51,16 +58,35 @@ public static String correlationMappings() throws IOException {

public void initCorrelationIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!correlationIndexExists()) {
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_INDEX)
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_HISTORY_INDEX_PATTERN)
.mapping(correlationMappings())
.settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build());
indexRequest.alias(new Alias(CORRELATION_HISTORY_WRITE_INDEX));
client.admin().indices().create(indexRequest, actionListener);
} else {
actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_HISTORY_INDEX_PATTERN));
}
}

public void initCorrelationMetadataIndex(ActionListener<CreateIndexResponse> actionListener) throws IOException {
if (!correlationMetadataIndexExists()) {
CreateIndexRequest indexRequest = new CreateIndexRequest(CORRELATION_METADATA_INDEX)
.mapping(correlationMappings())
.settings(Settings.builder().put("index.hidden", true).put("index.correlation", true).build());
client.admin().indices().create(indexRequest, actionListener);
} else {
actionListener.onResponse(new CreateIndexResponse(true, true, CORRELATION_METADATA_INDEX));
}
}

public boolean correlationIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.getRoutingTable().hasIndex(CORRELATION_INDEX);
return clusterState.metadata().hasAlias(CORRELATION_HISTORY_WRITE_INDEX);
}

public boolean correlationMetadataIndexExists() {
ClusterState clusterState = clusterService.state();
return clusterState.metadata().hasIndex(CORRELATION_METADATA_INDEX);
}

public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, ActionListener<BulkResponse> listener) {
Expand All @@ -76,7 +102,7 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A
builder.field("scoreTimestamp", 0L);
builder.endObject();

IndexRequest indexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX)
IndexRequest indexRequest = new IndexRequest(CORRELATION_METADATA_INDEX)
.source(builder)
.timeout(indexTimeout);

Expand All @@ -85,7 +111,7 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A
scoreBuilder.field("root", false);
scoreBuilder.endObject();

IndexRequest scoreIndexRequest = new IndexRequest(CorrelationIndices.CORRELATION_INDEX)
IndexRequest scoreIndexRequest = new IndexRequest(CORRELATION_METADATA_INDEX)
.source(scoreBuilder)
.timeout(indexTimeout);

Expand All @@ -100,16 +126,4 @@ public void setupCorrelationIndex(TimeValue indexTimeout, Long setupTimestamp, A
log.error(ex);
}
}

public ClusterIndexHealth correlationIndexHealth() {
ClusterIndexHealth indexHealth = null;

if (correlationIndexExists()) {
IndexRoutingTable indexRoutingTable = clusterService.state().routingTable().index(CORRELATION_INDEX);
IndexMetadata indexMetadata = clusterService.state().metadata().index(CORRELATION_INDEX);

indexHealth = new ClusterIndexHealth(indexMetadata, indexRoutingTable);
}
return indexHealth;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
*/
package org.opensearch.securityanalytics.util;

import java.util.Optional;
import java.util.SortedMap;

import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.opensearch.action.support.IndicesOptions;
Expand All @@ -24,6 +27,7 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.function.Predicate;

public class IndexUtils {

Expand All @@ -35,6 +39,10 @@ public class IndexUtils {
public static Boolean customRuleIndexUpdated = false;
public static Boolean prePackagedRuleIndexUpdated = false;
public static Boolean correlationIndexUpdated = false;

public static Boolean correlationMetadataIndexUpdated = false;

public static String lastUpdatedCorrelationHistoryIndex = null;
public static Boolean correlationRuleIndexUpdated = false;

public static Boolean customLogTypeIndexUpdated = false;
Expand All @@ -53,6 +61,10 @@ public static void prePackagedRuleIndexUpdated() {

public static void correlationIndexUpdated() { correlationIndexUpdated = true; }

public static void correlationMetadataIndexUpdated() {
correlationMetadataIndexUpdated = true;
}

public static void correlationRuleIndexUpdated() {
correlationRuleIndexUpdated = true;
}
Expand Down Expand Up @@ -112,11 +124,20 @@ public static void updateIndexMapping(
String mapping,
ClusterState clusterState,
IndicesAdminClient client,
ActionListener<AcknowledgedResponse> actionListener
ActionListener<AcknowledgedResponse> actionListener,
boolean alias
) throws IOException {
if (clusterState.metadata().indices().containsKey(index)) {
if (shouldUpdateIndex(clusterState.metadata().index(index), mapping)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(index).source(mapping, XContentType.JSON);
String targetIndex = index;
if (alias) {
targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index);
}
if (targetIndex.equals(IndexUtils.lastUpdatedCorrelationHistoryIndex)) {
return;
}

if (clusterState.metadata().indices().containsKey(targetIndex)) {
if (shouldUpdateIndex(clusterState.metadata().index(targetIndex), mapping)) {
PutMappingRequest putMappingRequest = new PutMappingRequest(targetIndex).source(mapping, XContentType.JSON);
client.putMapping(putMappingRequest, actionListener);
} else {
actionListener.onResponse(new AcknowledgedResponse(true));
Expand Down Expand Up @@ -176,4 +197,11 @@ public static String getNewIndexByCreationDate(ClusterState state, IndexNameExpr
return getNewestIndexByCreationDate(strings, state);
}

public static String getIndexNameWithAlias(ClusterState clusterState, String alias) {
Optional<Map.Entry<String, IndexMetadata>> entry = clusterState.metadata().indices().entrySet().stream().filter(
stringIndexMetadataEntry -> stringIndexMetadataEntry.getValue().getAliases().containsKey(alias)
).findFirst();
return entry.map(Map.Entry::getKey).orElse(null);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ public void initPrepackagedRulesIndex(ActionListener<CreateIndexResponse> create
IndexUtils.updateIndexMapping(
Rule.PRE_PACKAGED_RULES_INDEX,
RuleIndices.ruleMappings(), clusterService.state(), client.admin().indices(),
updateListener
updateListener,
false
);
} else {
countRules(searchListener);
Expand Down
4 changes: 2 additions & 2 deletions src/main/resources/mappings/correlation.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 1
"schema_version": 2
},
"properties": {
"root": {
Expand All @@ -17,7 +17,7 @@
},
"corr_vector": {
"type": "sa_vector",
"dimension": 101,
"dimension": 3,
"correlation_ctx": {
"similarityFunction": "EUCLIDEAN",
"parameters": {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
import org.opensearch.securityanalytics.model.Detector;
import org.opensearch.securityanalytics.model.Rule;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.util.CorrelationIndices;
import org.opensearch.test.rest.OpenSearchRestTestCase;


Expand Down Expand Up @@ -1488,6 +1489,24 @@ public List<String> getFindingIndices(String detectorType) throws IOException {
return indices;
}

public List<String> getCorrelationHistoryIndices() throws IOException {
Response response = client().performRequest(new Request("GET", "/_cat/indices/" + CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP + "?format=json"));
XContentParser xcp = createParser(XContentType.JSON.xContent(), response.getEntity().getContent());
List<Object> responseList = xcp.list();
List<String> indices = new ArrayList<>();
for (Object o : responseList) {
if (o instanceof Map) {
((Map<?, ?>) o).forEach((BiConsumer<Object, Object>)
(o1, o2) -> {
if (o1.equals("index")) {
indices.add((String) o2);
}
});
}
}
return indices;
}

public void updateClusterSetting(String setting, String value) throws IOException {
String settingJson = "{\n" +
" \"persistent\" : {" +
Expand Down
Loading

0 comments on commit 24e94b4

Please sign in to comment.