Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Implemented cross-cluster monitor support (#1404) #1412

Merged
merged 2 commits into from
Feb 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.model.ActionRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.firstFailureOrNull
import org.opensearch.alerting.opensearchapi.retry
Expand Down Expand Up @@ -190,6 +191,19 @@ class AlertService(
)
}

// Including a list of triggered clusters for cluster metrics monitors
var triggeredClusters: MutableList<String>? = null
if (result is ClusterMetricsTriggerRunResult)
result.clusterTriggerResults.forEach {
if (it.triggered) {
// Add an empty list if one isn't already present
if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf()

// Add the cluster to the list of triggered clusters
triggeredClusters!!.add(it.cluster)
}
}

// Merge the alert's error message to the current alert's history
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
Expand All @@ -199,7 +213,8 @@ class AlertService(
errorMessage = null,
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
Expand All @@ -212,6 +227,7 @@ class AlertService(
errorHistory = updatedHistory,
actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion,
clusters = triggeredClusters
)
} else {
val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) {
Expand All @@ -223,7 +239,8 @@ class AlertService(
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId,
workflowId = workflorwRunContext?.workflowId ?: ""
workflowId = workflorwRunContext?.workflowId ?: "",
clusters = triggeredClusters
)
}
}
Expand Down
13 changes: 10 additions & 3 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
import org.opensearch.alerting.action.GetEmailAccountAction
import org.opensearch.alerting.action.GetEmailGroupAction
import org.opensearch.alerting.action.GetRemoteIndexesAction
import org.opensearch.alerting.action.SearchEmailAccountAction
import org.opensearch.alerting.action.SearchEmailGroupAction
import org.opensearch.alerting.alerts.AlertIndices
Expand All @@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction
import org.opensearch.alerting.resthandler.RestGetEmailGroupAction
import org.opensearch.alerting.resthandler.RestGetFindingsAction
import org.opensearch.alerting.resthandler.RestGetMonitorAction
import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAction
import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction
import org.opensearch.alerting.resthandler.RestIndexMonitorAction
Expand All @@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction
import org.opensearch.alerting.transport.TransportGetEmailGroupAction
import org.opensearch.alerting.transport.TransportGetFindingsSearchAction
import org.opensearch.alerting.transport.TransportGetMonitorAction
import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction
import org.opensearch.alerting.transport.TransportGetWorkflowAction
import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction
import org.opensearch.alerting.transport.TransportIndexMonitorAction
Expand Down Expand Up @@ -134,6 +137,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

@JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors"
@JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows"
@JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote"
@JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations"

@JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors"
Expand Down Expand Up @@ -192,7 +196,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
RestGetWorkflowAlertsAction(),
RestGetFindingsAction(),
RestGetWorkflowAction(),
RestDeleteWorkflowAction()
RestDeleteWorkflowAction(),
RestGetRemoteIndexesAction(),
)
}

Expand All @@ -219,7 +224,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java),
)
}

Expand Down Expand Up @@ -356,7 +362,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE,
AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD,
AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD,
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE
AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE,
AlertingSettings.REMOTE_MONITORING_ENABLED
)
}

Expand Down
43 changes: 39 additions & 4 deletions alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,19 @@

package org.opensearch.alerting

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.TriggerAfterKey
import org.opensearch.alerting.opensearchapi.convertToMap
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AggregationQueryRewriter
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction
import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap
Expand Down Expand Up @@ -43,6 +48,8 @@ import org.opensearch.script.TemplateScript
import org.opensearch.search.builder.SearchSourceBuilder
import java.time.Instant

private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

/** Service that handles the collection of input results for Monitor executions */
class InputService(
val client: Client,
Expand Down Expand Up @@ -100,8 +107,9 @@ class InputService(
.newInstance(searchParams)
.execute()

val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService)
val searchRequest = SearchRequest()
.indices(*input.indices.toTypedArray())
.indices(*indexes.toTypedArray())
.preference(Preference.PRIMARY_FIRST.type())
XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use {
searchRequest.source(SearchSourceBuilder.fromXContent(it))
Expand All @@ -115,9 +123,36 @@ class InputService(
results += searchResponse.convertToMap()
}
is ClusterMetricsInput -> {
logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}")
val response = executeTransportAction(input, client)
results += response.toMap()
logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType)

val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)

val responseMap = mutableMapOf<String, Map<String, Any>>()
if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) {
client.threadPool().threadContext.stashContext().use {
scope.launch {
input.clusters.forEach { cluster ->
val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService)
val response = executeTransportAction(input, targetClient)
// Not all supported API reference the cluster name in their response.
// Mapping each response to the cluster name before adding to results.
// Not adding this same logic for local-only monitors to avoid breaking existing monitors.
responseMap[cluster] = response.toMap()
}
}
}
val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT)
val startTime = Instant.now().toEpochMilli()
while (
(Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) ||
(responseMap.size < input.clusters.size)
) { /* Wait for responses */ }
results += responseMap
} else {
val response = executeTransportAction(input, client)
results += response.toMap()
}
}
else -> {
throw IllegalArgumentException("Unsupported input type: ${input.name()}.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -65,7 +66,21 @@ object QueryLevelMonitorRunner : MonitorRunner() {
for (trigger in monitor.triggers) {
val currentAlert = currentAlerts[trigger]
val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert)
val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
val triggerResult = when (monitor.monitorType) {
Monitor.MonitorType.QUERY_LEVEL_MONITOR ->
monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> {
val remoteMonitoringEnabled =
monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED)
logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled)
if (remoteMonitoringEnabled)
monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!)
else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx)
}
else ->
throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.")
}

triggerResults[trigger.id] = triggerResult

if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) {
Expand Down
50 changes: 50 additions & 0 deletions alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,19 @@ import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.ChainedAlertTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult
import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser
import org.opensearch.alerting.util.CrossClusterMonitorUtils
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.service.ClusterService
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES
import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH
import org.opensearch.commons.alerting.model.AggregationResultBucket
Expand Down Expand Up @@ -79,6 +83,52 @@ class TriggerService(val scriptService: ScriptService) {
}
}

fun runClusterMetricsTrigger(
monitor: Monitor,
trigger: QueryLevelTrigger,
ctx: QueryLevelTriggerExecutionContext,
clusterService: ClusterService
): ClusterMetricsTriggerRunResult {
var runResult: ClusterMetricsTriggerRunResult?
try {
val inputResults = ctx.results.getOrElse(0) { mapOf() }
var triggered = false
val clusterTriggerResults = mutableListOf<ClusterTriggerResult>()
if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) {
inputResults.forEach { clusterResult ->
// Reducing the inputResults to only include results from 1 cluster at a time
val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair())))

val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(clusterTriggerCtx)

if (clusterTriggered) {
triggered = clusterTriggered
clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered))
}
}
} else {
triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(trigger.condition.params)
.execute(ctx)
if (triggered) clusterTriggerResults
.add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered))
}
runResult = ClusterMetricsTriggerRunResult(
triggerName = trigger.name,
triggered = triggered,
error = null,
clusterTriggerResults = clusterTriggerResults
)
} catch (e: Exception) {
logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e)
}
return runResult!!
}

// TODO: improve performance and support match all and match any
fun runDocLevelTrigger(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionType

class GetRemoteIndexesAction private constructor() : ActionType<GetRemoteIndexesResponse>(NAME, ::GetRemoteIndexesResponse) {
companion object {
val INSTANCE = GetRemoteIndexesAction()
const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.action

import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionRequestValidationException
import org.opensearch.core.common.io.stream.StreamInput
import org.opensearch.core.common.io.stream.StreamOutput
import java.io.IOException

class GetRemoteIndexesRequest : ActionRequest {
var indexes: List<String> = listOf()
var includeMappings: Boolean

constructor(indexes: List<String>, includeMappings: Boolean) : super() {
this.indexes = indexes
this.includeMappings = includeMappings
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readStringList(),
sin.readBoolean()
)

override fun validate(): ActionRequestValidationException? {
return null
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeStringArray(indexes.toTypedArray())
out.writeBoolean(includeMappings)
}

companion object {
const val INDEXES_FIELD = "indexes"
const val INCLUDE_MAPPINGS_FIELD = "include_mappings"
}
}
Loading
Loading