Skip to content

Commit

Permalink
Adds transport layer actions for CRUD workflows (opensearch-project#934
Browse files Browse the repository at this point in the history
…) (opensearch-project#938)

(cherry picked from commit e0b7a5a7905b977e58d80e3b9134b14893d122b0)

* remove unneeded import

---------






* Stashed user together with it's roles



---------







* Added workflow execution logic (opensearch-project#850)

* Added workflow execution logic



* Adjusted code according to comments



* Updated version of the findings json



* Updating the workflow metadata in the case of updating flag set to false while the metadata alerady exist



* Added logging for workflow metadata update



* Added Rest Execute Workflow action



* Extended workflow context with workflowMetadataId. Adjusted the doc level monitor findings



* Updated conditions for unstashing the context when indexing and deleting the workflow



---------



* Added fix when executing the workflow and when chained findings index… (opensearch-project#890)



* Fixed deleting monitor workflow metadata (#882)

* Fixed deleting monitor metadata and workflow metadata.




* fix monitor metadata error from conflict resolution



* remove unused import



* remove rest execute workflow action



* increment schema version for findings mapping json



---------

Signed-off-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Signed-off-by: Angie Zhang <langelzh@amazon.com>
Signed-off-by: Ashish Agrawal <ashisagr@amazon.com>
Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
Co-authored-by: Stevan Buzejic <buzejic.stevan@gmail.com>
Co-authored-by: Angie Zhang <langelzh@amazon.com>
Co-authored-by: opensearch-trigger-bot[bot] <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Co-authored-by: Petar Dzepina <petar.dzepina@gmail.com>
Co-authored-by: Ashish Agrawal <ashisagr@amazon.com>
  • Loading branch information
6 people committed May 26, 2023
1 parent f8063fa commit 171cd2e
Show file tree
Hide file tree
Showing 38 changed files with 5,575 additions and 200 deletions.
2 changes: 2 additions & 0 deletions alerting/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ dependencies {
testImplementation "org.mockito:mockito-core:${versions.mockito}"
testImplementation "org.opensearch.plugin:reindex-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:parent-join-client:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-painless:${opensearch_version}"
testImplementation "org.opensearch.plugin:lang-mustache-client:${opensearch_version}"
}

javadoc.enabled = false // turn off javadoc as it barfs on Kotlin code
Expand Down
44 changes: 41 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.alerting
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
Expand Down Expand Up @@ -38,25 +39,31 @@ import org.opensearch.alerting.resthandler.RestSearchEmailAccountAction
import org.opensearch.alerting.resthandler.RestSearchEmailGroupAction
import org.opensearch.alerting.resthandler.RestSearchMonitorAction
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.service.DeleteMonitorService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
import org.opensearch.alerting.transport.TransportGetDestinationsAction
import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
import org.opensearch.alerting.transport.TransportIndexWorkflowAction
import org.opensearch.alerting.transport.TransportSearchEmailAccountAction
import org.opensearch.alerting.transport.TransportSearchEmailGroupAction
import org.opensearch.alerting.transport.TransportSearchMonitorAction
import org.opensearch.alerting.util.DocLevelMonitorQueries
import org.opensearch.alerting.util.destinationmigration.DestinationMigrationCoordinator
import org.opensearch.alerting.workflow.WorkflowRunnerService
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.node.DiscoveryNodes
Expand All @@ -78,6 +85,7 @@ import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.QueryLevelTrigger
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.SearchInput
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.core.xcontent.NamedXContentRegistry
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.env.Environment
Expand Down Expand Up @@ -140,6 +148,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
}

lateinit var runner: MonitorRunnerService
lateinit var workflowRunner: WorkflowRunnerService
lateinit var scheduler: JobScheduler
lateinit var sweeper: JobSweeper
lateinit var scheduledJobIndices: ScheduledJobIndices
Expand Down Expand Up @@ -191,8 +200,11 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(SearchEmailGroupAction.INSTANCE, TransportSearchEmailGroupAction::class.java),
ActionPlugin.ActionHandler(GetDestinationsAction.INSTANCE, TransportGetDestinationsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_ALERTS_ACTION_TYPE, TransportGetAlertsAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java)

ActionPlugin.ActionHandler(AlertingActions.GET_FINDINGS_ACTION_TYPE, TransportGetFindingsSearchAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
)
}

Expand All @@ -204,7 +216,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY,
ClusterMetricsInput.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
DocumentLevelTrigger.XCONTENT_REGISTRY,
Workflow.XCONTENT_REGISTRY
)
}

Expand Down Expand Up @@ -239,6 +252,22 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerConsumers()
.registerDestinationSettings()
workflowRunner = WorkflowRunnerService
.registerClusterService(clusterService)
.registerClient(client)
.registerNamedXContentRegistry(xContentRegistry)
.registerScriptService(scriptService)
.registerIndexNameExpressionResolver(indexNameExpressionResolver)
.registerSettings(settings)
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService)
scheduler = JobScheduler(threadPool, runner)
Expand All @@ -254,6 +283,15 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
settings
)

WorkflowMetadataService.initialize(
client,
clusterService,
xContentRegistry,
settings
)

DeleteMonitorService.initialize(client)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -59,7 +60,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand Down Expand Up @@ -118,7 +120,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitor,
periodStart,
periodEnd,
monitorResult.inputResults
monitorResult.inputResults,
workflowRunContext
)
if (firstIteration) {
firstPageOfInputResults = inputResults
Expand Down Expand Up @@ -154,7 +157,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx,
periodStart,
periodEnd,
!dryrun && monitor.id != Monitor.NO_ID
!dryrun && monitor.id != Monitor.NO_ID,
workflowRunContext
)
} else {
emptyList()
Expand Down Expand Up @@ -350,7 +354,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowRunContext: WorkflowRunContext? = null
): List<String> {
monitor.inputs.forEach { input ->
if (input is SearchInput) {
Expand All @@ -361,14 +366,14 @@ object BucketLevelMonitorRunner : MonitorRunner() {
for (aggFactory in (query.aggregations() as AggregatorFactories.Builder).aggregatorFactories) {
when (aggFactory) {
is CompositeAggregationBuilder -> {
var grouByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
var groupByFields = 0 // if number of fields used to group by > 1 we won't calculate findings
val sources = aggFactory.sources()
for (source in sources) {
if (grouByFields > 0) {
if (groupByFields > 0) {
logger.error("grouByFields > 0. not generating findings for bucket level monitor ${monitor.id}")
return listOf()
}
grouByFields++
groupByFields++
fieldName = source.field()
}
}
Expand Down Expand Up @@ -409,7 +414,7 @@ object BucketLevelMonitorRunner : MonitorRunner() {
sr.source().query(queryBuilder)
}
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding)
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, workflowRunContext?.executionId)
} else {
logger.error("Couldn't resolve groupBy field. Not generating bucket level monitor findings for monitor %${monitor.id}")
}
Expand All @@ -422,7 +427,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
searchResponse: SearchResponse,
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null
): List<String> {
val docIdsByIndexName: MutableMap<String, MutableList<String>> = mutableMapOf()
for (hit in searchResponse.hits.hits) {
Expand All @@ -441,7 +447,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
monitorName = monitor.name,
index = it.key,
timestamp = Instant.now(),
docLevelQueries = listOf()
docLevelQueries = listOf(),
executionId = workflowExecutionId
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.metadata.IndexMetadata
Expand Down Expand Up @@ -70,7 +71,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?
): MonitorRunResult<DocumentLevelTriggerRunResult> {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
Expand All @@ -96,7 +98,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
var (monitorMetadata, _) = MonitorMetadataService.getOrCreateMetadata(
monitor = monitor,
createWithRunContext = false,
skipIndex = isTempMonitor
skipIndex = isTempMonitor,
workflowRunContext?.workflowMetadataId
)

val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput
Expand Down Expand Up @@ -135,6 +138,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
}

// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

indices.forEach { indexName ->
// Prepare lastRunContext for each index
val indexLastRunContext = lastRunContext.getOrPut(indexName) {
Expand Down Expand Up @@ -169,7 +175,13 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Prepare DocumentExecutionContext for each index
val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext)

val matchingDocs = getMatchingDocs(monitor, monitorCtx, docExecutionContext, indexName)
val matchingDocs = getMatchingDocs(
monitor,
monitorCtx,
docExecutionContext,
indexName,
matchingDocIdsPerIndex?.get(indexName)
)

if (matchingDocs.isNotEmpty()) {
val matchedQueriesForDocs = getMatchedQueries(
Expand Down Expand Up @@ -226,7 +238,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
idQueryMap,
docsToQueries,
queryToDocIds,
dryrun
dryrun,
workflowRunContext?.executionId
)
}
}
Expand Down Expand Up @@ -298,7 +311,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean
dryrun: Boolean,
workflowExecutionId: String? = null
): DocumentLevelTriggerRunResult {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)
Expand All @@ -309,7 +323,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(monitor, monitorCtx, triggeredQueries, it.key, !dryrun && monitor.id != Monitor.NO_ID)
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
workflowExecutionId
)
findings.add(findingId)

if (triggerResult.triggeredDocs.contains(it.key)) {
Expand Down Expand Up @@ -379,7 +400,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
shouldCreateFinding: Boolean
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
Expand All @@ -392,7 +414,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now()
timestamp = Instant.now(),
executionId = workflowExecutionId
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
Expand Down Expand Up @@ -514,7 +537,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docExecutionCtx: DocumentExecutionContext,
index: String
index: String,
docIds: List<String>? = null
): List<Pair<String, BytesReference>> {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
val matchingDocs = mutableListOf<Pair<String, BytesReference>>()
Expand All @@ -530,7 +554,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard,
prevSeqNo,
maxSeqNo,
null
null,
docIds
)

if (hits.hits.isNotEmpty()) {
Expand All @@ -549,7 +574,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
shard: String,
prevSeqNo: Long?,
maxSeqNo: Long,
query: String?
query: String?,
docIds: List<String>? = null
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand All @@ -561,6 +587,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
}

if (!docIds.isNullOrEmpty()) {
boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds))
}

val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand Down
Loading

0 comments on commit 171cd2e

Please sign in to comment.