diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index 42d30add5..141e94a21 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -21,6 +21,7 @@ import org.opensearch.alerting.core.JobSweeper import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler import org.opensearch.alerting.core.schedule.JobScheduler import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings @@ -47,6 +48,7 @@ import org.opensearch.alerting.resthandler.RestSearchMonitorAction import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.service.DeleteMonitorService import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings @@ -99,6 +101,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.env.Environment import org.opensearch.env.NodeEnvironment import org.opensearch.index.IndexModule +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.painless.spi.PainlessExtension import org.opensearch.painless.spi.Whitelist import org.opensearch.painless.spi.WhitelistLoader @@ -254,6 +257,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ): Collection { // Need to figure out how to use the OpenSearch DI classes rather than handwiring things here. val settings = environment.settings() + val lockService = LockService(client, clusterService) alertIndices = AlertIndices(settings, client, threadPool, clusterService) runner = MonitorRunnerService .registerClusterService(clusterService) @@ -268,7 +272,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerTriggerService(TriggerService(scriptService)) .registerAlertService(AlertService(client, xContentRegistry, alertIndices)) .registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService)) + .registerJvmStats(JvmStats.jvmStats()) .registerWorkflowService(WorkflowService(client, xContentRegistry)) + .registerLockService(lockService) .registerConsumers() .registerDestinationSettings() scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) @@ -293,9 +299,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R settings ) - DeleteMonitorService.initialize(client) + DeleteMonitorService.initialize(client, lockService) - return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator) + return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService) } override fun getSettings(): List> { @@ -325,6 +331,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.ALERT_HISTORY_MAX_DOCS, AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, AlertingSettings.ALERTING_MAX_MONITORS, + AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, + AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY, AlertingSettings.REQUEST_TIMEOUT, AlertingSettings.MAX_ACTION_THROTTLE_VALUE, AlertingSettings.FILTER_BY_BACKEND_ROLES, @@ -345,6 +354,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, + AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED, DestinationSettings.EMAIL_USERNAME, DestinationSettings.EMAIL_PASSWORD, DestinationSettings.ALLOW_LIST, @@ -357,7 +367,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_MAX_DOCS, AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, - AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD + AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 15b55f119..6779d5b34 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -9,14 +9,17 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener +import org.opensearch.action.DocWriteRequest +import org.opensearch.action.admin.indices.refresh.RefreshAction +import org.opensearch.action.admin.indices.refresh.RefreshRequest +import org.opensearch.action.bulk.BulkRequest +import org.opensearch.action.bulk.BulkResponse import org.opensearch.action.index.IndexRequest -import org.opensearch.action.index.IndexResponse import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse -import org.opensearch.action.support.WriteRequest -import org.opensearch.alerting.model.DocumentExecutionContext import org.opensearch.alerting.model.DocumentLevelTriggerRunResult +import org.opensearch.alerting.model.IndexExecutionContext import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.MonitorRunResult @@ -28,9 +31,9 @@ import org.opensearch.alerting.util.IndexUtils import org.opensearch.alerting.util.defaultToPerExecutionAction import org.opensearch.alerting.util.getActionExecutionPolicy import org.opensearch.alerting.workflow.WorkflowRunContext -import org.opensearch.client.Client import org.opensearch.client.node.NodeClient import org.opensearch.cluster.metadata.IndexMetadata +import org.opensearch.cluster.routing.Preference import org.opensearch.cluster.routing.ShardRouting import org.opensearch.cluster.service.ClusterService import org.opensearch.common.bytes.BytesReference @@ -54,18 +57,31 @@ import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.indices.IndexClosedException import org.opensearch.percolator.PercolateQueryBuilderExt import org.opensearch.rest.RestStatus +import org.opensearch.search.SearchHit import org.opensearch.search.SearchHits import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID +import java.util.stream.Collectors import kotlin.math.max -object DocumentLevelMonitorRunner : MonitorRunner() { +class DocumentLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) + var nonPercolateSearchesTimeTakenStat = 0L + var percolateQueriesTimeTakenStat = 0L + var totalDocsQueriedStat = 0L + var docTransformTimeTakenStat = 0L + var totalDocsSizeInBytesStat = 0L + var docsSizeOfBatchInBytes = 0L + /* Contains list of docs source that are held in memory to submit to percolate query against query index. + * Docs are fetched from the source index per shard and transformed.*/ + val transformedDocs = mutableListOf>() override suspend fun runMonitor( monitor: Monitor, @@ -119,12 +135,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { // Resolve all passed indices to concrete indices - val concreteIndices = IndexUtils.resolveAllIndices( + val allConcreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) - if (concreteIndices.isEmpty()) { + if (allConcreteIndices.isEmpty()) { logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}") throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(",")) } @@ -140,7 +156,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // cleanup old indices that are not monitored anymore from the same monitor val runContextKeys = updatedLastRunContext.keys.toMutableSet() for (ind in runContextKeys) { - if (!concreteIndices.contains(ind)) { + if (!allConcreteIndices.contains(ind)) { updatedLastRunContext.remove(ind) } } @@ -148,13 +164,32 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex + val concreteIndicesSeenSoFar = mutableListOf() + val updatedIndexNames = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> - val concreteIndices = IndexUtils.resolveAllIndices( + var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + var lastWriteIndex: String? = null + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) } + if (lastWriteIndex != null) { + val lastWriteIndexCreationDate = + IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + concreteIndices = IndexUtils.getNewestIndicesByCreationDate( + concreteIndices, + monitorCtx.clusterService!!.state(), + lastWriteIndexCreationDate + ) + } + } + concreteIndicesSeenSoFar.addAll(concreteIndices) val updatedIndexName = indexName.replace("*", "_") + updatedIndexNames.add(updatedIndexName) val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields( monitorCtx.clusterService!!.state(), concreteIndices @@ -173,12 +208,21 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } // Prepare updatedLastRunContext for each index - val indexUpdatedRunContext = updateLastRunContext( + val indexUpdatedRunContext = initializeNewLastRunContext( indexLastRunContext.toMutableMap(), monitorCtx, - concreteIndexName + concreteIndexName, ) as MutableMap - updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) { + updatedLastRunContext.remove(lastWriteIndex) + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } + } else { + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + } val count: Int = indexLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -191,44 +235,65 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - - val matchingDocs = getMatchingDocs( - monitor, - monitorCtx, - docExecutionContext, + val fieldsToBeQueried = mutableSetOf() + if (monitorCtx.fetchOnlyQueryFieldNames) { + for (it in queries) { + if (it.queryFieldNames.isEmpty()) { + fieldsToBeQueried.clear() + logger.debug( + "Monitor ${monitor.id} : " + + "Doc Level query ${it.id} : ${it.query} doesn't have queryFieldNames populated. " + + "Cannot optimize monitor to fetch only query-relevant fields. " + + "Querying entire doc source." + ) + break + } + fieldsToBeQueried.addAll(it.queryFieldNames) + } + if (fieldsToBeQueried.isNotEmpty()) + logger.debug( + "Monitor ${monitor.id} Querying only fields " + + "${fieldsToBeQueried.joinToString()} instead of entire _source of documents" + ) + } + val indexExecutionContext = IndexExecutionContext( + queries, + indexLastRunContext, + indexUpdatedRunContext, updatedIndexName, concreteIndexName, conflictingFields.toList(), - matchingDocIdsPerIndex?.get(concreteIndexName) + matchingDocIdsPerIndex?.get(concreteIndexName), ) - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( - monitorCtx, - matchingDocs.map { it.second }, - monitor, - monitorMetadata, - updatedIndexName, - concreteIndexName - ) - - matchedQueriesForDocs.forEach { hit -> - val id = hit.id - .replace("_${updatedIndexName}_${monitor.id}", "") - .replace("_${concreteIndexName}_${monitor.id}", "") - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$concreteIndexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) - } - } + fetchShardDataAndMaybeExecutePercolateQueries( + monitor, + monitorCtx, + indexExecutionContext, + monitorMetadata, + inputRunResults, + docsToQueries, + updatedIndexNames, + concreteIndicesSeenSoFar, + ArrayList(fieldsToBeQueried) + ) { shard, maxSeqNo -> // function passed to update last run context with new max sequence number + indexExecutionContext.updatedLastRunContext[shard] = maxSeqNo } } } + /* if all indices are covered still in-memory docs size limit is not breached we would need to submit + the percolate query at the end */ + if (transformedDocs.isNotEmpty()) { + performPercolateQueryAndResetCounters( + monitorCtx, + monitor, + monitorMetadata, + updatedIndexNames, + concreteIndicesSeenSoFar, + inputRunResults, + docsToQueries, + ) + } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) /* @@ -248,10 +313,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // If there are no triggers defined, we still want to generate findings if (monitor.triggers.isEmpty()) { if (dryrun == false && monitor.id != Monitor.NO_ID) { - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - createFindings(monitor, monitorCtx, triggeredQueries, it.key, true) - } + createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true) } } else { monitor.triggers.forEach { @@ -288,6 +350,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata.copy(lastRunContext = updatedLastRunContext), true ) + } else { + // Clean up any queries created by the dry run monitor + monitorCtx.docLevelMonitorQueries!!.deleteDocLevelQueriesOnDryRun(monitorMetadata) } // TODO: Update the Document as part of the Trigger and return back the trigger action result @@ -302,6 +367,22 @@ object DocumentLevelMonitorRunner : MonitorRunner() { e ) return monitorResult.copy(error = alertingException, inputResults = InputRunResults(emptyList(), alertingException)) + } finally { + logger.debug( + "PERF_DEBUG_STATS: Monitor ${monitor.id} " + + "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTakenStat" + ) + logger.debug( + "PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}", + monitor.id, + percolateQueriesTimeTakenStat + ) + logger.debug( + "PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}", + monitor.id, + docTransformTimeTakenStat + ) + logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueriedStat) } } @@ -340,7 +421,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { trigger: DocumentLevelTrigger, monitor: Monitor, idQueryMap: Map, - docsToQueries: Map>, + docsToQueries: MutableMap>, queryToDocIds: Map>, dryrun: Boolean, workflowRunContext: WorkflowRunContext?, @@ -349,35 +430,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger) val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds) - val findings = mutableListOf() - val findingDocPairs = mutableListOf>() + val triggerFindingDocPairs = mutableListOf>() // TODO: Implement throttling for findings - docsToQueries.forEach { - val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - val findingId = createFindings( - monitor, - monitorCtx, - triggeredQueries, - it.key, - !dryrun && monitor.id != Monitor.NO_ID, - executionId - ) - findings.add(findingId) + val findingToDocPairs = createFindings( + monitor, + monitorCtx, + docsToQueries, + idQueryMap, + !dryrun && monitor.id != Monitor.NO_ID, + executionId + ) - if (triggerResult.triggeredDocs.contains(it.key)) { - findingDocPairs.add(Pair(findingId, it.key)) + findingToDocPairs.forEach { + // Only pick those entries whose docs have triggers associated with them + if (triggerResult.triggeredDocs.contains(it.second)) { + triggerFindingDocPairs.add(Pair(it.first, it.second)) } } val actionCtx = triggerCtx.copy( triggeredDocs = triggerResult.triggeredDocs, - relatedFindings = findings, + relatedFindings = findingToDocPairs.map { it.first }, error = monitorResult.error ?: triggerResult.error ) val alerts = mutableListOf() - findingDocPairs.forEach { + triggerFindingDocPairs.forEach { val alert = monitorCtx.alertService!!.composeDocLevelAlert( listOf(it.first), listOf(it.second), @@ -436,51 +515,92 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return triggerResult } + /** + * 1. Bulk index all findings based on shouldCreateFinding flag + * 2. invoke publishFinding() to kickstart auto-correlations + * 3. Returns a list of pairs for finding id to doc id + */ private suspend fun createFindings( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docLevelQueries: List, - matchingDocId: String, + docsToQueries: MutableMap>, + idQueryMap: Map, shouldCreateFinding: Boolean, workflowExecutionId: String? = null, - ): String { - // Before the "|" is the doc id and after the "|" is the index - val docIndex = matchingDocId.split("|") + ): List> { - val finding = Finding( - id = UUID.randomUUID().toString(), - relatedDocIds = listOf(docIndex[0]), - correlatedDocIds = listOf(docIndex[0]), - monitorId = monitor.id, - monitorName = monitor.name, - index = docIndex[1], - docLevelQueries = docLevelQueries, - timestamp = Instant.now(), - executionId = workflowExecutionId - ) + val findingDocPairs = mutableListOf>() + val findings = mutableListOf() + val indexRequests = mutableListOf() - val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string() - logger.debug("Findings: $findingStr") + docsToQueries.forEach { + val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! } - if (shouldCreateFinding) { - val indexRequest = IndexRequest(monitor.dataSources.findingsIndex) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(findingStr, XContentType.JSON) - .id(finding.id) - .routing(finding.id) + // Before the "|" is the doc id and after the "|" is the index + val docIndex = it.key.split("|") - monitorCtx.client!!.suspendUntil { - monitorCtx.client!!.index(indexRequest, it) + val finding = Finding( + id = UUID.randomUUID().toString(), + relatedDocIds = listOf(docIndex[0]), + correlatedDocIds = listOf(docIndex[0]), + monitorId = monitor.id, + monitorName = monitor.name, + index = docIndex[1], + docLevelQueries = triggeredQueries, + timestamp = Instant.now(), + executionId = workflowExecutionId + ) + findingDocPairs.add(Pair(finding.id, it.key)) + findings.add(finding) + + val findingStr = + finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS) + .string() + logger.debug("Findings: $findingStr") + + if (shouldCreateFinding) { + indexRequests += IndexRequest(monitor.dataSources.findingsIndex) + .source(findingStr, XContentType.JSON) + .id(finding.id) + .opType(DocWriteRequest.OpType.CREATE) } } + if (indexRequests.isNotEmpty()) { + bulkIndexFindings(monitor, monitorCtx, indexRequests) + } + try { - publishFinding(monitor, monitorCtx, finding) + findings.forEach { finding -> + publishFinding(monitor, monitorCtx, finding) + } } catch (e: Exception) { // suppress exception logger.error("Optional finding callback failed", e) } - return finding.id + return findingDocPairs + } + + private suspend fun bulkIndexFindings( + monitor: Monitor, + monitorCtx: MonitorRunnerExecutionContext, + indexRequests: List + ) { + indexRequests.chunked(monitorCtx.findingsIndexBatchSize).forEach { batch -> + val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil { + bulk(BulkRequest().add(batch), it) + } + if (bulkResponse.hasFailures()) { + bulkResponse.items.forEach { item -> + if (item.isFailed) { + logger.error("Failed indexing the finding ${item.id} of monitor [${monitor.id}]") + } + } + } else { + logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.") + } + } + monitorCtx.client!!.execute(RefreshAction.INSTANCE, RefreshRequest(monitor.dataSources.findingsIndex)) } private fun publishFinding( @@ -500,17 +620,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } - private suspend fun updateLastRunContext( + private fun initializeNewLastRunContext( lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, - index: String + index: String, ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) - updatedLastRunContext[shard] = maxSeqNo.toString() + updatedLastRunContext[shard] = SequenceNumbers.UNASSIGNED_SEQ_NO.toString() } return updatedLastRunContext } @@ -542,83 +661,166 @@ object DocumentLevelMonitorRunner : MonitorRunner() { return indexCreationDate > lastExecutionTime.toEpochMilli() } - /** - * Get the current max seq number of the shard. We find it by searching the last document - * in the primary shard. - */ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { - val request: SearchRequest = SearchRequest() - .indices(index) - .preference("_shards:$shard") - .source( - SearchSourceBuilder() - .version(true) - .sort("_seq_no", SortOrder.DESC) - .seqNoAndPrimaryTerm(true) - .query(QueryBuilders.matchAllQuery()) - .size(1) - ) - val response: SearchResponse = client.suspendUntil { client.search(request, it) } - if (response.status() !== RestStatus.OK) { - throw IOException("Failed to get max seq no for shard: $shard") - } - if (response.hits.hits.isEmpty()) { - return -1L - } - - return response.hits.hits[0].seqNo - } - private fun getShardsCount(clusterService: ClusterService, index: String): Int { val allShards: List = clusterService!!.state().routingTable().allShards(index) return allShards.filter { it.primary() }.size } - private suspend fun getMatchingDocs( + /** 1. Fetch data per shard for given index. (only 10000 docs are fetched. + * needs to be converted to scroll if not performant enough) + * 2. Transform documents to conform to format required for percolate query + * 3a. Check if docs in memory are crossing threshold defined by setting. + * 3b. If yes, perform percolate query and update docToQueries Map with all hits from percolate queries */ + private suspend fun fetchShardDataAndMaybeExecutePercolateQueries( monitor: Monitor, monitorCtx: MonitorRunnerExecutionContext, - docExecutionCtx: DocumentExecutionContext, - index: String, - concreteIndex: String, - conflictingFields: List, - docIds: List? = null - ): List> { - val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int - val matchingDocs = mutableListOf>() + indexExecutionCtx: IndexExecutionContext, + monitorMetadata: MonitorMetadata, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + monitorInputIndices: List, + concreteIndices: List, + fieldsToBeQueried: List, + updateLastRunContext: (String, String) -> Unit + ) { + val count: Int = indexExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { val shard = i.toString() try { - val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong() - val prevSeqNo = docExecutionCtx.lastRunContext[shard].toString().toLongOrNull() - - val hits: SearchHits = searchShard( + val prevSeqNo = indexExecutionCtx.lastRunContext[shard].toString().toLongOrNull() + val from = prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED + var to: Long = Long.MAX_VALUE + while (to >= from) { + val hits: SearchHits = searchShard( + monitorCtx, + indexExecutionCtx.concreteIndexName, + shard, + from, + to, + indexExecutionCtx.docIds, + fieldsToBeQueried, + ) + if (hits.hits.isEmpty()) { + if (to == Long.MAX_VALUE) { + updateLastRunContext(shard, (prevSeqNo ?: SequenceNumbers.NO_OPS_PERFORMED).toString()) // didn't find any docs + } + break + } + if (to == Long.MAX_VALUE) { // max sequence number of shard needs to be computed + updateLastRunContext(shard, hits.hits[0].seqNo.toString()) + } + val leastSeqNoFromHits = hits.hits.last().seqNo + to = leastSeqNoFromHits - 1 + val startTime = System.currentTimeMillis() + transformedDocs.addAll( + transformSearchHitsAndReconstructDocs( + hits, + indexExecutionCtx.indexName, + indexExecutionCtx.concreteIndexName, + monitor.id, + indexExecutionCtx.conflictingFields, + ) + ) + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( + monitorCtx, + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries, + ) + } + docTransformTimeTakenStat += System.currentTimeMillis() - startTime + } + } catch (e: Exception) { + logger.error( + "Monitor ${monitor.id} :" + + "Failed to run fetch data from shard [$shard] of index [${indexExecutionCtx.concreteIndexName}]. " + + "Error: ${e.message}", + e + ) + if (e is IndexClosedException) { + throw e + } + } + if ( + transformedDocs.isNotEmpty() && + shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx) + ) { + performPercolateQueryAndResetCounters( monitorCtx, - concreteIndex, - shard, - prevSeqNo, - maxSeqNo, - null, - docIds + monitor, + monitorMetadata, + monitorInputIndices, + concreteIndices, + inputRunResults, + docsToQueries, ) + } + } + } - if (hits.hits.isNotEmpty()) { - matchingDocs.addAll(getAllDocs(hits, index, concreteIndex, monitor.id, conflictingFields)) + private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( + numDocs: Int, + monitorCtx: MonitorRunnerExecutionContext, + ): Boolean { + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeOfBatchInBytes, monitorCtx) || + isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) + } + + private suspend fun performPercolateQueryAndResetCounters( + monitorCtx: MonitorRunnerExecutionContext, + monitor: Monitor, + monitorMetadata: MonitorMetadata, + monitorInputIndices: List, + concreteIndices: List, + inputRunResults: MutableMap>, + docsToQueries: MutableMap>, + ) { + try { + val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( + monitorCtx, + transformedDocs, + monitor, + monitorMetadata, + concreteIndices, + monitorInputIndices, + ) + + percolateQueryResponseHits.forEach { hit -> + var id = hit.id + concreteIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + monitorInputIndices.forEach { id = id.replace("_${it}_${monitor.id}", "") } + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${transformedDocs[idx].first}|${transformedDocs[idx].second.concreteIndexName}" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } - } catch (e: Exception) { - logger.warn("Failed to run for shard $shard. Error: ${e.message}") } + totalDocsQueriedStat += transformedDocs.size.toLong() + } finally { + transformedDocs.clear() + docsSizeOfBatchInBytes = 0 } - return matchingDocs } + /** Executes search query on given shard of given index to fetch docs with sequene number greater than prevSeqNo. + * This method hence fetches only docs from shard which haven't been queried before + */ private suspend fun searchShard( monitorCtx: MonitorRunnerExecutionContext, index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, - query: String?, - docIds: List? = null + docIds: List? = null, + fieldsToFetch: List, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -626,10 +828,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val boolQueryBuilder = BoolQueryBuilder() boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo)) - if (query != null) { - boolQueryBuilder.must(QueryBuilders.queryStringQuery(query)) - } - if (!docIds.isNullOrEmpty()) { boolQueryBuilder.filter(QueryBuilders.termsQuery("_id", docIds)) } @@ -640,46 +838,66 @@ object DocumentLevelMonitorRunner : MonitorRunner() { .source( SearchSourceBuilder() .version(true) + .sort("_seq_no", SortOrder.DESC) + .seqNoAndPrimaryTerm(true) .query(boolQueryBuilder) - .size(10000) // fixme: make this configurable. + .size(monitorCtx.docLevelMonitorShardFetchSize) ) + .preference(Preference.PRIMARY_FIRST.type()) + + if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) { + request.source().fetchSource(false) + for (field in fieldsToFetch) { + request.source().fetchField(field) + } + } val response: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(request, it) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search shard: $shard") + throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } + nonPercolateSearchesTimeTakenStat += response.took.millis return response.hits } - private suspend fun getMatchedQueries( + /** Executes percolate query on the docs against the monitor's query index and return the hits from the search response*/ + private suspend fun runPercolateQueryOnTransformedDocs( monitorCtx: MonitorRunnerExecutionContext, - docs: List, + docs: MutableList>, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String, - concreteIndex: String + concreteIndices: List, + monitorInputIndices: List, ): SearchHits { - val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) - - val percolateQueryBuilder = PercolateQueryBuilderExt("query", docs, XContentType.JSON) + val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) + val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) + val percolateQueryBuilder = + PercolateQueryBuilderExt("query", docs.map { it.second.docSource }, XContentType.JSON) if (monitor.id.isNotEmpty()) { boolQueryBuilder.must(QueryBuilders.matchQuery("monitor_id", monitor.id).operator(Operator.AND)) } boolQueryBuilder.filter(percolateQueryBuilder) - - val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] - if (queryIndex == null) { - val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}" + val queryIndices = + docs.map { monitorMetadata.sourceToQueryIndexMapping[it.second.indexName + monitor.id] }.distinct() + if (queryIndices.isEmpty()) { + val message = + "Monitor ${monitor.id}: Failed to resolve query Indices from source indices during monitor execution!" + + " sourceIndices: $monitorInputIndices" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) ) } - val searchRequest = SearchRequest(queryIndex) + + val searchRequest = + SearchRequest().indices(*queryIndices.toTypedArray()).preference(Preference.PRIMARY_FIRST.type()) val searchSourceBuilder = SearchSourceBuilder() searchSourceBuilder.query(boolQueryBuilder) searchRequest.source(searchSourceBuilder) - + logger.debug( + "Monitor ${monitor.id}: " + + "Executing percolate query for docs from source indices " + + "$monitorInputIndices against query index $queryIndices" + ) var response: SearchResponse try { response = monitorCtx.client!!.suspendUntil { @@ -687,42 +905,77 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } } catch (e: Exception) { throw IllegalStateException( - "Failed to run percolate search for sourceIndex [$index] and queryIndex [$queryIndex] for ${docs.size} document(s)", e + "Monitor ${monitor.id}:" + + " Failed to run percolate search for sourceIndex [${concreteIndices.joinToString()}] " + + "and queryIndex [${queryIndices.joinToString()}] for ${docs.size} document(s)", + e ) } if (response.status() !== RestStatus.OK) { - throw IOException("Failed to search percolate index: $queryIndex") + throw IOException( + "Monitor ${monitor.id}: Failed to search percolate index: ${queryIndices.joinToString()}. " + + "Response status is ${response.status()}" + ) } + logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") + percolateQueriesTimeTakenStat += response.took.millis return response.hits } + /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ + private fun buildShouldClausesOverPerIndexMatchQueries(indices: List): BoolQueryBuilder { + val boolQueryBuilder = QueryBuilders.boolQuery() + indices.forEach { boolQueryBuilder.should(QueryBuilders.matchQuery("index", it)) } + return boolQueryBuilder + } - private fun getAllDocs( + /** Transform field names and index names in all the search hits to format required to run percolate search against them. + * Hits are transformed using method transformDocumentFieldNames() */ + private fun transformSearchHitsAndReconstructDocs( hits: SearchHits, index: String, concreteIndex: String, monitorId: String, - conflictingFields: List - ): List> { - return hits.map { hit -> - val sourceMap = hit.sourceAsMap - - transformDocumentFieldNames( - sourceMap, - conflictingFields, - "_${index}_$monitorId", - "_${concreteIndex}_$monitorId", - "" - ) - - var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) - - val sourceRef = BytesReference.bytes(xContentBuilder) - - logger.debug("Document [${hit.id}] payload after transform: ", sourceRef.utf8ToString()) + conflictingFields: List, + ): List> { + return hits.mapNotNull(fun(hit: SearchHit): Pair? { + try { + val sourceMap = if (hit.hasSource()) { + hit.sourceAsMap + } else { + constructSourceMapFromFieldsInHit(hit) + } + transformDocumentFieldNames( + sourceMap, + conflictingFields, + "_${index}_$monitorId", + "_${concreteIndex}_$monitorId", + "" + ) + var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) + val sourceRef = BytesReference.bytes(xContentBuilder) + docsSizeOfBatchInBytes += sourceRef.ramBytesUsed() + totalDocsSizeInBytesStat += sourceRef.ramBytesUsed() + return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) + } catch (e: Exception) { + logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) + // skip any document which we fail to transform because we anyway won't be able to run percolate queries on them. + return null + } + }) + } - Pair(hit.id, sourceRef) + private fun constructSourceMapFromFieldsInHit(hit: SearchHit): MutableMap { + if (hit.fields == null) + return mutableMapOf() + val sourceMap: MutableMap = mutableMapOf() + for (field in hit.fields) { + if (field.value.values != null && field.value.values.isNotEmpty()) + if (field.value.values.size == 1) { + sourceMap[field.key] = field.value.values[0] + } else sourceMap[field.key] = field.value.values } + return sourceMap } /** @@ -775,4 +1028,33 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } jsonAsMap.putAll(tempMap) } + + /** + * Returns true, if the docs fetched from shards thus far amount to less than threshold + * amount of percentage (default:10. setting is dynamic and configurable) of the total heap size or not. + * + */ + private fun isInMemoryDocsSizeExceedingMemoryLimit(docsBytesSize: Long, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var thresholdPercentage = monitorCtx.percQueryDocsSizeMemoryPercentageLimit + val heapMaxBytes = monitorCtx.jvmStats!!.mem.heapMax.bytes + val thresholdBytes = (thresholdPercentage.toDouble() / 100.0) * heapMaxBytes + + return docsBytesSize > thresholdBytes + } + + private fun isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs: Int, monitorCtx: MonitorRunnerExecutionContext): Boolean { + var maxNumDocsThreshold = monitorCtx.percQueryMaxNumDocsInMemory + return numDocs >= maxNumDocsThreshold + } + + /** + * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name + * and doc source. A list of these POJOs would be passed to percolate query execution logic. + */ + data class TransformedDocDto( + var indexName: String, + var concreteIndexName: String, + var docId: String, + var docSource: BytesReference + ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 753b18a94..cfa10431c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchSecurityException +import org.opensearch.OpenSearchStatusException import org.opensearch.action.DocWriteRequest import org.opensearch.action.DocWriteResponse import org.opensearch.action.admin.indices.get.GetIndexRequest @@ -28,6 +29,7 @@ import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.IndexUtils import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings @@ -77,35 +79,51 @@ object MonitorMetadataService : @Suppress("ComplexMethod", "ReturnCount") suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata { try { - val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true")))) - .id(metadata.id) - .routing(metadata.monitorId) - .setIfSeqNo(metadata.seqNo) - .setIfPrimaryTerm(metadata.primaryTerm) - .timeout(indexTimeout) + if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) { + val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .source( + metadata.toXContent( + XContentFactory.jsonBuilder(), + ToXContent.MapParams(mapOf("with_type" to "true")) + ) + ) + .id(metadata.id) + .routing(metadata.monitorId) + .setIfSeqNo(metadata.seqNo) + .setIfPrimaryTerm(metadata.primaryTerm) + .timeout(indexTimeout) - if (updating) { - indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) - } else { - indexRequest.opType(DocWriteRequest.OpType.CREATE) - } - val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } - when (response.result) { - DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { - val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result" - log.error(failureReason) - throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason)) + if (updating) { + indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm) + } else { + indexRequest.opType(DocWriteRequest.OpType.CREATE) } - DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { - log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + val response: IndexResponse = client.suspendUntil { index(indexRequest, it) } + when (response.result) { + DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> { + val failureReason = + "The upsert metadata call failed with a ${response.result?.lowercase} result" + log.error(failureReason) + throw AlertingException( + failureReason, + RestStatus.INTERNAL_SERVER_ERROR, + IllegalStateException(failureReason) + ) + } + + DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> { + log.debug("Successfully upserted MonitorMetadata:${metadata.id} ") + } } + return metadata.copy( + seqNo = response.seqNo, + primaryTerm = response.primaryTerm + ) + } else { + val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata" + throw OpenSearchStatusException(failureReason, RestStatus.INTERNAL_SERVER_ERROR) } - return metadata.copy( - seqNo = response.seqNo, - primaryTerm = response.primaryTerm - ) } catch (e: Exception) { throw AlertingException.wrap(e) } @@ -216,11 +234,19 @@ object MonitorMetadataService : val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf() try { if (index == null) return mutableMapOf() - val getIndexRequest = GetIndexRequest().indices(index) - val getIndexResponse: GetIndexResponse = client.suspendUntil { - client.admin().indices().getIndex(getIndexRequest, it) + + val indices = mutableListOf() + if (IndexUtils.isAlias(index, clusterService.state()) || + IndexUtils.isDataStream(index, clusterService.state()) + ) { + IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) } + } else { + val getIndexRequest = GetIndexRequest().indices(index) + val getIndexResponse: GetIndexResponse = client.suspendUntil { + client.admin().indices().getIndex(getIndexRequest, it) + } + indices.addAll(getIndexResponse.indices()) } - val indices = getIndexResponse.indices() indices.forEach { indexName -> if (!lastRunContext.containsKey(indexName)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 41a26bb79..2aed10a9a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.alerting.alerts.AlertIndices +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings @@ -18,6 +19,7 @@ import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings import org.opensearch.common.unit.TimeValue import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.ScriptService import org.opensearch.threadpool.ThreadPool @@ -36,6 +38,7 @@ data class MonitorRunnerExecutionContext( var alertService: AlertService? = null, var docLevelMonitorQueries: DocLevelMonitorQueries? = null, var workflowService: WorkflowService? = null, + var jvmStats: JvmStats? = null, @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, @@ -47,5 +50,13 @@ data class MonitorRunnerExecutionContext( @Volatile var destinationContextFactory: DestinationContextFactory? = null, @Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, - @Volatile var indexTimeout: TimeValue? = null + @Volatile var indexTimeout: TimeValue? = null, + @Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + @Volatile var fetchOnlyQueryFieldNames: Boolean = true, + @Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, + @Volatile var percQueryDocsSizeMemoryPercentageLimit: Int = + AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT, + @Volatile var docLevelMonitorShardFetchSize: Int = + AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, + @Volatile var lockService: LockService? = null ) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 4fdaa67fc..0cc6f92de 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -18,17 +18,26 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts import org.opensearch.alerting.core.JobRunner import org.opensearch.alerting.core.ScheduledJobIndices +import org.opensearch.alerting.core.lock.LockModel +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.MonitorRunResult import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.script.TriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED +import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE +import org.opensearch.alerting.settings.AlertingSettings.Companion.FINDINGS_INDEXING_BATCH_SIZE import org.opensearch.alerting.settings.AlertingSettings.Companion.INDEX_TIMEOUT import org.opensearch.alerting.settings.AlertingSettings.Companion.MAX_ACTIONABLE_ALERT_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT import org.opensearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT +import org.opensearch.alerting.settings.AlertingSettings.Companion.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY import org.opensearch.alerting.settings.DestinationSettings.Companion.ALLOW_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.HOST_DENY_LIST import org.opensearch.alerting.settings.DestinationSettings.Companion.loadDestinationSettings @@ -48,6 +57,7 @@ import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.util.isBucketLevelMonitor import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.Script import org.opensearch.script.ScriptService import org.opensearch.script.TemplateScript @@ -132,6 +142,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerJvmStats(jvmStats: JvmStats): MonitorRunnerService { + this.monitorCtx.jvmStats = jvmStats + return this + } + // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunnerService { monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( @@ -169,6 +184,35 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon monitorCtx.indexTimeout = INDEX_TIMEOUT.get(monitorCtx.settings) + monitorCtx.findingsIndexBatchSize = FINDINGS_INDEXING_BATCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE) { + monitorCtx.findingsIndexBatchSize = it + } + + monitorCtx.fetchOnlyQueryFieldNames = DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED) { + monitorCtx.fetchOnlyQueryFieldNames = it + } + + monitorCtx.percQueryMaxNumDocsInMemory = PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY) { + monitorCtx.percQueryMaxNumDocsInMemory = it + } + + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = + PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT) { + monitorCtx.percQueryDocsSizeMemoryPercentageLimit = it + } + + monitorCtx.docLevelMonitorShardFetchSize = + DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.get(monitorCtx.settings) + monitorCtx.clusterService!!.clusterSettings + .addSettingsUpdateConsumer(DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE) { + monitorCtx.docLevelMonitorShardFetchSize = it + } + return this } @@ -180,6 +224,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerLockService(lockService: LockService): MonitorRunnerService { + monitorCtx.lockService = lockService + return this + } + // Updates destination settings when the reload API is called so that new keystore values are visible fun reloadDestinationSettings(settings: Settings) { monitorCtx.destinationSettings = loadDestinationSettings(settings) @@ -251,12 +300,40 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { - runJob(job, periodStart, periodEnd, false) + var lock: LockModel? = null + try { + lock = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, it) + } ?: return@launch + logger.debug("lock ${lock!!.lockId} acquired") + logger.debug( + "PERF_DEBUG: executing workflow ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) + runJob(job, periodStart, periodEnd, false) + } finally { + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } + logger.debug("lock ${lock!!.lockId} released") + } } } is Monitor -> { launch { - runJob(job, periodStart, periodEnd, false) + var lock: LockModel? = null + try { + lock = monitorCtx.client!!.suspendUntil { + monitorCtx.lockService!!.acquireLock(job, it) + } ?: return@launch + logger.debug("lock ${lock!!.lockId} acquired") + logger.debug( + "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + + monitorCtx.clusterService!!.state().nodes().localNode.id + ) + runJob(job, periodStart, periodEnd, false) + } finally { + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } + logger.debug("lock ${lock!!.lockId} released") + } } } else -> { @@ -300,7 +377,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon val runResult = if (monitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else if (monitor.isDocLevelMonitor()) { - DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) + DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else { QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt deleted file mode 100644 index 0caad1f4a..000000000 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentExecutionContext.kt +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ - -package org.opensearch.alerting.model - -import org.opensearch.commons.alerting.model.DocLevelQuery - -data class DocumentExecutionContext( - val queries: List, - val lastRunContext: Map, - val updatedLastRunContext: Map -) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt new file mode 100644 index 000000000..e7aa707f9 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/IndexExecutionContext.kt @@ -0,0 +1,19 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.model.DocLevelQuery + +/** DTO that contains all the necessary context for fetching data from shard and performing percolate queries */ +data class IndexExecutionContext( + val queries: List, + val lastRunContext: MutableMap, + val updatedLastRunContext: MutableMap, + val indexName: String, + val concreteIndexName: String, + val conflictingFields: List, + val docIds: List? = null, +) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt index 8c96b1b4a..b78063b11 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/service/DeleteMonitorService.kt @@ -23,6 +23,8 @@ import org.opensearch.action.support.IndicesOptions import org.opensearch.action.support.WriteRequest.RefreshPolicy import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.alerting.MonitorMetadataService +import org.opensearch.alerting.core.lock.LockModel +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH @@ -48,11 +50,14 @@ object DeleteMonitorService : private val log = LogManager.getLogger(this.javaClass) private lateinit var client: Client + private lateinit var lockService: LockService fun initialize( client: Client, + lockService: LockService ) { DeleteMonitorService.client = client + DeleteMonitorService.lockService = lockService } /** @@ -64,6 +69,7 @@ object DeleteMonitorService : val deleteResponse = deleteMonitor(monitor.id, refreshPolicy) deleteDocLevelMonitorQueriesAndIndices(monitor) deleteMetadata(monitor) + deleteLock(monitor) return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version) } @@ -147,6 +153,10 @@ object DeleteMonitorService : } } + private suspend fun deleteLock(monitor: Monitor) { + client.suspendUntil { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) } + } + /** * Checks if the monitor is part of the workflow * diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 7dd90b106..7f0f3793f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -17,6 +17,10 @@ class AlertingSettings { companion object { const val DEFAULT_MAX_ACTIONABLE_ALERT_COUNT = 50L + const val DEFAULT_FINDINGS_INDEXING_BATCH_SIZE = 1000 + const val DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY = 50000 + const val DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = 10 + const val DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = 10000 val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", @@ -25,6 +29,49 @@ class AlertingSettings { Setting.Property.Dynamic ) + /** Defines the threshold percentage of heap size in bytes till which we accumulate docs in memory before we query against percolate query + * index in document level monitor execution. + */ + val PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_docs_size_memory_percentage_limit", + 10, + 0, + 100, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** Purely a setting used to verify seq_no calculation + */ + val DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE = Setting.intSetting( + "plugins.alerting.monitor.doc_level_monitor_shard_fetch_size", + DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE, + 1, + 10000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** Defines the threshold of the maximum number of docs accumulated in memory to query against percolate query index in document + * level monitor execution. The docs are being collected from searching on shards of indices mentioned in the + * monitor input indices field. When the number of in-memory docs reaches or exceeds threshold we immediately perform percolate + * query with the current set of docs and clear the cache and repeat the process till we have queried all indices in current + * execution + */ + val PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY = Setting.intSetting( + "plugins.alerting.monitor.percolate_query_max_num_docs_in_memory", + DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY, 1000, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + + /** + * Boolean setting to enable/disable optimizing doc level monitors by fetchign only fields mentioned in queries. + * Enabled by default. If disabled, will fetch entire source of documents while fetch data from shards. + */ + val DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED = Setting.boolSetting( + "plugins.alerting.monitor.doc_level_monitor_query_field_names_enabled", + true, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) + val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, @@ -176,5 +223,12 @@ class AlertingSettings { Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( + "plugins.alerting.alert_findings_indexing_batch_size", + DEFAULT_FINDINGS_INDEXING_BATCH_SIZE, + 1, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt index 593582d9c..796503073 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDeleteWorkflowAction.kt @@ -24,6 +24,8 @@ import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters import org.opensearch.action.support.HandledTransportAction import org.opensearch.action.support.WriteRequest.RefreshPolicy +import org.opensearch.alerting.core.lock.LockModel +import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.MonitorMetadata import org.opensearch.alerting.model.WorkflowMetadata import org.opensearch.alerting.opensearchapi.addFilter @@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor( val clusterService: ClusterService, val settings: Settings, val xContentRegistry: NamedXContentRegistry, + val lockService: LockService ) : HandledTransportAction( AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest ), @@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor( } catch (t: Exception) { log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t) } + try { + // Delete the workflow lock + client.suspendUntil { lockService.deleteLock(LockModel.generateLockId(workflowId), it) } + } catch (t: Exception) { + log.error("Failed to delete workflow lock for $workflowId") + } actionListener.onResponse(deleteWorkflowResponse) } else { actionListener.onFailure( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index ae9222155..a62af2eaf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -198,7 +198,15 @@ class TransportIndexMonitorAction @Inject constructor( else (it as DocLevelMonitorInput).indices indices.addAll(inputIndices) } - val searchRequest = SearchRequest().indices(*indices.toTypedArray()) + val updatedIndices = indices.map { index -> + if (IndexUtils.isAlias(index, clusterService.state()) || IndexUtils.isDataStream(index, clusterService.state())) { + val metadata = clusterService.state().metadata.indicesLookup[index]?.writeIndex + metadata?.index?.name ?: index + } else { + index + } + } + val searchRequest = SearchRequest().indices(*updatedIndices.toTypedArray()) .source(SearchSourceBuilder.searchSource().size(1).query(QueryBuilders.matchAllQuery())) client.search( searchRequest, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index e5ad5578c..5bb11e769 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -9,10 +9,13 @@ import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper import org.opensearch.OpenSearchStatusException import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.ActionListener import org.opensearch.action.admin.indices.alias.Alias import org.opensearch.action.admin.indices.create.CreateIndexRequest import org.opensearch.action.admin.indices.create.CreateIndexResponse import org.opensearch.action.admin.indices.delete.DeleteIndexRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsRequest +import org.opensearch.action.admin.indices.exists.indices.IndicesExistsResponse import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest import org.opensearch.action.admin.indices.rollover.RolloverRequest import org.opensearch.action.admin.indices.rollover.RolloverResponse @@ -39,7 +42,14 @@ import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.index.mapper.MapperService.INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING +import org.opensearch.index.query.QueryBuilders +import org.opensearch.index.reindex.BulkByScrollResponse +import org.opensearch.index.reindex.DeleteByQueryAction +import org.opensearch.index.reindex.DeleteByQueryRequestBuilder import org.opensearch.rest.RestStatus +import kotlin.coroutines.resume +import kotlin.coroutines.resumeWithException +import kotlin.coroutines.suspendCoroutine private val log = LogManager.getLogger(DocLevelMonitorQueries::class.java) @@ -134,6 +144,42 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ return true } + suspend fun deleteDocLevelQueriesOnDryRun(monitorMetadata: MonitorMetadata) { + try { + monitorMetadata.sourceToQueryIndexMapping.forEach { (_, queryIndex) -> + val indicesExistsResponse: IndicesExistsResponse = + client.suspendUntil { + client.admin().indices().exists(IndicesExistsRequest(queryIndex), it) + } + if (indicesExistsResponse.isExists == false) { + return + } + + val queryBuilder = QueryBuilders.boolQuery() + .must(QueryBuilders.existsQuery("monitor_id")) + .mustNot(QueryBuilders.wildcardQuery("monitor_id", "*")) + + val response: BulkByScrollResponse = suspendCoroutine { cont -> + DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) + .source(queryIndex) + .filter(queryBuilder) + .refresh(true) + .execute( + object : ActionListener { + override fun onResponse(response: BulkByScrollResponse) = cont.resume(response) + override fun onFailure(t: Exception) = cont.resumeWithException(t) + } + ) + } + response.bulkFailures.forEach { + log.error("Failed deleting queries while removing dry run queries: [${it.id}] cause: [${it.cause}] ") + } + } + } catch (e: Exception) { + log.error("Failed to delete doc level queries on dry run", e) + } + } + fun docLevelQueryIndexExists(dataSources: DataSources): Boolean { val clusterState = clusterService.state() return clusterState.metadata.hasAlias(dataSources.queryIndex) @@ -207,11 +253,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // Run through each backing index and apply appropriate mappings to query index indices.forEach { indexName -> - val concreteIndices = IndexUtils.resolveAllIndices( + var concreteIndices = IndexUtils.resolveAllIndices( listOf(indexName), monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || + IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) + ) { + val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) } + if (lastWriteIndex != null) { + val lastWriteIndexCreationDate = + IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state()) + concreteIndices = IndexUtils.getNewestIndicesByCreationDate( + concreteIndices, + monitorCtx.clusterService!!.state(), + lastWriteIndexCreationDate + ) + } + } val updatedIndexName = indexName.replace("*", "_") val updatedProperties = mutableMapOf() val allFlattenPaths = mutableSetOf>() diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 3b11f795f..b36aba597 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -13,6 +13,7 @@ import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.client.IndicesAdminClient import org.opensearch.cluster.ClusterState +import org.opensearch.cluster.metadata.IndexAbstraction import org.opensearch.cluster.metadata.IndexMetadata import org.opensearch.cluster.metadata.IndexNameExpressionResolver import org.opensearch.cluster.service.ClusterService @@ -153,5 +154,47 @@ class IndexUtils { return result } + + @JvmStatic + fun isDataStream(name: String, clusterState: ClusterState): Boolean { + return clusterState.metadata().dataStreams().containsKey(name) + } + + @JvmStatic + fun isAlias(name: String, clusterState: ClusterState): Boolean { + return clusterState.metadata().hasAlias(name) + } + + @JvmStatic + fun getWriteIndex(index: String, clusterState: ClusterState): String? { + if (isAlias(index, clusterState) || isDataStream(index, clusterState)) { + val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex + if (metadata != null) { + return metadata.index.name + } + } + return null + } + + @JvmStatic + fun getNewestIndicesByCreationDate(concreteIndices: List, clusterState: ClusterState, thresholdDate: Long): List { + val filteredIndices = mutableListOf() + val lookup = clusterState.metadata().indicesLookup + concreteIndices.forEach { indexName -> + val index = lookup[indexName] + val indexMetadata = clusterState.metadata.index(indexName) + if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) { + if (indexMetadata.creationDate >= thresholdDate) { + filteredIndices.add(indexName) + } + } + } + return filteredIndices + } + + @JvmStatic + fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long { + return clusterState.metadata.index(index).creationDate + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 118cfebb9..3a0f9a4de 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -244,7 +244,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { executionId ) } else if (delegateMonitor.isDocLevelMonitor()) { - return DocumentLevelMonitorRunner.runMonitor( + return DocumentLevelMonitorRunner().runMonitor( delegateMonitor, monitorCtx, periodStart, diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index 3ffc39478..3c396e537 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -1,7 +1,7 @@ { "dynamic": "strict", "_meta" : { - "schema_version": 3 + "schema_version": 4 }, "properties": { "schema_version": { @@ -46,6 +46,9 @@ "type" : "keyword" } } + }, + "query_field_names": { + "type": "keyword" } } }, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index d58b15c75..9f2f6f7b9 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler import javax.management.ObjectName import javax.management.remote.JMXConnectorFactory import javax.management.remote.JMXServiceURL +import kotlin.collections.ArrayList +import kotlin.collections.HashMap /** * Superclass for tests that interact with an external test cluster using OpenSearch's RestClient @@ -909,7 +911,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response { val requestBody = StringEntity(doc, APPLICATION_JSON) val params = if (refresh) mapOf("refresh" to "true") else mapOf() - val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody) + val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody) assertTrue( "Unable to index doc: '${doc.take(15)}...' to index: '$index'", listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus()) @@ -945,6 +947,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return index } + protected fun createTestIndex(index: String, mapping: String?, alias: String): String { + createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias) + return index + } + protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String { try { createIndex( @@ -981,7 +988,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val indicesMap = mutableMapOf() val indicesJson = jsonBuilder().startObject().startArray("actions") indices.keys.map { - val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "") + val indexName = createTestIndex(index = it, mapping = "") val isWriteIndex = indices.getOrDefault(indexName, false) indicesMap[indexName] = isWriteIndex val indexMap = mapOf( @@ -998,17 +1005,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return mutableMapOf(alias to indicesMap) } + protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) { + val indexPattern = "$datastream*" + var componentTemplateMappings = "\"properties\": {" + + " \"netflow.destination_transport_port\":{ \"type\": \"long\" }," + + " \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" + + "}" + if (mappings != null) { + componentTemplateMappings = mappings + } + if (useComponentTemplate) { + // Setup index_template + createComponentTemplateWithMappings( + "my_ds_component_template-$datastream", + componentTemplateMappings + ) + } + createComposableIndexTemplate( + "my_index_template_ds-$datastream", + listOf(indexPattern), + (if (useComponentTemplate) "my_ds_component_template-$datastream" else null), + mappings, + true, + 0 + ) + createDataStream(datastream) + } + + protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) { + client().makeRequest("PUT", "_data_stream/$datastream") + } + + protected fun deleteDataStream(datastream: String) { + client().makeRequest("DELETE", "_data_stream/$datastream") + } + + protected fun createIndexAlias(alias: String, mappings: String?) { + val indexPattern = "$alias*" + var componentTemplateMappings = "\"properties\": {" + + " \"netflow.destination_transport_port\":{ \"type\": \"long\" }," + + " \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" + + "}" + if (mappings != null) { + componentTemplateMappings = mappings + } + createComponentTemplateWithMappings( + "my_alias_component_template-$alias", + componentTemplateMappings + ) + createComposableIndexTemplate( + "my_index_template_alias-$alias", + listOf(indexPattern), + "my_alias_component_template-$alias", + mappings, + false, + 0 + ) + createTestIndex( + "$alias-000001", + null, + """ + "$alias": { + "is_write_index": true + } + """.trimIndent() + ) + } + + protected fun deleteIndexAlias(alias: String) { + client().makeRequest("DELETE", "$alias*/_alias/$alias") + } + + protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) { + val body = """{"template" : { "mappings": {$mappings} }}""" + client().makeRequest( + "PUT", + "_component_template/$componentTemplateName", + emptyMap(), + StringEntity(body, ContentType.APPLICATION_JSON), + BasicHeader("Content-Type", "application/json") + ) + } + + protected fun createComposableIndexTemplate( + templateName: String, + indexPatterns: List, + componentTemplateName: String?, + mappings: String?, + isDataStream: Boolean, + priority: Int + ) { + var body = "{\n" + if (isDataStream) { + body += "\"data_stream\": { }," + } + body += "\"index_patterns\": [" + + indexPatterns.stream().collect( + Collectors.joining(",", "\"", "\"") + ) + "]," + if (componentTemplateName == null) { + body += "\"template\": {\"mappings\": {$mappings}}," + } + if (componentTemplateName != null) { + body += "\"composed_of\": [\"$componentTemplateName\"]," + } + body += "\"priority\":$priority}" + client().makeRequest( + "PUT", + "_index_template/$templateName", + emptyMap(), + StringEntity(body, APPLICATION_JSON), + BasicHeader("Content-Type", "application/json") + ) + } + + protected fun getDatastreamWriteIndex(datastream: String): String { + val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null) + var respAsMap = responseAsMap(response) + if (respAsMap.containsKey("data_streams")) { + respAsMap = (respAsMap["data_streams"] as ArrayList>)[0] + val indices = respAsMap["indices"] as List> + val index = indices.last() + return index["index_name"] as String + } else { + respAsMap = respAsMap[datastream] as Map + } + val indices = respAsMap["indices"] as Array + return indices.last() + } + + protected fun rolloverDatastream(datastream: String) { + client().makeRequest( + "POST", + datastream + "/_rollover", + emptyMap(), + null + ) + } + protected fun randomAliasIndices( alias: String, num: Int = randomIntBetween(1, 10), includeWriteIndex: Boolean = true, ): Map { val indices = mutableMapOf() - val writeIndex = randomIntBetween(0, num) + val writeIndex = randomIntBetween(0, num - 1) for (i: Int in 0 until num) { - var indexName = randomAlphaOfLength(10) + var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT) while (indexName.equals(alias) || indices.containsKey(indexName)) - indexName = randomAlphaOfLength(10) + indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT) indices[indexName] = includeWriteIndex && i == writeIndex } return indices diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index 9035a9c4e..c21310f58 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -10,6 +10,8 @@ import org.apache.http.entity.StringEntity import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN +import org.opensearch.alerting.core.lock.LockService +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.client.Response import org.opensearch.client.ResponseException import org.opensearch.common.xcontent.json.JsonXContent @@ -17,16 +19,20 @@ import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.model.action.PerExecutionActionScope import org.opensearch.rest.RestStatus import org.opensearch.script.Script +import org.opensearch.test.OpenSearchTestCase import java.time.ZonedDateTime import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.time.temporal.ChronoUnit.MILLIS import java.util.Locale +import java.util.concurrent.TimeUnit class DocumentMonitorRunnerIT : AlertingRestTestCase() { @@ -73,6 +79,212 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val alerts = searchAlerts(monitor) assertEquals("Alert saved for test monitor", 0, alerts.size) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) } + } + + fun `test dryrun execute monitor with queryFieldNames set up with correct field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", queryFieldNames = listOf("test_field")) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test seq_no calculation correctness when docs are deleted`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE.key, 2) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + + val docQuery = + DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + indexDoc(index, "5", testDoc) + indexDoc(index, "11", testDoc) + indexDoc(index, "21", testDoc) + indexDoc(index, "31", testDoc) + indexDoc(index, "41", testDoc) + indexDoc(index, "51", testDoc) + + deleteDoc(index, "51") + val response = executeMonitor(monitor, params = mapOf("dryrun" to "false")) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(9, triggerResult.objectMap("action_results").values.size) + } + } + + fun `test dryrun execute monitor with queryFieldNames set up with wrong field`() { + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + queryFieldNames = listOf("wrong_field") + ) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + indexDoc(index, "2", testDoc) + indexDoc(index, "3", testDoc) + indexDoc(index, "4", testDoc) + indexDoc(index, "5", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(0, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) + } + + fun `test fetch_query_field_names setting is disabled by configuring queryFieldNames set up with wrong field still works`() { + adminClient().updateSettings(AlertingSettings.DOC_LEVEL_MONITOR_FETCH_ONLY_QUERY_FIELDS_ENABLED.key, "false") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val index = createTestIndex() + // using wrong field name + val docQuery = DocLevelQuery( + query = "test_field:\"us-west-2\"", + name = "3", + queryFieldNames = listOf("wrong_field") + ) + val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + + indexDoc(index, "1", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + assertEquals(monitor.name, output["monitor_name"]) + + assertEquals(1, output.objectMap("trigger_results").values.size) + + for (triggerResult in output.objectMap("trigger_results").values) { + assertEquals(1, triggerResult.objectMap("action_results").values.size) + for (alertActionResult in triggerResult.objectMap("action_results").values) { + for (actionResult in alertActionResult.values) { + @Suppress("UNCHECKED_CAST") val actionOutput = (actionResult as Map>)["output"] + as Map + assertEquals("Hello ${monitor.name}", actionOutput["subject"]) + assertEquals("Hello ${monitor.name}", actionOutput["message"]) + } + } + } + + val alerts = searchAlerts(monitor) + assertEquals("Alert saved for test monitor", 0, alerts.size) } fun `test execute monitor returns search result with dryrun`() { @@ -105,6 +317,120 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex")) assertTrue("Incorrect search result", matchingDocsToQuery.contains("5|$testIndex")) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 0L, it.value) } + } + + fun `test execute monitor returns search result with dryrun then without dryrun ensure dry run query not saved`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "2", testDoc) + + val response = executeMonitor(monitor, params = DRYRUN_MONITOR) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("1|$testIndex")) + assertTrue("Incorrect search result", matchingDocsToQuery.contains("2|$testIndex")) + + // ensure doc level query is deleted on dry run + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(0L, it.value) } + + // create and execute second monitor not as dryrun + val testIndex2 = createTestIndex("test1") + val testTime2 = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc2 = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime2", + "test_field" : "us-east-1" + }""" + + val docQuery2 = DocLevelQuery(query = "test_field:\"us-east-1\"", name = "3") + val docLevelInput2 = DocLevelMonitorInput("description", listOf(testIndex2), listOf(docQuery2)) + + val trigger2 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor2 = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput2), triggers = listOf(trigger2))) + assertNotNull(monitor2.id) + + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex2, "5", testDoc2) + + val response2 = executeMonitor(monitor2.id) + val output2 = entityAsMap(response2) + + assertEquals(monitor2.name, output2["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult2 = (output2.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery2 = searchResult2[docQuery2.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery2.size) + assertTrue("Incorrect search result", matchingDocsToQuery2.containsAll(listOf("1|$testIndex2", "5|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor2) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor2) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) + + // ensure query from second monitor was saved + val expectedQueries = listOf("test_field_test1_${monitor2.id}:\"us-east-1\"") + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + searchResponse.hits.totalHits?.let { assertEquals("Query saved in query index", 1L, it.value) } } fun `test execute monitor generates alerts and findings`() { @@ -125,26 +451,106 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(testIndex, "1", testDoc) indexDoc(testIndex, "5", testDoc) - - val response = executeMonitor(monitor.id) - - val output = entityAsMap(response) - - assertEquals(monitor.name, output["monitor_name"]) - @Suppress("UNCHECKED_CAST") - val searchResult = (output.objectMap("input_results")["results"] as List>).first() - @Suppress("UNCHECKED_CAST") - val matchingDocsToQuery = searchResult[docQuery.id] as List - assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) - assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) - - val alerts = searchAlertsWithFilter(monitor) - assertEquals("Alert saved for test monitor", 2, alerts.size) - - val findings = searchFindings(monitor) - assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) + } + + fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + Thread.sleep(240000) + + val inputMap = HashMap() + inputMap["searchString"] = monitor.name + + val responseMap = getAlerts(inputMap).asMap() + val alerts = (responseMap["alerts"] as ArrayList>) + alerts.forEach { + assertTrue(it["error_message"] == null) + } + } + + @AwaitsFix(bugUrl = "") + fun `test monitor run generate lock and monitor delete removes lock`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + OpenSearchTestCase.waitUntil({ + val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME) + return@waitUntil (response.restStatus().status == 200) + }, 240, TimeUnit.SECONDS) + + var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search") + var responseMap = entityAsMap(response) + var noOfLocks = ((responseMap["hits"] as Map)["hits"] as List).size + assertEquals(1, noOfLocks) + + deleteMonitor(monitor) + refreshIndex(LockService.LOCK_INDEX_NAME) + response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search") + responseMap = entityAsMap(response) + noOfLocks = ((responseMap["hits"] as Map)["hits"] as List).size + assertEquals(0, noOfLocks) } fun `test execute monitor with tag as trigger condition generates alerts and findings`() { @@ -183,8 +589,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor input error`() { @@ -282,8 +688,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor generates alerts and findings with per trigger execution for actions`() { @@ -345,8 +751,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1")) } fun `test execute monitor with wildcard index that generates alerts and findings for EQUALS query operator`() { @@ -393,6 +799,54 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size) } + fun `test execute monitor for bulk index findings`() { + val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" + val testQueryName = "wildcard-test-query" + val testIndex = createTestIndex("${testIndexPrefix}1") + val testIndex2 = createTestIndex("${testIndexPrefix}2") + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = testQueryName) + val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + for (i in 0 until 9) { + indexDoc(testIndex, i.toString(), testDoc) + } + indexDoc(testIndex2, "3", testDoc) + adminClient().updateSettings("plugins.alerting.alert_findings_indexing_batch_size", 2) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Correct search result", 10, matchingDocsToQuery.size) + assertTrue("Correct search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "2|$testIndex", "3|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 10, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 10, findings.size) + val foundFindings = + findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("2") || it.relatedDocIds.contains("3") } + assertEquals("Found findings for all docs", 4, foundFindings.size) + } + fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS query operator`() { val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" val testQueryName = "wildcard-test-query" @@ -1223,6 +1677,314 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { } } + fun `test document-level monitor when datastreams contain docs that do match query`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteDataStream(dataStreamName) + } + + fun `test document-level monitor when datastreams contain docs across read-only indices that do match query`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + indexDoc(dataStreamName, "2", testDoc) + rolloverDatastream(dataStreamName) + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "4", testDoc) + rolloverDatastream(dataStreamName) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + + indexDoc(dataStreamName, "5", testDoc) + indexDoc(dataStreamName, "6", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + deleteDataStream(dataStreamName) + } + + fun `test document-level monitor when index alias contain docs that do match query`() { + val aliasName = "test-alias" + createIndexAlias( + aliasName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + ) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("$aliasName"), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(aliasName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(aliasName) + indexDoc(aliasName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteIndexAlias(aliasName) + } + + fun `test document-level monitor when multiple datastreams contain docs across read-only indices that do match query`() { + val dataStreamName1 = "test-datastream1" + createDataStream( + dataStreamName1, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + val dataStreamName2 = "test-datastream2" + createDataStream( + dataStreamName2, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName2, "-1", testDoc) + rolloverDatastream(dataStreamName2) + indexDoc(dataStreamName2, "0", testDoc) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test-datastream*"), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + indexDoc(dataStreamName1, "1", testDoc) + indexDoc(dataStreamName2, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + + indexDoc(dataStreamName1, "2", testDoc) + indexDoc(dataStreamName2, "2", testDoc) + rolloverDatastream(dataStreamName1) + rolloverDatastream(dataStreamName1) + rolloverDatastream(dataStreamName2) + indexDoc(dataStreamName1, "4", testDoc) + indexDoc(dataStreamName2, "4", testDoc) + rolloverDatastream(dataStreamName1) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 4, matchingDocsToQuery.size) + + indexDoc(dataStreamName1, "5", testDoc) + indexDoc(dataStreamName1, "6", testDoc) + indexDoc(dataStreamName2, "5", testDoc) + indexDoc(dataStreamName2, "6", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 4, matchingDocsToQuery.size) + deleteDataStream(dataStreamName1) + deleteDataStream(dataStreamName2) + } + + fun `test document-level monitor ignoring old read-only indices for datastreams`() { + val dataStreamName = "test-datastream" + createDataStream( + dataStreamName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent(), + false + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(dataStreamName, "-1", testDoc) + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "0", testDoc) + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(dataStreamName), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + indexDoc(dataStreamName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(dataStreamName) + indexDoc(dataStreamName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteDataStream(dataStreamName) + } + fun `test execute monitor with non-null data sources`() { val testIndex = createTestIndex() @@ -1305,7 +2067,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { deleteIndex(index1) deleteIndex(index2) - indexDoc(index4, "1", testDoc) + indexDoc(index4, "2", testDoc) response = executeMonitor(monitor.id) output = entityAsMap(response) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 3ddffa73b..810836bf9 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -365,6 +365,190 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size) } + fun `test all fields fetched and submitted to percolate query when one of the queries doesn't have queryFieldNames`() { + // doesn't have query field names so even if other queries pass the wrong fields to query, findings will get generated on matching docs + val docQuery1 = DocLevelQuery( + query = "source.ip.v6.v1:12345", + name = "3", + ) + val docQuery2 = DocLevelQuery( + query = "source.ip.v6.v2:16645", + name = "4", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery3 = DocLevelQuery( + query = "source.ip.v4.v0:120", + name = "5", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery4 = + DocLevelQuery( + query = "alias.some.fff:\"us-west-2\"", + name = "6", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery5 = DocLevelQuery( + query = "message:\"This is an error from IAD region\"", + name = "7", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1"), + ) + val docQuery6 = + DocLevelQuery( + query = "type.subtype:\"some subtype\"", + name = "8", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery7 = + DocLevelQuery( + query = "supertype.type:\"some type\"", + name = "9", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters + val testDoc = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 1) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match all 7 queries", 7, findings[0].docLevelQueries.size) + } + + fun `test percolate query failure when queryFieldNames has alias`() { + // doesn't have query field names so even if other queries pass the wrong fields to query, findings will get generated on matching docs + val docQuery1 = DocLevelQuery( + query = "source.ip.v6.v1:12345", + name = "3", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery2 = DocLevelQuery( + query = "source.ip.v6.v2:16645", + name = "4", + queryFieldNames = listOf("source.ip.v6.v2") + ) + val docQuery3 = DocLevelQuery( + query = "source.ip.v4.v0:120", + name = "5", + queryFieldNames = listOf("source.ip.v6.v4") + ) + val docQuery4 = + DocLevelQuery( + query = "alias.some.fff:\"us-west-2\"", + name = "6", + queryFieldNames = listOf("alias.some.fff") + ) + val docQuery5 = DocLevelQuery( + query = "message:\"This is an error from IAD region\"", + name = "7", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1"), + ) + val docQuery6 = + DocLevelQuery( + query = "type.subtype:\"some subtype\"", + name = "8", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docQuery7 = + DocLevelQuery( + query = "supertype.type:\"some type\"", + name = "9", + queryFieldNames = listOf("alias.some.fff", "source.ip.v6.v1") + ) + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery1, docQuery2, docQuery3, docQuery4, docQuery5, docQuery6, docQuery7) + ) + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + // Trying to test here few different "nesting" situations and "wierd" characters + val testDoc = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v1" : 12345, + "source.ip.v6.v2" : 16645, + "source.ip.v4.v0" : 120, + "test_bad_char" : "\u0000", + "test_strict_date_time" : "$testTime", + "test_field.some_other_field" : "us-west-2", + "type.subtype" : "some subtype", + "supertype.type" : "some type" + }""" + indexDoc(index, "1", testDoc) + client().admin().indices().putMapping( + PutMappingRequest(index).source("alias.some.fff", "type=alias,path=test_field.some_other_field") + ) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + val executeMonitorResponse = executeMonitor(monitor, id, false) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + val table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.size == 1) + Assert.assertTrue(getAlertsResponse.alerts[0].state.toString().equals(Alert.State.ERROR.toString())) + val findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + } + fun `test execute monitor with custom query index`() { val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt index eddbabf90..dc8a03ba8 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/alerts/AlertIndicesIT.kt @@ -75,7 +75,7 @@ class AlertIndicesIT : AlertingRestTestCase() { putFindingMappings( AlertIndices.findingMapping().trimStart('{').trimEnd('}') - .replace("\"schema_version\": 3", "\"schema_version\": 0") + .replace("\"schema_version\": 4", "\"schema_version\": 0") ) assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 0) @@ -89,7 +89,7 @@ class AlertIndicesIT : AlertingRestTestCase() { executeMonitor(trueMonitor.id) assertIndexExists(AlertIndices.FINDING_HISTORY_WRITE_INDEX) verifyIndexSchemaVersion(ScheduledJob.SCHEDULED_JOBS_INDEX, 8) - verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 3) + verifyIndexSchemaVersion(AlertIndices.FINDING_HISTORY_WRITE_INDEX, 4) } fun `test alert index gets recreated automatically if deleted`() { diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt index 531084b26..c90c58013 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/FindingsRestApiIT.kt @@ -39,7 +39,7 @@ class FindingsRestApiIT : AlertingRestTestCase() { }""" indexDoc(testIndex, "someId", testDoc) - val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf()) + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) val trueMonitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index 1ae26ac6d..22791b5aa 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -37,6 +37,8 @@ import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.junit.annotations.TestLogging import java.time.Instant +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter import java.time.temporal.ChronoUnit import java.util.Collections import java.util.Locale @@ -1185,4 +1187,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() { val findings = searchFindings(monitor.copy(id = monitorResponse.id)) assertEquals("Findings saved for test monitor", 1, findings.size) } + + fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "__lag-monitor-test__", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = false, + schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + createWorkflow( + randomWorkflow( + monitorIds = listOf(monitor.id), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + ) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + Thread.sleep(240000) + + val alerts = searchAlerts(monitor) + alerts.forEach { + assertTrue(it.errorMessage == null) + } + } } diff --git a/build.gradle b/build.gradle index 3bc2243b4..8dcd2f401 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ buildscript { apply from: 'build-tools/repositories.gradle' ext { - opensearch_version = System.getProperty("opensearch.version", "2.9.0-SNAPSHOT") + opensearch_version = System.getProperty("opensearch.version", "2.9.1-SNAPSHOT") buildVersionQualifier = System.getProperty("build.version_qualifier", "") isSnapshot = "true" == System.getProperty("build.snapshot", "true") // 2.7.0-SNAPSHOT -> 2.7.0.0-SNAPSHOT diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt new file mode 100644 index 000000000..91d144013 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockModel.kt @@ -0,0 +1,117 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.core.lock + +import org.opensearch.common.xcontent.XContentParserUtils +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.core.xcontent.XContentParser +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +class LockModel( + val lockId: String, + val scheduledJobId: String, + val lockTime: Instant, + val released: Boolean, + val seqNo: Long, + val primaryTerm: Long +) : ToXContentObject { + + constructor( + copyLock: LockModel, + seqNo: Long, + primaryTerm: Long + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + copyLock.lockTime, + copyLock.released, + seqNo, + primaryTerm + ) + + constructor( + copyLock: LockModel, + released: Boolean + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + copyLock.lockTime, + released, + copyLock.seqNo, + copyLock.primaryTerm + ) + + constructor( + copyLock: LockModel, + updateLockTime: Instant, + released: Boolean + ) : this ( + copyLock.lockId, + copyLock.scheduledJobId, + updateLockTime, + released, + copyLock.seqNo, + copyLock.primaryTerm + ) + + constructor( + scheduledJobId: String, + lockTime: Instant, + released: Boolean + ) : this ( + generateLockId(scheduledJobId), + scheduledJobId, + lockTime, + released, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + .field(SCHEDULED_JOB_ID, scheduledJobId) + .field(LOCK_TIME, lockTime.epochSecond) + .field(RELEASED, released) + .endObject() + return builder + } + + companion object { + const val SCHEDULED_JOB_ID = "scheduled_job_id" + const val LOCK_TIME = "lock_time" + const val RELEASED = "released" + + fun generateLockId(scheduledJobId: String): String { + return "$scheduledJobId-lock" + } + + @JvmStatic + @JvmOverloads + @Throws(IOException::class) + fun parse(xcp: XContentParser, seqNo: Long, primaryTerm: Long): LockModel { + lateinit var scheduledJobId: String + lateinit var lockTime: Instant + var released: Boolean = false + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + SCHEDULED_JOB_ID -> scheduledJobId = xcp.text() + LOCK_TIME -> lockTime = Instant.ofEpochSecond(xcp.longValue()) + RELEASED -> released = xcp.booleanValue() + } + } + return LockModel(generateLockId(scheduledJobId), scheduledJobId, lockTime, released, seqNo, primaryTerm) + } + } +} diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt new file mode 100644 index 000000000..03268eb83 --- /dev/null +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt @@ -0,0 +1,311 @@ +package org.opensearch.alerting.core.lock + +import org.apache.logging.log4j.LogManager +import org.opensearch.ResourceAlreadyExistsException +import org.opensearch.action.ActionListener +import org.opensearch.action.DocWriteResponse +import org.opensearch.action.admin.indices.create.CreateIndexRequest +import org.opensearch.action.admin.indices.create.CreateIndexResponse +import org.opensearch.action.delete.DeleteRequest +import org.opensearch.action.delete.DeleteResponse +import org.opensearch.action.get.GetRequest +import org.opensearch.action.get.GetResponse +import org.opensearch.action.index.IndexRequest +import org.opensearch.action.index.IndexResponse +import org.opensearch.action.update.UpdateRequest +import org.opensearch.action.update.UpdateResponse +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.settings.Settings +import org.opensearch.common.xcontent.LoggingDeprecationHandler +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.index.IndexNotFoundException +import org.opensearch.index.engine.DocumentMissingException +import org.opensearch.index.engine.VersionConflictEngineException +import org.opensearch.index.seqno.SequenceNumbers +import java.io.IOException +import java.time.Instant + +private val log = LogManager.getLogger(LockService::class.java) + +class LockService(private val client: Client, private val clusterService: ClusterService) { + private var testInstant: Instant? = null + + companion object { + const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock" + + @JvmStatic + fun lockMapping(): String? { + return LockService::class.java.classLoader.getResource("mappings/opensearch-alerting-config-lock.json") + ?.readText() + } + } + + fun lockIndexExist(): Boolean { + return clusterService.state().routingTable().hasIndex(LOCK_INDEX_NAME) + } + + fun acquireLock( + scheduledJob: ScheduledJob, + listener: ActionListener + ) { + val scheduledJobId = scheduledJob.id + acquireLockWithId(scheduledJobId, listener) + } + + fun acquireLockWithId( + scheduledJobId: String, + listener: ActionListener + ) { + val lockId = LockModel.generateLockId(scheduledJobId) + createLockIndex( + object : ActionListener { + override fun onResponse(created: Boolean) { + if (created) { + try { + findLock( + lockId, + object : ActionListener { + override fun onResponse(existingLock: LockModel?) { + if (existingLock != null) { + if (isLockReleased(existingLock)) { + log.debug("lock is released or expired: {}", existingLock) + val updateLock = LockModel(existingLock, getNow(), false) + updateLock(updateLock, listener) + } else { + log.debug("Lock is NOT released or expired. {}", existingLock) + listener.onResponse(null) + } + } else { + val tempLock = LockModel(scheduledJobId, getNow(), false) + log.debug("Lock does not exist. Creating new lock {}", tempLock) + createLock(tempLock, listener) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } catch (e: VersionConflictEngineException) { + log.debug("could not acquire lock {}", e.message) + listener.onResponse(null) + } + } else { + listener.onResponse(null) + } + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } + + private fun createLock( + tempLock: LockModel, + listener: ActionListener + ) { + try { + val request = IndexRequest(LOCK_INDEX_NAME).id(tempLock.lockId) + .source(tempLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .setIfSeqNo(SequenceNumbers.UNASSIGNED_SEQ_NO) + .setIfPrimaryTerm(SequenceNumbers.UNASSIGNED_PRIMARY_TERM) + .create(true) + client.index( + request, + object : ActionListener { + override fun onResponse(response: IndexResponse) { + listener.onResponse(LockModel(tempLock, response.seqNo, response.primaryTerm)) + } + + override fun onFailure(e: Exception) { + if (e is VersionConflictEngineException) { + log.debug("Lock is already created. {}", e.message) + listener.onResponse(null) + return + } + listener.onFailure(e) + } + } + ) + } catch (ex: IOException) { + log.error("IOException occurred creating lock", ex) + listener.onFailure(ex) + } + } + + private fun updateLock( + updateLock: LockModel, + listener: ActionListener + ) { + try { + val updateRequest = UpdateRequest().index(LOCK_INDEX_NAME) + .id(updateLock.lockId) + .setIfSeqNo(updateLock.seqNo) + .setIfPrimaryTerm(updateLock.primaryTerm) + .doc(updateLock.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .fetchSource(true) + + client.update( + updateRequest, + object : ActionListener { + override fun onResponse(response: UpdateResponse) { + listener.onResponse(LockModel(updateLock, response.seqNo, response.primaryTerm)) + } + + override fun onFailure(e: Exception) { + if (e is VersionConflictEngineException) { + log.debug("could not acquire lock {}", e.message) + } + if (e is DocumentMissingException) { + log.debug( + "Document is deleted. This happens if the job is already removed and" + " this is the last run." + "{}", + e.message + ) + } + if (e is IOException) { + log.error("IOException occurred updating lock.", e) + } + listener.onResponse(null) + } + } + ) + } catch (ex: IOException) { + log.error("IOException occurred updating lock.", ex) + listener.onResponse(null) + } + } + + fun findLock( + lockId: String, + listener: ActionListener + ) { + val getRequest = GetRequest(LOCK_INDEX_NAME).id(lockId) + client.get( + getRequest, + object : ActionListener { + override fun onResponse(response: GetResponse) { + if (!response.isExists) { + listener.onResponse(null) + } else { + try { + val parser = XContentType.JSON.xContent() + .createParser(NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.sourceAsString) + parser.nextToken() + listener.onResponse(LockModel.parse(parser, response.seqNo, response.primaryTerm)) + } catch (e: IOException) { + log.error("IOException occurred finding lock", e) + listener.onResponse(null) + } + } + } + + override fun onFailure(e: Exception) { + log.error("Exception occurred finding lock", e) + listener.onFailure(e) + } + } + ) + } + + fun release( + lock: LockModel?, + listener: ActionListener + ) { + if (lock == null) { + log.debug("Lock is null. Nothing to release.") + listener.onResponse(false) + } else { + log.debug("Releasing lock: {}", lock) + val lockToRelease = LockModel(lock, true) + updateLock( + lockToRelease, + object : ActionListener { + override fun onResponse(releasedLock: LockModel?) { + listener.onResponse(releasedLock != null) + } + + override fun onFailure(e: Exception) { + listener.onFailure(e) + } + } + ) + } + } + + fun deleteLock( + lockId: String, + listener: ActionListener + ) { + val deleteRequest = DeleteRequest(LOCK_INDEX_NAME).id(lockId) + client.delete( + deleteRequest, + object : ActionListener { + override fun onResponse(response: DeleteResponse) { + listener.onResponse( + response.result == DocWriteResponse.Result.DELETED || response.result == DocWriteResponse.Result.NOT_FOUND + ) + } + + override fun onFailure(e: Exception) { + if (e is IndexNotFoundException || e.cause is IndexNotFoundException) { + log.debug("Index is not found to delete lock. {}", e.message) + listener.onResponse(true) + } else { + listener.onFailure(e) + } + } + } + ) + } + + private fun createLockIndex(listener: ActionListener) { + if (lockIndexExist()) { + listener.onResponse(true) + } else { + val indexRequest = CreateIndexRequest(LOCK_INDEX_NAME).mapping(lockMapping()) + .settings(Settings.builder().put("index.hidden", true).build()) + client.admin().indices().create( + indexRequest, + object : ActionListener { + override fun onResponse(response: CreateIndexResponse) { + listener.onResponse(response.isAcknowledged) + } + + override fun onFailure(ex: Exception) { + log.error("Failed to update config index schema", ex) + if (ex is ResourceAlreadyExistsException || ex.cause is ResourceAlreadyExistsException + ) { + listener.onResponse(true) + } else { + listener.onFailure(ex) + } + } + } + ) + } + } + + private fun isLockReleased(lock: LockModel): Boolean { + return lock.released + } + + private fun getNow(): Instant { + return if (testInstant != null) { + testInstant!! + } else { + Instant.now() + } + } + + fun setTime(testInstant: Instant) { + this.testInstant = testInstant + } +} diff --git a/core/src/main/resources/mappings/opensearch-alerting-config-lock.json b/core/src/main/resources/mappings/opensearch-alerting-config-lock.json new file mode 100644 index 000000000..401374a8f --- /dev/null +++ b/core/src/main/resources/mappings/opensearch-alerting-config-lock.json @@ -0,0 +1,18 @@ +{ + "dynamic": "strict", + "properties": { + "scheduled_job_id": { + "type": "keyword" + }, + "lock_time": { + "type": "date", + "format": "epoch_second" + }, + "lock_duration_seconds": { + "type": "long" + }, + "released": { + "type": "boolean" + } + } +} \ No newline at end of file