Skip to content

Commit

Permalink
Add search request timeouts for correlations workflows (opensearch-pr…
Browse files Browse the repository at this point in the history
…oject#893)

* Reinstating more leaks plugged-in for correlations workflows

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Add search timeouts to all correlation searches

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Fix logging and exception messages

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

* Change search timeout to 30 seconds

Signed-off-by: Megha Goyal <goyamegh@amazon.com>

---------

Signed-off-by: Megha Goyal <goyamegh@amazon.com>
  • Loading branch information
goyamegh committed Mar 8, 2024
1 parent 656a5fe commit 75c4429
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.apache.lucene.search.join.ScoreMode;
import org.opensearch.OpenSearchStatusException;
import org.opensearch.cluster.routing.Preference;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.commons.alerting.model.DocLevelQuery;
import org.opensearch.core.action.ActionListener;
import org.opensearch.action.search.MultiSearchRequest;
Expand Down Expand Up @@ -132,6 +133,7 @@ private void generateAutoCorrelations(Detector detector, Finding finding) throws
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(logTypeName));
searchRequest.source(sourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
mSearchRequest.add(searchRequest);
}

Expand Down Expand Up @@ -214,6 +216,7 @@ private void onAutoCorrelations(Detector detector, Finding finding, Map<String,
searchRequest.indices(CorrelationRule.CORRELATION_RULE_INDEX);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

client.search(searchRequest, ActionListener.wrap(response -> {
if (response.isTimedOut()) {
Expand Down Expand Up @@ -277,6 +280,7 @@ private void getValidDocuments(String detectorType, List<String> indices, List<C
searchRequest.indices(indices.toArray(new String[]{}));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

validCorrelationRules.add(rule);
validFields.add(query.get().getField());
Expand Down Expand Up @@ -377,6 +381,7 @@ private void searchFindingsByTimestamp(String detectorType, Map<String, List<Cor
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(categoryToQueries.getKey()));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
mSearchRequest.add(searchRequest);
categoryToQueriesPairs.add(Pair.of(categoryToQueries.getKey(), categoryToQueries.getValue()));
}
Expand Down Expand Up @@ -441,6 +446,7 @@ private void searchDocsWithFilterKeys(String detectorType, Map<String, DocSearch
searchRequest.indices(docSearchCriteria.getValue().indices.toArray(new String[]{}));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

categories.add(docSearchCriteria.getKey());
mSearchRequest.add(searchRequest);
Expand Down Expand Up @@ -502,6 +508,7 @@ private void getCorrelatedFindings(String detectorType, Map<String, List<String>
searchRequest.indices(DetectorMonitorConfig.getAllFindingsIndicesPattern(relatedDocIds.getKey()));
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

categories.add(relatedDocIds.getKey());
mSearchRequest.add(searchRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.opensearch.securityanalytics.transport.TransportCorrelateFindingAction;
import org.opensearch.securityanalytics.util.CorrelationIndices;

import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
Expand Down Expand Up @@ -94,6 +95,7 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
request.source(searchSourceBuilder);
request.preference(Preference.PRIMARY_FIRST.type());
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

mSearchRequest.add(request);
}
Expand Down Expand Up @@ -195,6 +197,12 @@ public void insertCorrelatedFindings(String detectorType, Finding finding, Strin
}

public void insertOrphanFindings(String detectorType, Finding finding, float timestampFeature, Map<String, CustomLogType> logTypes) {
if (logTypes.get(detectorType) == null ) {
log.debug("Missing detector type {} in the log types index for finding id {}. Keys in the index: {}",
detectorType, finding.getId(), Arrays.toString(logTypes.keySet().toArray()));
onFailure(new OpenSearchStatusException("insertOrphanFindings null log types for detector type: " + detectorType, RestStatus.INTERNAL_SERVER_ERROR));
}

SearchRequest searchRequest = getSearchMetadataIndexRequest(detectorType, finding, logTypes);
Map<String, Object> tags = logTypes.get(detectorType).getTags();
String correlationId = tags.get("correlation_id").toString();
Expand Down Expand Up @@ -251,7 +259,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));
}
}, this::onFailure));
} else {
Expand Down Expand Up @@ -297,7 +306,8 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException(indexResponse.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));
}
}, this::onFailure));
} else {
Expand All @@ -323,6 +333,7 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
request.indices(CorrelationIndices.CORRELATION_HISTORY_INDEX_PATTERN_REGEXP);
request.source(searchSourceBuilder);
request.preference(Preference.PRIMARY_FIRST.type());
request.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));

client.search(request, ActionListener.wrap(searchResponse -> {
if (searchResponse.isTimedOut()) {
Expand Down Expand Up @@ -407,6 +418,9 @@ public void insertOrphanFindings(String detectorType, Finding finding, float tim
} catch (Exception ex) {
onFailure(ex);
}
} else {
onFailure(new OpenSearchStatusException("Indexing failed with response {} ",
indexResponse.status(), indexResponse.toString()));
}
}, this::onFailure));
} catch (Exception ex) {
Expand All @@ -432,7 +446,7 @@ private void indexCorrelatedFindings(XContentBuilder builder) {
if (response.status().equals(RestStatus.CREATED)) {
correlateFindingAction.onOperation();
} else {
onFailure(new OpenSearchStatusException(response.toString(), RestStatus.INTERNAL_SERVER_ERROR));
onFailure(new OpenSearchStatusException("Indexing failed with response {} ", response.status(), response.toString()));
}
}, this::onFailure));
}
Expand All @@ -454,6 +468,7 @@ private SearchRequest getSearchMetadataIndexRequest(String detectorType, Finding
searchRequest.indices(CorrelationIndices.CORRELATION_METADATA_INDEX);
searchRequest.source(searchSourceBuilder);
searchRequest.preference(Preference.PRIMARY_FIRST.type());
searchRequest.setCancelAfterTimeInterval(TimeValue.timeValueSeconds(30L));
return searchRequest;
}

Expand Down
Loading

0 comments on commit 75c4429

Please sign in to comment.