diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 0a80f33ae..e90d2193d 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -346,7 +346,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_MAX_DOCS, AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, - AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 3b8e4dee7..0e88b5cb3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -8,12 +8,15 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.admin.indices.refresh.RefreshAction +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse -import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.InputRunResults @@ -273,10 +276,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) - } + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) } } else { monitor.triggers.forEach { @@ -365,7 +365,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { trigger: DocumentLevelTrigger, monitor: Monitor, idQueryMap: Map, - docsToQueries: Map>, + docsToQueries: MutableMap>, queryToDocIds: Map>, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, @@ -374,35 +374,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) - val findings = mutableListOf() - val findingDocPairs = mutableListOf>() + val triggerFindingDocPairs = mutableListOf>() // TODO: Implement throttling for findings - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - val findingId = createFindings( - monitor, - monitorCtx, - triggeredQueries, - it.key, - !dryrun && monitor.id != Monitor.NO_ID, - executionId - ) - findings.add(findingId) + val findingToDocPairs = createFindings( + monitor, + monitorCtx, + docsToQueries, + idQueryMap, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) - if (triggerResult.triggeredDocs.contains(it.key)) { - findingDocPairs.add(Pair(findingId, it.key)) + findingToDocPairs.forEach { + // Only pick those entries whose docs have triggers associated with them + if (triggerResult.triggeredDocs.contains(it.second)) { + triggerFindingDocPairs.add(Pair(it.first, it.second)) } } val actionCtx = triggerCtx.copy( triggeredDocs = triggerResult.triggeredDocs, - relatedFindings = findings, + relatedFindings = findingToDocPairs.map { it.first }, error = monitorResult.error ?: triggerResult.error ) val alerts = mutableListOf() - findingDocPairs.forEach { + triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), listOf(it.second), @@ -461,51 +459,92 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return triggerResult } + /** + * 1. Bulk index all findings based on shouldCreateFinding flag + * 2. invoke publishFinding() to kickstart auto-correlations + * 3. Returns a list of pairs for finding id to doc id + */ private suspend fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docLevelQueries: List, - matchingDocId: String, + docsToQueries: MutableMap>, + idQueryMap: Map, shouldCreateFinding: Boolean, workflowExecutionId: String? = null, - ): String { - // Before the "|" is the doc id and after the "|" is the index - val docIndex = matchingDocId.split("|") + ): List> { - val finding = Finding( - id = UUID.randomUUID().toString(), - relatedDocIds = listOf(docIndex[0]), - correlatedDocIds = listOf(docIndex[0]), - monitorId = monitor.id, - monitorName = monitor.name, - index = docIndex[1], - docLevelQueries = docLevelQueries, - timestamp = Instant.now(), - executionId = workflowExecutionId - ) + val findingDocPairs = mutableListOf>() + val findings = mutableListOf() + val indexRequests = mutableListOf() - val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - logger.debug("Findings: $findingStr") + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - if (shouldCreateFinding) { - val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) + // Before the "|" is the doc id and after the "|" is the index + val docIndex = it.key.split("|") - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.index(indexRequest, it) + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocIds = listOf(docIndex[0]), + correlatedDocIds = listOf(docIndex[0]), + monitorId = monitor.id, + monitorName = monitor.name, + index = docIndex[1], + docLevelQueries = triggeredQueries, + timestamp = Instant.now(), + executionId = workflowExecutionId + ) + findingDocPairs.add(Pair(finding.id, it.key)) + findings.add(finding) + + val findingStr = + finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) + .string() + logger.debug("Findings: $findingStr") + + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .opType(DocWriteRequest.OpType.CREATE) } } + if (indexRequests.isNotEmpty()) { + bulkIndexFindings(monitor, monitorCtx, indexRequests) + } + try { - publishFinding(monitor, monitorCtx, finding) + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } } catch (e: Exception) { // suppress exception logger.error("Optional finding callback failed", e) } - return finding.id + return findingDocPairs + } + + private suspend fun bulkIndexFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + indexRequests: List + ) { + indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { + bulk(BulkRequest().add(batch), it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + } + } + } else { + logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") + } + } + monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) } private fun publishFinding( @@ -629,7 +668,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) } } catch (e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") + logger.error("Failed to run for shard $shard. Error: ${e.message}") } } return matchingDocs diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 41a26bb79..2c98495de 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -47,5 +47,6 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index ca223f7a0..103da2230 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -22,8 +22,10 @@ import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT @@ -169,6 +171,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } + return this } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index e23d44c5b..e8048e5b2 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,7 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -153,5 +154,12 @@ class AlertingSettings { -1L, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( + "plugins.alerting.alert_findings_indexing_batch_size", + DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + 1, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 943ad11a6..479e29dca 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -393,6 +393,54 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size) } + fun `test execute monitor for bulk index findings`() { + val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" + val testQueryName = "wildcard-test-query" + val testIndex = createTestIndex("${testIndexPrefix}1") + val testIndex2 = createTestIndex("${testIndexPrefix}2") + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName, fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + for (i in 0 until 9) { + indexDoc(testIndex, i.toString(), testDoc) + } + indexDoc(testIndex2, "3", testDoc) + adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Correct search result", 10, matchingDocsToQuery.size) + assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 10, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 10, findings.size) + val foundFindings = + findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } + assertEquals("Found findings for all docs", 4, foundFindings.size) + } + fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() { val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" val testQueryName = "wildcard-test-query"