Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove blocking calls and change threat intel feed flow to event driven #871

Merged
merged 5 commits into from
Mar 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,13 +116,10 @@ private String buildQueryStringQueryWithIocList(Set<String> iocs) {
* Fetches threat intel data and creates doc level queries from threat intel data
*/
public void createDocLevelQueryFromThreatIntel(List<LogType.IocFields> iocFieldList, Detector detector, ActionListener<List<DocLevelQuery>> listener) {
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar try/catch comment as below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listener framework is event driven and no catch is required as ActionListener.onFailure() would need to implement whatever logic was written in catch block as callback mechanism will not throw an exception

if (false == detector.getThreatIntelEnabled() || iocFieldList.isEmpty()) {
listener.onResponse(Collections.emptyList());
return;
}

CountDownLatch latch = new CountDownLatch(1);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we know why the latches were initially implemented? Seems fine to remove them based on the testing performed but I'm puzzled as to why they would have been added in the first place if they are not required

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bad practice. The right construct to use is a Countdown

Countdown is a  simple thread safe count-down class that in contrast to a CountDownLatch never blocks. This class is useful if a certain action has to wait for N concurrent tasks to return or a timeout to occur in order to proceed.

but safer to do it the event-driven way

threatIntelFeedDataService.getThreatIntelFeedData(new ActionListener<>() {
@Override
public void onResponse(List<ThreatIntelFeedData> threatIntelFeedData) {
Expand All @@ -133,23 +130,14 @@ public void onResponse(List<ThreatIntelFeedData> threatIntelFeedData) {
createDocLevelQueriesFromThreatIntelList(iocFieldList, threatIntelFeedData, detector)
);
}
latch.countDown();
}

@Override
public void onFailure(Exception e) {
log.error("Failed to get threat intel feeds for doc level query creation", e);
listener.onFailure(e);
latch.countDown();
}
});

latch.await(30, TimeUnit.SECONDS);
} catch (InterruptedException e) {
log.error("Failed to create doc level queries from threat intel feeds", e);
listener.onFailure(e);
}

}

private static String constructId(Detector detector, String iocType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,12 @@
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.securityanalytics.model.ThreatIntelFeedData;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction;
import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest;
import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse;
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata;
import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService;
import org.opensearch.securityanalytics.util.IndexUtils;
import org.opensearch.securityanalytics.util.SecurityAnalyticsException;
Expand All @@ -56,7 +56,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -103,24 +102,11 @@
public void getThreatIntelFeedData(
ActionListener<List<ThreatIntelFeedData>> listener
) {
try {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we no longer need this top-level try/catch? My observation has been that calls will hang it exceptions are not handled via the ActionListener

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

listener framework is event driven and no catch is required as ActionListener.onFailure() would need to implement whatever logic was written in catch block as callback mechanism will not throw an exception

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this code throws an exception, we never make a call to ActionListener.onFailure(). I believe the original call just hangs in this case, at least that is what I have experienced while developing in this package.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let me know if I am missing something

Copy link
Member Author

@eirsep eirsep Feb 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

true.
reverting try-catch removal


String tifdIndex = getLatestIndexByCreationDate();
if (tifdIndex == null) {
createThreatIntelFeedData(listener);
} else {
SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));
}
} catch (InterruptedException e) {
log.error("Failed to get threat intel feed data", e);
listener.onFailure(e);
String tifdIndex = getLatestIndexByCreationDate();

Check warning on line 105 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L105

Added line #L105 was not covered by tests
if (tifdIndex == null) {
createThreatIntelFeedData(listener);

Check warning on line 107 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L107

Added line #L107 was not covered by tests
} else {
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener);

Check warning on line 109 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L109

Added line #L109 was not covered by tests
}
}

Expand Down Expand Up @@ -307,36 +293,35 @@
);
}

private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(1);
private void createThreatIntelFeedData(ActionListener<List<ThreatIntelFeedData>> listener) {
client.execute(
PutTIFJobAction.INSTANCE,
new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)),
new ActionListener<>() {
@Override
public void onResponse(AcknowledgedResponse acknowledgedResponse) {
log.debug("Acknowledged threat intel feed updater job created");
countDownLatch.countDown();
String tifdIndex = getLatestIndexByCreationDate();

SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));
fetchThreatIntelFeedDataFromIndex(tifdIndex, listener);

Check warning on line 305 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L305

Added line #L305 was not covered by tests
}

@Override
public void onFailure(Exception e) {
log.debug("Failed to create threat intel feed updater job", e);
countDownLatch.countDown();
}
}
);
countDownLatch.await();
}

Check warning on line 314 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L314

Added line #L314 was not covered by tests

private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener<List<ThreatIntelFeedData>> listener) {
SearchRequest searchRequest = new SearchRequest(tifdIndex);
searchRequest.source().size(9999); //TODO: convert to scroll
String finalTifdIndex = tifdIndex;
client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> {
log.error(String.format(

Check warning on line 321 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L317-L321

Added lines #L317 - L321 were not covered by tests
"Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e);
listener.onFailure(e);
}));

Check warning on line 324 in src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java#L323-L324

Added lines #L323 - L324 were not covered by tests
}

private String getIndexMapping() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
// TODO refactor this into a service class that creates feed updation job. This is not necessary to be a transport action
private static final Logger log = LogManager.getLogger(TransportPutTIFJobAction.class);

private final ThreadPool threadPool;
private final TIFJobParameterService tifJobParameterService;
private final TIFJobUpdateService tifJobUpdateService;
private final TIFLockService lockService;
Expand All @@ -65,7 +64,6 @@
final TIFLockService lockService
) {
super(PutTIFJobAction.NAME, transportService, actionFilters, PutTIFJobRequest::new);
this.threadPool = threadPool;
this.tifJobParameterService = tifJobParameterService;
this.tifJobUpdateService = tifJobUpdateService;
this.lockService = lockService;
Expand Down Expand Up @@ -103,16 +101,17 @@
final LockModel lock,
final ActionListener<AcknowledgedResponse> listener
) {
StepListener<Void> createIndexStep = new StepListener<>();
tifJobParameterService.createJobIndexIfNotExists(createIndexStep);
createIndexStep.whenComplete(v -> {
StepListener<Void> createIndexStepListener = new StepListener<>();
createIndexStepListener.whenComplete(v -> {

Check warning on line 105 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L104-L105

Added lines #L104 - L105 were not covered by tests
TIFJobParameter tifJobParameter = TIFJobParameter.Builder.build(request);
tifJobParameterService.saveTIFJobParameter(tifJobParameter, postIndexingTifJobParameter(tifJobParameter, lock, listener));
}, exception -> {
lockService.releaseLock(lock);
log.error("failed to release lock", exception);
listener.onFailure(exception);
});
tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener);

Check warning on line 113 in src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java#L113

Added line #L113 was not covered by tests

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,8 @@

package org.opensearch.securityanalytics.threatIntel.common;

import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;


import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import org.opensearch.OpenSearchException;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -25,6 +15,13 @@
import org.opensearch.jobscheduler.spi.utils.LockService;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;

import java.time.Instant;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;

/**
* A wrapper of job scheduler's lock service
*/
Expand All @@ -48,52 +45,27 @@
this.lockService = new LockService(client, clusterService);
}

/**
* Wrapper method of LockService#acquireLockWithId
*
* tif job uses its name as doc id in job scheduler. Therefore, we can use tif job name to acquire
* a lock on a tif job.
*
* @param tifJobName tifJobName to acquire lock on
* @param lockDurationSeconds the lock duration in seconds
* @param listener the listener
*/
public void acquireLock(final String tifJobName, final Long lockDurationSeconds, final ActionListener<LockModel> listener) {
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, tifJobName, listener);
}

/**
* Synchronous method of #acquireLock
*
* @param tifJobName tifJobName to acquire lock on
* @param lockDurationSeconds the lock duration in seconds
* @return lock model
*/
public Optional<LockModel> acquireLock(final String tifJobName, final Long lockDurationSeconds) {
public void acquireLock(final String tifJobName, final Long lockDurationSeconds, ActionListener<LockModel> listener) {
AtomicReference<LockModel> lockReference = new AtomicReference();
CountDownLatch countDownLatch = new CountDownLatch(1);
lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, tifJobName, new ActionListener<>() {
@Override
public void onResponse(final LockModel lockModel) {
lockReference.set(lockModel);
countDownLatch.countDown();
listener.onResponse(lockReference.get());

Check warning on line 60 in src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java

View check run for this annotation

Codecov / codecov/patch

src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java#L60

Added line #L60 was not covered by tests
}

@Override
public void onFailure(final Exception e) {
lockReference.set(null);
countDownLatch.countDown();
log.error("aquiring lock failed", e);
log.error("Failed to acquire lock for tif job " + tifJobName, e);
listener.onFailure(e);
}
});

try {
countDownLatch.await(clusterService.getClusterSettings().get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT).getSeconds(), TimeUnit.SECONDS);
return Optional.ofNullable(lockReference.get());
} catch (InterruptedException e) {
log.error("Waiting for the count down latch failed", e);
return Optional.empty();
}
}

/**
Expand Down
Loading
Loading