-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Remove blocking calls and change threat intel feed flow to event driven #871
Changes from 3 commits
58f9727
5dce731
b20270c
f222f41
3bfb29d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -34,12 +34,12 @@ | |
import org.opensearch.core.xcontent.ToXContent; | ||
import org.opensearch.core.xcontent.XContentBuilder; | ||
import org.opensearch.securityanalytics.model.ThreatIntelFeedData; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; | ||
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; | ||
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; | ||
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; | ||
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; | ||
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService; | ||
import org.opensearch.securityanalytics.util.IndexUtils; | ||
import org.opensearch.securityanalytics.util.SecurityAnalyticsException; | ||
|
@@ -56,7 +56,6 @@ | |
import java.util.List; | ||
import java.util.Map; | ||
import java.util.Optional; | ||
import java.util.concurrent.CountDownLatch; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
import java.util.stream.Collectors; | ||
|
@@ -104,21 +103,13 @@ | |
ActionListener<List<ThreatIntelFeedData>> listener | ||
) { | ||
try { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we no longer need this top-level try/catch? My observation has been that calls will hang it exceptions are not handled via the ActionListener There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. listener framework is event driven and no catch is required as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If this code throws an exception, we never make a call to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Let me know if I am missing something There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. true. |
||
|
||
String tifdIndex = getLatestIndexByCreationDate(); | ||
if (tifdIndex == null) { | ||
createThreatIntelFeedData(listener); | ||
} else { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
Check warning on line 110 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
|
||
} | ||
} catch (InterruptedException e) { | ||
} catch (Exception e) { | ||
Check warning on line 112 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
|
||
log.error("Failed to get threat intel feed data", e); | ||
listener.onFailure(e); | ||
} | ||
|
@@ -307,36 +298,35 @@ | |
); | ||
} | ||
|
||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException { | ||
CountDownLatch countDownLatch = new CountDownLatch(1); | ||
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) { | ||
client.execute( | ||
PutTIFJobAction.INSTANCE, | ||
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), | ||
new ActionListener<>() { | ||
@Override | ||
public void onResponse(AcknowledgedResponse acknowledgedResponse) { | ||
log.debug("Acknowledged threat intel feed updater job created"); | ||
countDownLatch.countDown(); | ||
String tifdIndex = getLatestIndexByCreationDate(); | ||
|
||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); | ||
Check warning on line 310 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
|
||
} | ||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
log.debug("Failed to create threat intel feed updater job", e); | ||
countDownLatch.countDown(); | ||
} | ||
} | ||
); | ||
countDownLatch.await(); | ||
} | ||
Check warning on line 319 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
|
||
|
||
private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) { | ||
SearchRequest searchRequest = new SearchRequest(tifdIndex); | ||
searchRequest.source().size(9999); //TODO: convert to scroll | ||
String finalTifdIndex = tifdIndex; | ||
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { | ||
log.error(String.format( | ||
Check warning on line 326 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
|
||
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); | ||
listener.onFailure(e); | ||
})); | ||
Check warning on line 329 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java
|
||
} | ||
|
||
private String getIndexMapping() { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,6 +9,7 @@ | |
import org.apache.logging.log4j.Logger; | ||
import org.opensearch.OpenSearchStatusException; | ||
import org.opensearch.ResourceAlreadyExistsException; | ||
import org.opensearch.ResourceNotFoundException; | ||
import org.opensearch.action.DocWriteRequest; | ||
import org.opensearch.action.StepListener; | ||
import org.opensearch.action.admin.indices.create.CreateIndexRequest; | ||
|
@@ -84,6 +85,7 @@ | |
stepListener.onResponse(null); | ||
return; | ||
} | ||
log.error("Failed to create security analytics job index", e); | ||
Check warning on line 88 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
stepListener.onFailure(e); | ||
} | ||
})); | ||
|
@@ -104,82 +106,72 @@ | |
|
||
/** | ||
* Update jobSchedulerParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} | ||
* | ||
* @param jobSchedulerParameter the jobSchedulerParameter | ||
*/ | ||
public void updateJobSchedulerParameter(final TIFJobParameter jobSchedulerParameter, final ActionListener<ThreatIntelIndicesResponse> listener) { | ||
jobSchedulerParameter.setLastUpdateTime(Instant.now()); | ||
StashedThreadContext.run(client, () -> { | ||
try { | ||
if (listener != null) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this listener never be null? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. changed it to non-null in all invocations |
||
client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) | ||
.setId(jobSchedulerParameter.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute(new ActionListener<>() { | ||
@Override | ||
public void onResponse(IndexResponse indexResponse) { | ||
if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); | ||
} else { | ||
listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
} | ||
client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) | ||
.setId(jobSchedulerParameter.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute(new ActionListener<>() { | ||
Check warning on line 121 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
@Override | ||
public void onResponse(IndexResponse indexResponse) { | ||
if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { | ||
listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); | ||
Check warning on line 125 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
} else { | ||
listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); | ||
Check warning on line 127 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
} | ||
} | ||
Check warning on line 129 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
|
||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
}); | ||
} else { | ||
client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) | ||
.setId(jobSchedulerParameter.getName()) | ||
.setOpType(DocWriteRequest.OpType.INDEX) | ||
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) | ||
.setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) | ||
.execute().actionGet(); | ||
} | ||
@Override | ||
public void onFailure(Exception e) { | ||
listener.onFailure(e); | ||
} | ||
Check warning on line 134 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
}); | ||
} catch (IOException e) { | ||
throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); | ||
log.error("failed to update job scheduler param for tif job", e); | ||
listener.onFailure(e); | ||
Check warning on line 138 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
} | ||
}); | ||
} | ||
|
||
/** | ||
* Get tif job from an index {@code TIFJobExtension.JOB_INDEX_NAME} | ||
* | ||
* @param name the name of a tif job | ||
* @return tif job | ||
* @throws IOException exception | ||
*/ | ||
public TIFJobParameter getJobParameter(final String name) throws IOException { | ||
public void getJobParameter(final String name, ActionListener<TIFJobParameter> listener) { | ||
GetRequest request = new GetRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME, name); | ||
GetResponse response; | ||
try { | ||
response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))); | ||
if (response.isExists() == false) { | ||
log.error("TIF job[{}] does not exist in an index[{}]", name, SecurityAnalyticsPlugin.JOB_INDEX_NAME); | ||
return null; | ||
} | ||
} catch (IndexNotFoundException e) { | ||
log.error("Index[{}] is not found", SecurityAnalyticsPlugin.JOB_INDEX_NAME); | ||
return null; | ||
} | ||
|
||
XContentParser parser = XContentHelper.createParser( | ||
NamedXContentRegistry.EMPTY, | ||
LoggingDeprecationHandler.INSTANCE, | ||
response.getSourceAsBytesRef() | ||
); | ||
return TIFJobParameter.PARSER.parse(parser, null); | ||
StashedThreadContext.run(client, () -> client.get(request, ActionListener.wrap( | ||
Check warning on line 150 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
response -> { | ||
if (response.isExists() == false) { | ||
log.error("TIF job[{}] does not exist in an index[{}]", name, SecurityAnalyticsPlugin.JOB_INDEX_NAME); | ||
listener.onFailure(new ResourceNotFoundException("name")); | ||
Check warning on line 154 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
} | ||
XContentParser parser = XContentHelper.createParser( | ||
Check warning on line 156 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
NamedXContentRegistry.EMPTY, | ||
LoggingDeprecationHandler.INSTANCE, | ||
response.getSourceAsBytesRef() | ||
Check warning on line 159 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
); | ||
listener.onResponse(TIFJobParameter.PARSER.parse(parser, null)); | ||
}, e -> { | ||
log.error("Failed to fetch tif job document " + name, e); | ||
listener.onFailure(e); | ||
}))); | ||
Check warning on line 165 in src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java
|
||
} | ||
|
||
/** | ||
* Put tifJobParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} | ||
* | ||
* @param tifJobParameter the tifJobParameter | ||
* @param listener the listener | ||
* @param listener the listener | ||
*/ | ||
public void saveTIFJobParameter(final TIFJobParameter tifJobParameter, final ActionListener listener) { | ||
public void saveTIFJobParameter(final TIFJobParameter tifJobParameter, final ActionListener<IndexResponse> listener) { | ||
tifJobParameter.setLastUpdateTime(Instant.now()); | ||
StashedThreadContext.run(client, () -> { | ||
try { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know why the latches were initially implemented? Seems fine to remove them based on the testing performed but I'm puzzled as to why they would have been added in the first place if they are not required
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
bad practice. The right construct to use is a
Countdown
but safer to do it the event-driven way