From 70118d035fa6720a5bd5bde899bd643f45d6b8d2 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Fri, 12 Jul 2024 19:11:18 +0000 Subject: [PATCH] Fix the job scheduler parser, action listeners, and multi-node test (#1157) * fix job parser Signed-off-by: Joanne Wang * fix listener.onFailure Signed-off-by: Joanne Wang * fix flaky create threat intel monitor test Signed-off-by: Joanne Wang --------- Signed-off-by: Joanne Wang (cherry picked from commit b8496a9aaa7b3d40b055f9b05b4dc510d2158134) Signed-off-by: github-actions[bot] --- .../SecurityAnalyticsPlugin.java | 19 +++-- .../jobscheduler/SecurityAnalyticsRunner.java | 4 + .../iocscan/dao/BaseEntityCrudService.java | 3 +- .../threatIntel/model/TIFJobParameter.java | 75 +++++++++++++++++-- .../SATIFSourceConfigManagementService.java | 2 - 5 files changed, 87 insertions(+), 16 deletions(-) diff --git a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java index f4a84c8f1..95ca88315 100644 --- a/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/securityanalytics/SecurityAnalyticsPlugin.java @@ -146,6 +146,7 @@ import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFSourceConfigRunner; import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig; import org.opensearch.securityanalytics.threatIntel.model.monitor.TransportThreatIntelMonitorFanOutAction; +import org.opensearch.securityanalytics.threatIntel.model.TIFJobParameter; import org.opensearch.securityanalytics.threatIntel.resthandler.RestDeleteTIFSourceConfigAction; import org.opensearch.securityanalytics.threatIntel.resthandler.RestGetIocFindingsAction; import org.opensearch.securityanalytics.threatIntel.resthandler.RestGetTIFSourceConfigAction; @@ -409,18 +410,20 @@ public ScheduledJobRunner getJobRunner() { @Override public ScheduledJobParser getJobParser() { - // TODO: @jowg fix the job parser to parse previous tif job return (xcp, id, jobDocVersion) -> { XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp); while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { String fieldName = xcp.currentName(); - xcp.nextToken(); - switch (fieldName) { - case SOURCE_CONFIG_FIELD: - return SATIFSourceConfig.parse(xcp, id, jobDocVersion.getVersion()); - default: - log.error("Job parser failed for [{}] in security analytics job registration", fieldName); - xcp.skipChildren(); + if (xcp.nextToken() == XContentParser.Token.START_OBJECT) { + switch (fieldName) { + case SOURCE_CONFIG_FIELD: + return SATIFSourceConfig.parse(xcp, id, null); + default: + log.error("Job parser failed for [{}] in security analytics job registration", fieldName); + xcp.skipChildren(); + } + } else { + return TIFJobParameter.parseFromParser(xcp, id, jobDocVersion.getVersion()); } } return null; diff --git a/src/main/java/org/opensearch/securityanalytics/jobscheduler/SecurityAnalyticsRunner.java b/src/main/java/org/opensearch/securityanalytics/jobscheduler/SecurityAnalyticsRunner.java index 129a2cc0b..44ea1971d 100644 --- a/src/main/java/org/opensearch/securityanalytics/jobscheduler/SecurityAnalyticsRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/jobscheduler/SecurityAnalyticsRunner.java @@ -5,7 +5,9 @@ import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; +import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobRunner; import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFSourceConfigRunner; +import org.opensearch.securityanalytics.threatIntel.model.TIFJobParameter; import org.opensearch.securityanalytics.threatIntel.sacommons.TIFSourceConfig; public class SecurityAnalyticsRunner implements ScheduledJobRunner { @@ -30,6 +32,8 @@ private SecurityAnalyticsRunner() {} public void runJob(ScheduledJobParameter job, JobExecutionContext context) { if (job instanceof TIFSourceConfig) { TIFSourceConfigRunner.getJobRunnerInstance().runJob(job, context); + } else if (job instanceof TIFJobParameter) { + TIFJobRunner.getJobRunnerInstance().runJob(job, context); } else { String errorMessage = "Invalid job type, found " + job.getClass().getSimpleName() + "with id: " + context.getJobId(); log.error(errorMessage); diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java index e69706b94..62eee1a57 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/iocscan/dao/BaseEntityCrudService.java @@ -25,6 +25,7 @@ import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.securityanalytics.model.threatintel.BaseEntity; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import org.opensearch.transport.RemoteTransportException; import java.io.IOException; import java.util.ArrayList; @@ -226,7 +227,7 @@ public void createIndexIfNotExists(final ActionListener listener) { log.debug("{} index created", getEntityName()); listener.onResponse(null); }, e -> { - if (e instanceof ResourceAlreadyExistsException) { + if (e instanceof ResourceAlreadyExistsException || (e instanceof RemoteTransportException && e.getCause() instanceof ResourceAlreadyExistsException)) { log.debug("index {} already exist", getEntityIndexMapping()); listener.onResponse(null); return; diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java index 2fa5cb199..6e74b3b5a 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/model/TIFJobParameter.java @@ -51,9 +51,9 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter { private static final String SCHEDULE_FIELD = "schedule"; private static final String ENABLED_TIME_FIELD = "enabled_time"; private static final String ENABLED_TIME_FIELD_READABLE = "enabled_time_field"; - private static final String state_field = "state"; + private static final String STATE_FIELD = "state"; private static final String INDICES_FIELD = "indices"; - private static final String update_stats_field = "update_stats"; + private static final String UPDATE_STATS_FIELD = "update_stats"; /** @@ -70,9 +70,9 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter { /** * Additional fields for tif job */ - public static final ParseField STATE_PARSER_FIELD = new ParseField(state_field); + public static final ParseField STATE_PARSER_FIELD = new ParseField(STATE_FIELD); public static final ParseField INDICES_PARSER_FIELD = new ParseField(INDICES_FIELD); - public static final ParseField UPDATE_STATS_PARSER_FIELD = new ParseField(update_stats_field); + public static final ParseField UPDATE_STATS_PARSER_FIELD = new ParseField(UPDATE_STATS_FIELD); /** * Default variables for job scheduling @@ -128,6 +128,71 @@ public class TIFJobParameter implements Writeable, ScheduledJobParameter { */ private UpdateStats updateStats; + public static TIFJobParameter parseFromParser(XContentParser xcp, String id, Long version) throws IOException { + String name = null; + Instant lastUpdateTime = null; + Boolean isEnabled = null; + TIFJobState state = null; + Instant enabledTime = null; + IntervalSchedule schedule = null; + List indices = new ArrayList<>(); + UpdateStats updateStats = null; + + // parsing is coming from the security analytics plugin parser, so it begins with value_string token + XContentParserUtils.ensureExpectedToken(XContentParser.Token.VALUE_STRING, xcp.currentToken(), xcp); + while (true) { + String fieldName = xcp.currentName(); + switch (fieldName) { + case NAME_FIELD: + name = xcp.text(); + break; + case LAST_UPDATE_TIME_FIELD: + lastUpdateTime = Instant.ofEpochMilli(xcp.longValue()); + break; + case ENABLED_TIME_FIELD: + if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) { + enabledTime = null; + } else if (xcp.currentToken().isValue()) { + enabledTime = Instant.ofEpochMilli(xcp.longValue()); + } else { + XContentParserUtils.throwUnknownToken(xcp.currentToken(), xcp.getTokenLocation()); + enabledTime = null; + } + break; + case ENABLED_FIELD: + isEnabled = xcp.booleanValue(); + break; + case SCHEDULE_FIELD: + schedule = (IntervalSchedule) ScheduleParser.parse(xcp); + break; + case STATE_FIELD: + state = toState(xcp.text()); + break; + case INDICES_FIELD: + indices = new ArrayList<>(); + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp); + while (xcp.nextToken() != XContentParser.Token.END_ARRAY) { + indices.add(xcp.text()); + } + break; + case UPDATE_STATS_FIELD: + updateStats = UpdateStats.PARSER.parse(xcp, null); + break; + default: + xcp.skipChildren(); + } + + if (xcp.nextToken() == XContentParser.Token.END_OBJECT){ + break; + } else { + xcp.nextToken(); + } + } + + return new TIFJobParameter(name, lastUpdateTime, enabledTime, isEnabled, schedule, state, indices, updateStats); + } + + // parser used for integ test public static TIFJobParameter parse(XContentParser xcp, String id, Long version) throws IOException { String name = null; Instant lastUpdateTime = null; @@ -150,7 +215,7 @@ public static TIFJobParameter parse(XContentParser xcp, String id, Long version) case ENABLED_FIELD: isEnabled = xcp.booleanValue(); break; - case state_field: + case STATE_FIELD: state = toState(xcp.text()); break; default: diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigManagementService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigManagementService.java index 65514f459..eb0c32393 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigManagementService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/service/SATIFSourceConfigManagementService.java @@ -398,7 +398,6 @@ private void storeAndDeleteIocIndices(List stix2IOCList, ActionListene listener.onFailure(ex); } )); - listener.onFailure(e); }) ); } @@ -499,7 +498,6 @@ private void downloadAndSaveIocsToRefresh(ActionListener l listener.onFailure(e); } )); - listener.onFailure(downloadAndSaveIocsError); })); }