-
Notifications
You must be signed in to change notification settings - Fork 69
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove blocking calls and change threat intel feed flow to event driven #871
Changes from all commits
58f9727
5dce731
b20270c
f222f41
3bfb29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,12 +34,12 @@ | |
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.securityanalytics.model.ThreatIntelFeedData; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; | ||
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService; | ||
import org.opensearch.securityanalytics.util.IndexUtils; | ||
import org.opensearch.securityanalytics.util.SecurityAnalyticsException; | ||
|
@@ -56,7 +56,6 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
import java.util.stream.Collectors; | ||
|
@@ -104,21 +103,13 @@ | |
ActionListener<List<ThreatIntelFeedData>> listener | ||
) { | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we no longer need this top-level try/catch? My observation has been that calls will hang it exceptions are not handled via the ActionListener There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. listener framework is event driven and no catch is required as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this code throws an exception, we never make a call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know if I am missing something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true. |
||
|
||
String tifdIndex = getLatestIndexByCreationDate(); | ||
if (tifdIndex == null) { | ||
createThreatIntelFeedData(listener); | ||
} else { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
Check warning on line 110 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L110
|
||
} | ||
} catch (InterruptedException e) { | ||
} catch (Exception e) { | ||
Check warning on line 112 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L112
|
||
log.error("Failed to get threat intel feed data", e); | ||
listener.onFailure(e); | ||
} | ||
|
@@ -150,21 +141,16 @@ | |
.mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)); | ||
StashedThreadContext.run( | ||
client, | ||
() -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { | ||
@Override | ||
public void onResponse(CreateIndexResponse response) { | ||
if (response.isAcknowledged()) { | ||
listener.onResponse(response); | ||
} else { | ||
onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}) | ||
() -> client.admin().indices().create(createIndexRequest, | ||
ActionListener.wrap( | ||
Check warning on line 145 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L144-L145
|
||
response -> { | ||
if (response.isAcknowledged()) | ||
listener.onResponse(response); | ||
Check warning on line 148 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L148
|
||
else | ||
listener.onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
Check warning on line 150 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L150
|
||
|
||
}, listener::onFailure | ||
Check warning on line 152 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L152
|
||
)) | ||
); | ||
} | ||
|
||
|
@@ -223,28 +209,20 @@ | |
} | ||
bulkRequestList.add(bulkRequest); | ||
|
||
GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() { | ||
@Override | ||
public void onResponse(Collection<BulkResponse> bulkResponses) { | ||
int idx = 0; | ||
for (BulkResponse response: bulkResponses) { | ||
BulkRequest request = bulkRequestList.get(idx); | ||
if (response.hasFailures()) { | ||
throw new OpenSearchException( | ||
"error occurred while ingesting threat intel feed data in {} with an error {}", | ||
StringUtils.join(request.getIndices()), | ||
response.buildFailureMessage() | ||
); | ||
} | ||
GroupedActionListener<BulkResponse> bulkResponseListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> { | ||
int idx = 0; | ||
Check warning on line 213 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L212-L213
|
||
for (BulkResponse response : bulkResponses) { | ||
BulkRequest request = bulkRequestList.get(idx); | ||
Check warning on line 215 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L215
|
||
if (response.hasFailures()) { | ||
throw new OpenSearchException( | ||
Check warning on line 217 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L217
|
||
"error occurred while ingesting threat intel feed data in {} with an error {}", | ||
StringUtils.join(request.getIndices()), | ||
response.buildFailureMessage() | ||
Check warning on line 220 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L219-L220
|
||
); | ||
} | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}, bulkRequestList.size()); | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); | ||
}, listener::onFailure), bulkRequestList.size()); | ||
Check warning on line 225 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L224-L225
|
||
|
||
for (int i = 0; i < bulkRequestList.size(); ++i) { | ||
saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener); | ||
|
@@ -291,52 +269,47 @@ | |
.prepareDelete(indices.toArray(new String[0])) | ||
.setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) | ||
.setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) | ||
.execute(new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse response) { | ||
if (response.isAcknowledged() == false) { | ||
onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices))); | ||
} | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.error("unknown exception:", e); | ||
} | ||
}) | ||
.execute(ActionListener.wrap( | ||
Check warning on line 272 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L272
|
||
response -> { | ||
if (response.isAcknowledged() == false) { | ||
log.error(new OpenSearchException("failed to delete threat intel feed index[{}]", | ||
String.join(",", indices))); | ||
Check warning on line 276 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L275-L276
|
||
} | ||
}, e -> log.error("failed to delete threat intel feed index [{}]", e) | ||
Check warning on line 278 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L278
|
||
)) | ||
); | ||
} | ||
|
||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException { | ||
CountDownLatch countDownLatch = new CountDownLatch(1); | ||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) { | ||
client.execute( | ||
PutTIFJobAction.INSTANCE, | ||
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
log.debug("Acknowledged threat intel feed updater job created"); | ||
countDownLatch.countDown(); | ||
String tifdIndex = getLatestIndexByCreationDate(); | ||
|
||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
ActionListener.wrap( | ||
Check warning on line 287 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L287
|
||
r -> { | ||
if (false == r.isAcknowledged()) { | ||
listener.onFailure(new Exception("Failed to acknowledge Put Tif job action")); | ||
return; | ||
Check warning on line 291 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L290-L291
|
||
} | ||
log.debug("Acknowledged threat intel feed updater job created"); | ||
String tifdIndex = getLatestIndexByCreationDate(); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
}, e -> { | ||
log.debug("Failed to create threat intel feed updater job", e); | ||
Check warning on line 297 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L293-L297
|
||
listener.onFailure(e); | ||
})); | ||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.debug("Failed to create threat intel feed updater job", e); | ||
countDownLatch.countDown(); | ||
} | ||
} | ||
} | ||
Check warning on line 299 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L299
|
||
) | ||
); | ||
countDownLatch.await(); | ||
} | ||
Check warning on line 302 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L302
|
||
|
||
private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
Check warning on line 309 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L305-L309
|
||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
Check warning on line 312 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java Codecov / codecov/patchsrc/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L311-L312
|
||
} | ||
|
||
private String getIndexMapping() { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know why the latches were initially implemented? Seems fine to remove them based on the testing performed but I'm puzzled as to why they would have been added in the first place if they are not required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad practice. The right construct to use is a
Countdown
but safer to do it the event-driven way