diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 05e35c1b7..e0dc4625e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -20,6 +20,7 @@ import org.opensearch.action.support.WriteRequest import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.model.ActionRunResult import org.opensearch.alerting.model.ChainedAlertTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.firstFailureOrNull import org.opensearch.alerting.opensearchapi.retry @@ -190,6 +191,19 @@ class AlertService( ) } + // Including a list of triggered clusters for cluster metrics monitors + var triggeredClusters: MutableList? = null + if (result is ClusterMetricsTriggerRunResult) + result.clusterTriggerResults.forEach { + if (it.triggered) { + // Add an empty list if one isn't already present + if (triggeredClusters.isNullOrEmpty()) triggeredClusters = mutableListOf() + + // Add the cluster to the list of triggered clusters + triggeredClusters!!.add(it.cluster) + } + } + // Merge the alert's error message to the current alert's history val updatedHistory = currentAlert?.errorHistory.update(alertError) return if (alertError == null && !result.triggered) { @@ -199,7 +213,8 @@ class AlertService( errorMessage = null, errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, - schemaVersion = IndexUtils.alertIndexSchemaVersion + schemaVersion = IndexUtils.alertIndexSchemaVersion, + clusters = triggeredClusters ) } else if (alertError == null && currentAlert?.isAcknowledged() == true) { null @@ -212,6 +227,7 @@ class AlertService( errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion, + clusters = triggeredClusters ) } else { val alertState = if (workflorwRunContext?.auditDelegateMonitorAlerts == true) { @@ -223,7 +239,8 @@ class AlertService( lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message, errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults, schemaVersion = IndexUtils.alertIndexSchemaVersion, executionId = executionId, - workflowId = workflorwRunContext?.workflowId ?: "" + workflowId = workflorwRunContext?.workflowId ?: "", + clusters = triggeredClusters ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index f4c9948a6..c9336e0a4 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -11,6 +11,7 @@ import org.opensearch.alerting.action.ExecuteWorkflowAction import org.opensearch.alerting.action.GetDestinationsAction import org.opensearch.alerting.action.GetEmailAccountAction import org.opensearch.alerting.action.GetEmailGroupAction +import org.opensearch.alerting.action.GetRemoteIndexesAction import org.opensearch.alerting.action.SearchEmailAccountAction import org.opensearch.alerting.action.SearchEmailGroupAction import org.opensearch.alerting.alerts.AlertIndices @@ -34,6 +35,7 @@ import org.opensearch.alerting.resthandler.RestGetEmailAccountAction import org.opensearch.alerting.resthandler.RestGetEmailGroupAction import org.opensearch.alerting.resthandler.RestGetFindingsAction import org.opensearch.alerting.resthandler.RestGetMonitorAction +import org.opensearch.alerting.resthandler.RestGetRemoteIndexesAction import org.opensearch.alerting.resthandler.RestGetWorkflowAction import org.opensearch.alerting.resthandler.RestGetWorkflowAlertsAction import org.opensearch.alerting.resthandler.RestIndexMonitorAction @@ -59,6 +61,7 @@ import org.opensearch.alerting.transport.TransportGetEmailAccountAction import org.opensearch.alerting.transport.TransportGetEmailGroupAction import org.opensearch.alerting.transport.TransportGetFindingsSearchAction import org.opensearch.alerting.transport.TransportGetMonitorAction +import org.opensearch.alerting.transport.TransportGetRemoteIndexesAction import org.opensearch.alerting.transport.TransportGetWorkflowAction import org.opensearch.alerting.transport.TransportGetWorkflowAlertsAction import org.opensearch.alerting.transport.TransportIndexMonitorAction @@ -134,6 +137,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R @JvmField val MONITOR_BASE_URI = "/_plugins/_alerting/monitors" @JvmField val WORKFLOW_BASE_URI = "/_plugins/_alerting/workflows" + @JvmField val REMOTE_BASE_URI = "/_plugins/_alerting/remote" @JvmField val DESTINATION_BASE_URI = "/_plugins/_alerting/destinations" @JvmField val LEGACY_OPENDISTRO_MONITOR_BASE_URI = "/_opendistro/_alerting/monitors" @@ -192,7 +196,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R RestGetWorkflowAlertsAction(), RestGetFindingsAction(), RestGetWorkflowAction(), - RestDeleteWorkflowAction() + RestDeleteWorkflowAction(), + RestGetRemoteIndexesAction(), ) } @@ -219,7 +224,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java), ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java), - ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java) + ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java), + ActionPlugin.ActionHandler(GetRemoteIndexesAction.INSTANCE, TransportGetRemoteIndexesAction::class.java), ) } @@ -356,7 +362,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R AlertingSettings.FINDING_HISTORY_INDEX_MAX_AGE, AlertingSettings.FINDING_HISTORY_ROLLOVER_PERIOD, AlertingSettings.FINDING_HISTORY_RETENTION_PERIOD, - AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE + AlertingSettings.FINDINGS_INDEXING_BATCH_SIZE, + AlertingSettings.REMOTE_MONITORING_ENABLED ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt index b31e21d5f..17ec4670c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/InputService.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse @@ -12,7 +15,9 @@ import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.TriggerAfterKey import org.opensearch.alerting.opensearchapi.convertToMap import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AggregationQueryRewriter +import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.alerting.util.addUserBackendRolesFilter import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.executeTransportAction import org.opensearch.alerting.util.clusterMetricsMonitorHelpers.toMap @@ -43,6 +48,8 @@ import org.opensearch.script.TemplateScript import org.opensearch.search.builder.SearchSourceBuilder import java.time.Instant +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + /** Service that handles the collection of input results for Monitor executions */ class InputService( val client: Client, @@ -100,8 +107,9 @@ class InputService( .newInstance(searchParams) .execute() + val indexes = CrossClusterMonitorUtils.parseIndexesForRemoteSearch(input.indices, clusterService) val searchRequest = SearchRequest() - .indices(*input.indices.toTypedArray()) + .indices(*indexes.toTypedArray()) .preference(Preference.PRIMARY_FIRST.type()) XContentType.JSON.xContent().createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, searchSource).use { searchRequest.source(SearchSourceBuilder.fromXContent(it)) @@ -115,9 +123,36 @@ class InputService( results += searchResponse.convertToMap() } is ClusterMetricsInput -> { - logger.debug("ClusterMetricsInput clusterMetricType: ${input.clusterMetricType}") - val response = executeTransportAction(input, client) - results += response.toMap() + logger.debug("ClusterMetricsInput clusterMetricType: {}", input.clusterMetricType) + + val remoteMonitoringEnabled = clusterService.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + + val responseMap = mutableMapOf>() + if (remoteMonitoringEnabled && input.clusters.isNotEmpty()) { + client.threadPool().threadContext.stashContext().use { + scope.launch { + input.clusters.forEach { cluster -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(cluster, client, clusterService) + val response = executeTransportAction(input, targetClient) + // Not all supported API reference the cluster name in their response. + // Mapping each response to the cluster name before adding to results. + // Not adding this same logic for local-only monitors to avoid breaking existing monitors. + responseMap[cluster] = response.toMap() + } + } + } + val inputTimeout = clusterService.clusterSettings.get(AlertingSettings.INPUT_TIMEOUT) + val startTime = Instant.now().toEpochMilli() + while ( + (Instant.now().toEpochMilli() - startTime >= inputTimeout.millis) || + (responseMap.size < input.clusters.size) + ) { /* Wait for responses */ } + results += responseMap + } else { + val response = executeTransportAction(input, client) + results += response.toMap() + } } else -> { throw IllegalArgumentException("Unsupported input type: ${input.name()}.") diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index 691071517..a77121069 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -11,6 +11,7 @@ import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.opensearchapi.InjectorContextElement import org.opensearch.alerting.opensearchapi.withClosableContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext +import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.isADMonitor import org.opensearch.alerting.workflow.WorkflowRunContext import org.opensearch.commons.alerting.model.Alert @@ -65,7 +66,21 @@ object QueryLevelMonitorRunner : MonitorRunner() { for (trigger in monitor.triggers) { val currentAlert = currentAlerts[trigger] val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) - val triggerResult = monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + val triggerResult = when (monitor.monitorType) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> + monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + val remoteMonitoringEnabled = + monitorCtx.clusterService!!.clusterSettings.get(AlertingSettings.REMOTE_MONITORING_ENABLED) + logger.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + if (remoteMonitoringEnabled) + monitorCtx.triggerService!!.runClusterMetricsTrigger(monitor, trigger, triggerCtx, monitorCtx.clusterService!!) + else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) + } + else -> + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.") + } + triggerResults[trigger.id] = triggerResult if (monitorCtx.triggerService!!.isQueryLevelTriggerActionable(triggerCtx, triggerResult, workflowRunContext)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt index f2356eddf..21ba32475 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/TriggerService.kt @@ -9,6 +9,8 @@ import org.apache.logging.log4j.LogManager import org.opensearch.alerting.chainedAlertCondition.parsers.ChainedAlertExpressionParser import org.opensearch.alerting.model.BucketLevelTriggerRunResult import org.opensearch.alerting.model.ChainedAlertTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult +import org.opensearch.alerting.model.ClusterMetricsTriggerRunResult.ClusterTriggerResult import org.opensearch.alerting.model.DocumentLevelTriggerRunResult import org.opensearch.alerting.model.QueryLevelTriggerRunResult import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext @@ -16,8 +18,10 @@ import org.opensearch.alerting.script.ChainedAlertTriggerExecutionContext import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext import org.opensearch.alerting.script.TriggerScript import org.opensearch.alerting.triggercondition.parsers.TriggerExpressionParser +import org.opensearch.alerting.util.CrossClusterMonitorUtils import org.opensearch.alerting.util.getBucketKeysHash import org.opensearch.alerting.workflow.WorkflowRunContext +import org.opensearch.cluster.service.ClusterService import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.BUCKET_INDICES import org.opensearch.commons.alerting.aggregation.bucketselectorext.BucketSelectorIndices.Fields.PARENT_BUCKET_PATH import org.opensearch.commons.alerting.model.AggregationResultBucket @@ -79,6 +83,52 @@ class TriggerService(val scriptService: ScriptService) { } } + fun runClusterMetricsTrigger( + monitor: Monitor, + trigger: QueryLevelTrigger, + ctx: QueryLevelTriggerExecutionContext, + clusterService: ClusterService + ): ClusterMetricsTriggerRunResult { + var runResult: ClusterMetricsTriggerRunResult? + try { + val inputResults = ctx.results.getOrElse(0) { mapOf() } + var triggered = false + val clusterTriggerResults = mutableListOf() + if (CrossClusterMonitorUtils.isRemoteMonitor(monitor, clusterService)) { + inputResults.forEach { clusterResult -> + // Reducing the inputResults to only include results from 1 cluster at a time + val clusterTriggerCtx = ctx.copy(results = listOf(mapOf(clusterResult.toPair()))) + + val clusterTriggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(trigger.condition.params) + .execute(clusterTriggerCtx) + + if (clusterTriggered) { + triggered = clusterTriggered + clusterTriggerResults.add(ClusterTriggerResult(cluster = clusterResult.key, triggered = clusterTriggered)) + } + } + } else { + triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT) + .newInstance(trigger.condition.params) + .execute(ctx) + if (triggered) clusterTriggerResults + .add(ClusterTriggerResult(cluster = clusterService.clusterName.value(), triggered = triggered)) + } + runResult = ClusterMetricsTriggerRunResult( + triggerName = trigger.name, + triggered = triggered, + error = null, + clusterTriggerResults = clusterTriggerResults + ) + } catch (e: Exception) { + logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e) + // if the script fails we need to send an alert so set triggered = true + runResult = ClusterMetricsTriggerRunResult(trigger.name, true, e) + } + return runResult!! + } + // TODO: improve performance and support match all and match any fun runDocLevelTrigger( monitor: Monitor, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesAction.kt new file mode 100644 index 000000000..059110af4 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesAction.kt @@ -0,0 +1,15 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionType + +class GetRemoteIndexesAction private constructor() : ActionType(NAME, ::GetRemoteIndexesResponse) { + companion object { + val INSTANCE = GetRemoteIndexesAction() + const val NAME = "cluster:admin/opensearch/alerting/remote/indexes/get" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt new file mode 100644 index 000000000..733bc3a04 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesRequest.kt @@ -0,0 +1,43 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.action.ActionRequest +import org.opensearch.action.ActionRequestValidationException +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import java.io.IOException + +class GetRemoteIndexesRequest : ActionRequest { + var indexes: List = listOf() + var includeMappings: Boolean + + constructor(indexes: List, includeMappings: Boolean) : super() { + this.indexes = indexes + this.includeMappings = includeMappings + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readStringList(), + sin.readBoolean() + ) + + override fun validate(): ActionRequestValidationException? { + return null + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeStringArray(indexes.toTypedArray()) + out.writeBoolean(includeMappings) + } + + companion object { + const val INDEXES_FIELD = "indexes" + const val INCLUDE_MAPPINGS_FIELD = "include_mappings" + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt new file mode 100644 index 000000000..1572b4228 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/action/GetRemoteIndexesResponse.kt @@ -0,0 +1,135 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.action + +import org.opensearch.cluster.health.ClusterHealthStatus +import org.opensearch.cluster.metadata.MappingMetadata +import org.opensearch.core.action.ActionResponse +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import java.io.IOException + +class GetRemoteIndexesResponse : ActionResponse, ToXContentObject { + var clusterIndexes: List = emptyList() + + constructor(clusterIndexes: List) : super() { + this.clusterIndexes = clusterIndexes + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + clusterIndexes = sin.readList((ClusterIndexes.Companion)::readFrom) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + clusterIndexes.forEach { + it.toXContent(builder, params) + } + return builder.endObject() + } + + override fun writeTo(out: StreamOutput) { + clusterIndexes.forEach { it.writeTo(out) } + } + + data class ClusterIndexes( + val clusterName: String, + val clusterHealth: ClusterHealthStatus, + val hubCluster: Boolean, + val indexes: List = listOf(), + val latency: Long + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + clusterName = sin.readString(), + clusterHealth = sin.readEnum(ClusterHealthStatus::class.java), + hubCluster = sin.readBoolean(), + indexes = sin.readList((ClusterIndex.Companion)::readFrom), + latency = sin.readLong() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject(clusterName) + builder.field(CLUSTER_NAME_FIELD, clusterName) + builder.field(CLUSTER_HEALTH_FIELD, clusterHealth) + builder.field(HUB_CLUSTER_FIELD, hubCluster) + builder.field(INDEX_LATENCY_FIELD, latency) + builder.startObject(INDEXES_FIELD) + indexes.forEach { + it.toXContent(builder, params) + } + return builder.endObject().endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(clusterName) + out.writeEnum(clusterHealth) + indexes.forEach { it.writeTo(out) } + out.writeLong(latency) + } + + companion object { + const val CLUSTER_NAME_FIELD = "cluster" + const val CLUSTER_HEALTH_FIELD = "health" + const val HUB_CLUSTER_FIELD = "hub_cluster" + const val INDEXES_FIELD = "indexes" + const val INDEX_LATENCY_FIELD = "latency" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterIndexes { + return ClusterIndexes(sin) + } + } + + data class ClusterIndex( + val indexName: String, + val indexHealth: ClusterHealthStatus?, + val mappings: MappingMetadata? + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + indexName = sin.readString(), + indexHealth = sin.readEnum(ClusterHealthStatus::class.java), + mappings = sin.readOptionalWriteable(::MappingMetadata) + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject(indexName) + builder.field(INDEX_NAME_FIELD, indexName) + builder.field(INDEX_HEALTH_FIELD, indexHealth) + if (mappings == null) builder.startObject(MAPPINGS_FIELD).endObject() + else builder.field(MAPPINGS_FIELD, mappings.sourceAsMap()) + return builder.endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(indexName) + out.writeEnum(indexHealth) + if (mappings != null) out.writeMap(mappings.sourceAsMap) + } + + companion object { + const val INDEX_NAME_FIELD = "name" + const val INDEX_HEALTH_FIELD = "health" + const val MAPPINGS_FIELD = "mappings" + + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterIndex { + return ClusterIndex(sin) + } + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt new file mode 100644 index 000000000..a19de0637 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/ClusterMetricsTriggerRunResult.kt @@ -0,0 +1,110 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.commons.alerting.alerts.AlertError +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.common.io.stream.Writeable +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.ToXContentObject +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException +import java.time.Instant + +data class ClusterMetricsTriggerRunResult( + override var triggerName: String, + override var triggered: Boolean, + override var error: Exception?, + override var actionResults: MutableMap = mutableMapOf(), + var clusterTriggerResults: List = listOf() +) : QueryLevelTriggerRunResult( + triggerName = triggerName, + error = error, + triggered = triggered, + actionResults = actionResults +) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + triggered = sin.readBoolean(), + actionResults = sin.readMap() as MutableMap, + clusterTriggerResults = sin.readList((ClusterTriggerResult.Companion)::readFrom) + ) + + override fun alertError(): AlertError? { + if (error != null) { + return AlertError(Instant.now(), "Failed evaluating trigger:\n${error!!.userErrorMessage()}") + } + for (actionResult in actionResults.values) { + if (actionResult.error != null) { + return AlertError(Instant.now(), "Failed running action:\n${actionResult.error.userErrorMessage()}") + } + } + return null + } + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + builder + .field(TRIGGERED_FIELD, triggered) + .field(ACTION_RESULTS_FIELD, actionResults as Map) + .startArray(CLUSTER_RESULTS_FIELD) + clusterTriggerResults.forEach { it.toXContent(builder, params) } + return builder.endArray() + } + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeBoolean(triggered) + out.writeMap(actionResults as Map) + clusterTriggerResults.forEach { it.writeTo(out) } + } + + companion object { + const val TRIGGERED_FIELD = "triggered" + const val ACTION_RESULTS_FIELD = "action_results" + const val CLUSTER_RESULTS_FIELD = "cluster_results" + } + + data class ClusterTriggerResult( + val cluster: String, + val triggered: Boolean, + ) : ToXContentObject, Writeable { + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + cluster = sin.readString(), + triggered = sin.readBoolean() + ) + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .startObject(cluster) + .field(TRIGGERED_FIELD, triggered) + .endObject() + .endObject() + } + + override fun writeTo(out: StreamOutput) { + out.writeString(cluster) + out.writeBoolean(triggered) + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): ClusterTriggerResult { + return ClusterTriggerResult(sin) + } + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt index d123dbae4..5917c1ecf 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/QueryLevelTriggerRunResult.kt @@ -14,11 +14,11 @@ import org.opensearch.script.ScriptException import java.io.IOException import java.time.Instant -data class QueryLevelTriggerRunResult( +open class QueryLevelTriggerRunResult( override var triggerName: String, - var triggered: Boolean, + open var triggered: Boolean, override var error: Exception?, - var actionResults: MutableMap = mutableMapOf() + open var actionResults: MutableMap = mutableMapOf() ) : TriggerRunResult(triggerName, error) { @Throws(IOException::class) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt new file mode 100644 index 000000000..591ab2c3e --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetRemoteIndexesAction.kt @@ -0,0 +1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.resthandler + +import org.apache.logging.log4j.LogManager +import org.opensearch.alerting.AlertingPlugin +import org.opensearch.alerting.action.GetRemoteIndexesAction +import org.opensearch.alerting.action.GetRemoteIndexesRequest +import org.opensearch.client.node.NodeClient +import org.opensearch.core.common.Strings +import org.opensearch.rest.BaseRestHandler +import org.opensearch.rest.RestHandler +import org.opensearch.rest.RestRequest +import org.opensearch.rest.action.RestToXContentListener + +private val log = LogManager.getLogger(RestGetRemoteIndexesAction::class.java) + +class RestGetRemoteIndexesAction : BaseRestHandler() { + val ROUTE = "${AlertingPlugin.REMOTE_BASE_URI}/indexes" + + override fun getName(): String { + return "get_remote_indexes_action" + } + + override fun routes(): List { + return mutableListOf( + RestHandler.Route(RestRequest.Method.GET, ROUTE) + ) + } + + override fun prepareRequest(request: RestRequest, client: NodeClient): RestChannelConsumer { + log.debug("${request.method()} $ROUTE") + val indexes = Strings.splitStringByCommaToArray(request.param(GetRemoteIndexesRequest.INDEXES_FIELD, "")) + val includeMappings = request.paramAsBoolean(GetRemoteIndexesRequest.INCLUDE_MAPPINGS_FIELD, false) + return RestChannelConsumer { + channel -> + client.execute( + GetRemoteIndexesAction.INSTANCE, + GetRemoteIndexesRequest( + indexes = indexes.toList(), + includeMappings = includeMappings + ), + RestToXContentListener(channel) + ) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt index 8d00d4569..092670448 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/settings/AlertingSettings.kt @@ -22,64 +22,55 @@ class AlertingSettings { val ALERTING_MAX_MONITORS = Setting.intSetting( "plugins.alerting.monitor.max_monitors", LegacyOpenDistroAlertingSettings.ALERTING_MAX_MONITORS, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val INPUT_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.input_timeout", LegacyOpenDistroAlertingSettings.INPUT_TIMEOUT, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val INDEX_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.index_timeout", LegacyOpenDistroAlertingSettings.INDEX_TIMEOUT, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val BULK_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.bulk_timeout", LegacyOpenDistroAlertingSettings.BULK_TIMEOUT, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val ALERT_BACKOFF_MILLIS = Setting.positiveTimeSetting( "plugins.alerting.alert_backoff_millis", LegacyOpenDistroAlertingSettings.ALERT_BACKOFF_MILLIS, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val ALERT_BACKOFF_COUNT = Setting.intSetting( "plugins.alerting.alert_backoff_count", LegacyOpenDistroAlertingSettings.ALERT_BACKOFF_COUNT, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val MOVE_ALERTS_BACKOFF_MILLIS = Setting.positiveTimeSetting( "plugins.alerting.move_alerts_backoff_millis", LegacyOpenDistroAlertingSettings.MOVE_ALERTS_BACKOFF_MILLIS, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val MOVE_ALERTS_BACKOFF_COUNT = Setting.intSetting( "plugins.alerting.move_alerts_backoff_count", LegacyOpenDistroAlertingSettings.MOVE_ALERTS_BACKOFF_COUNT, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val ALERT_HISTORY_ENABLED = Setting.boolSetting( "plugins.alerting.alert_history_enabled", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_ENABLED, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) // TODO: Do we want to let users to disable this? If so, we need to fix the rollover logic @@ -87,95 +78,81 @@ class AlertingSettings { val FINDING_HISTORY_ENABLED = Setting.boolSetting( "plugins.alerting.alert_finding_enabled", true, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val ALERT_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting( "plugins.alerting.alert_history_rollover_period", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_ROLLOVER_PERIOD, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val FINDING_HISTORY_ROLLOVER_PERIOD = Setting.positiveTimeSetting( "plugins.alerting.alert_finding_rollover_period", TimeValue.timeValueHours(12), - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val ALERT_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( "plugins.alerting.alert_history_max_age", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_INDEX_MAX_AGE, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val FINDING_HISTORY_INDEX_MAX_AGE = Setting.positiveTimeSetting( "plugins.alerting.finding_history_max_age", TimeValue(30, TimeUnit.DAYS), - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val ALERT_HISTORY_MAX_DOCS = Setting.longSetting( "plugins.alerting.alert_history_max_docs", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_MAX_DOCS, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val FINDING_HISTORY_MAX_DOCS = Setting.longSetting( "plugins.alerting.alert_finding_max_docs", 1000L, 0L, - Setting.Property.NodeScope, - Setting.Property.Dynamic, - Setting.Property.Deprecated + Setting.Property.NodeScope, Setting.Property.Dynamic, Setting.Property.Deprecated ) val ALERT_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( "plugins.alerting.alert_history_retention_period", LegacyOpenDistroAlertingSettings.ALERT_HISTORY_RETENTION_PERIOD, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val FINDING_HISTORY_RETENTION_PERIOD = Setting.positiveTimeSetting( "plugins.alerting.finding_history_retention_period", TimeValue(60, TimeUnit.DAYS), - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val REQUEST_TIMEOUT = Setting.positiveTimeSetting( "plugins.alerting.request_timeout", LegacyOpenDistroAlertingSettings.REQUEST_TIMEOUT, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val MAX_ACTION_THROTTLE_VALUE = Setting.positiveTimeSetting( "plugins.alerting.action_throttle_max_value", LegacyOpenDistroAlertingSettings.MAX_ACTION_THROTTLE_VALUE, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val FILTER_BY_BACKEND_ROLES = Setting.boolSetting( "plugins.alerting.filter_by_backend_roles", LegacyOpenDistroAlertingSettings.FILTER_BY_BACKEND_ROLES, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val MAX_ACTIONABLE_ALERT_COUNT = Setting.longSetting( "plugins.alerting.max_actionable_alert_count", DEFAULT_MAX_ACTIONABLE_ALERT_COUNT, -1L, - Setting.Property.NodeScope, - Setting.Property.Dynamic + Setting.Property.NodeScope, Setting.Property.Dynamic ) val FINDINGS_INDEXING_BATCH_SIZE = Setting.intSetting( @@ -184,5 +161,11 @@ class AlertingSettings { 1, Setting.Property.NodeScope, Setting.Property.Dynamic ) + + val REMOTE_MONITORING_ENABLED = Setting.boolSetting( + "plugins.alerting.remote_monitoring_enabled", + false, + Setting.Property.NodeScope, Setting.Property.Dynamic + ) } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt new file mode 100644 index 000000000..5b35d493a --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetRemoteIndexesAction.kt @@ -0,0 +1,193 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.transport + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch +import kotlinx.coroutines.newSingleThreadContext +import kotlinx.coroutines.withContext +import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException +import org.opensearch.action.admin.cluster.health.ClusterHealthRequest +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse +import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest +import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse +import org.opensearch.action.admin.indices.resolve.ResolveIndexAction +import org.opensearch.action.support.ActionFilters +import org.opensearch.action.support.HandledTransportAction +import org.opensearch.action.support.IndicesOptions +import org.opensearch.alerting.action.GetRemoteIndexesAction +import org.opensearch.alerting.action.GetRemoteIndexesRequest +import org.opensearch.alerting.action.GetRemoteIndexesResponse +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes +import org.opensearch.alerting.action.GetRemoteIndexesResponse.ClusterIndexes.ClusterIndex +import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.settings.AlertingSettings +import org.opensearch.alerting.settings.AlertingSettings.Companion.REMOTE_MONITORING_ENABLED +import org.opensearch.alerting.util.AlertingException +import org.opensearch.alerting.util.CrossClusterMonitorUtils +import org.opensearch.client.Client +import org.opensearch.cluster.service.ClusterService +import org.opensearch.common.inject.Inject +import org.opensearch.common.settings.Settings +import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus +import org.opensearch.core.xcontent.NamedXContentRegistry +import org.opensearch.tasks.Task +import org.opensearch.transport.TransportService +import java.time.Duration +import java.time.Instant + +private val log = LogManager.getLogger(TransportGetRemoteIndexesAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) + +class TransportGetRemoteIndexesAction @Inject constructor( + val transportService: TransportService, + val client: Client, + actionFilters: ActionFilters, + val xContentRegistry: NamedXContentRegistry, + val clusterService: ClusterService, + settings: Settings, +) : HandledTransportAction( + GetRemoteIndexesAction.NAME, + transportService, + actionFilters, + ::GetRemoteIndexesRequest +), + SecureTransportAction { + + @Volatile override var filterByEnabled = AlertingSettings.FILTER_BY_BACKEND_ROLES.get(settings) + + @Volatile private var remoteMonitoringEnabled = REMOTE_MONITORING_ENABLED.get(settings) + + init { + clusterService.clusterSettings.addSettingsUpdateConsumer(REMOTE_MONITORING_ENABLED) { remoteMonitoringEnabled = it } + listenFilterBySettingChange(clusterService) + } + + override fun doExecute( + task: Task, + request: GetRemoteIndexesRequest, + actionListener: ActionListener + ) { + log.debug("Remote monitoring enabled: {}", remoteMonitoringEnabled) + if (!remoteMonitoringEnabled) { + actionListener.onFailure( + AlertingException.wrap( + OpenSearchStatusException("Remote monitoring is not enabled.", RestStatus.FORBIDDEN) + ) + ) + return + } + + val user = readUserFromThreadContext(client) + if (!validateUserBackendRoles(user, actionListener)) return + + client.threadPool().threadContext.stashContext().use { + scope.launch { + val singleThreadContext = newSingleThreadContext("GetRemoteIndexesActionThread") + withContext(singleThreadContext) { + it.restore() + val clusterIndexesList = mutableListOf() + + var resolveIndexResponse: ResolveIndexAction.Response? = null + try { + resolveIndexResponse = + getRemoteClusters(CrossClusterMonitorUtils.parseIndexesForRemoteSearch(request.indexes, clusterService)) + } catch (e: Exception) { + log.error("Failed to retrieve indexes for request $request", e) + actionListener.onFailure(AlertingException.wrap(e)) + } + + val resolvedIndexes: MutableList = mutableListOf() + if (resolveIndexResponse != null) { + resolveIndexResponse.indices.forEach { resolvedIndexes.add(it.name) } + resolveIndexResponse.aliases.forEach { resolvedIndexes.add(it.name) } + } + + val clusterIndexesMap = CrossClusterMonitorUtils.separateClusterIndexes(resolvedIndexes, clusterService) + + clusterIndexesMap.forEach { (clusterName, indexes) -> + val targetClient = CrossClusterMonitorUtils.getClientForCluster(clusterName, client, clusterService) + + val startTime = Instant.now() + var clusterHealthResponse: ClusterHealthResponse? = null + try { + clusterHealthResponse = getHealthStatuses(targetClient, indexes) + } catch (e: Exception) { + log.error("Failed to retrieve health statuses for request $request", e) + actionListener.onFailure(AlertingException.wrap(e)) + } + val endTime = Instant.now() + val latency = Duration.between(startTime, endTime).toMillis() + + var mappingsResponse: GetMappingsResponse? = null + if (request.includeMappings) { + try { + mappingsResponse = getIndexMappings(targetClient, indexes) + } catch (e: Exception) { + log.error("Failed to retrieve mappings for request $request", e) + actionListener.onFailure(AlertingException.wrap(e)) + } + } + + val clusterIndexList = mutableListOf() + if (clusterHealthResponse != null) { + indexes.forEach { + clusterIndexList.add( + ClusterIndex( + indexName = it, + indexHealth = clusterHealthResponse.indices[it]?.status, + mappings = mappingsResponse?.mappings?.get(it) + ) + ) + } + } + + clusterIndexesList.add( + ClusterIndexes( + clusterName = clusterName, + clusterHealth = clusterHealthResponse!!.status, + hubCluster = clusterName == clusterService.clusterName.value(), + indexes = clusterIndexList, + latency = latency + ) + ) + } + actionListener.onResponse(GetRemoteIndexesResponse(clusterIndexes = clusterIndexesList)) + } + } + } + } + + private suspend fun getRemoteClusters(parsedIndexes: List): ResolveIndexAction.Response { + val resolveRequest = ResolveIndexAction.Request( + parsedIndexes.toTypedArray(), + ResolveIndexAction.Request.DEFAULT_INDICES_OPTIONS + ) + + return client.suspendUntil { + admin().indices().resolveIndex(resolveRequest, it) + } + } + private suspend fun getHealthStatuses(targetClient: Client, parsedIndexesNames: List): ClusterHealthResponse { + val clusterHealthRequest = ClusterHealthRequest() + .indices(*parsedIndexesNames.toTypedArray()) + .indicesOptions(IndicesOptions.lenientExpandHidden()) + + return targetClient.suspendUntil { + admin().cluster().health(clusterHealthRequest, it) + } + } + + private suspend fun getIndexMappings(targetClient: Client, parsedIndexNames: List): GetMappingsResponse { + val getMappingsRequest = GetMappingsRequest().indices(*parsedIndexNames.toTypedArray()) + return targetClient.suspendUntil { + admin().indices().getMappings(getMappingsRequest, it) + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt new file mode 100644 index 000000000..6ec14ffa2 --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/CrossClusterMonitorUtils.kt @@ -0,0 +1,231 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.util + +import org.opensearch.action.search.SearchRequest +import org.opensearch.client.Client +import org.opensearch.client.node.NodeClient +import org.opensearch.cluster.service.ClusterService +import org.opensearch.commons.alerting.model.ClusterMetricsInput +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.commons.alerting.model.SearchInput + +class CrossClusterMonitorUtils { + companion object { + + /** + * Uses the monitor inputs to determine whether the monitor makes calls to remote clusters. + * @param monitor The monitor to evaluate. + * @param localClusterName The name of the local cluster. + * @return TRUE if the monitor makes calls to remote clusters; otherwise returns FALSE. + */ + @JvmStatic + fun isRemoteMonitor(monitor: Monitor, localClusterName: String): Boolean { + var isRemoteMonitor = false + monitor.inputs.forEach inputCheck@{ + when (it) { + is ClusterMetricsInput -> { + it.clusters.forEach { clusterName -> + if (clusterName != localClusterName) { + isRemoteMonitor = true + return@inputCheck + } + } + } + is SearchInput -> { + // Remote indexes follow the pattern ":". + // Index entries without a CLUSTER_NAME indicate they're store on the local cluster. + it.indices.forEach { index -> + val clusterName = parseClusterName(index) + if (clusterName != localClusterName) { + isRemoteMonitor = true + return@inputCheck + } + } + } + is DocLevelMonitorInput -> { + // TODO: When document level monitors are supported, this check will be similar to SearchInput. + throw IllegalArgumentException("Per document monitors do not currently support cross-cluster search.") + } + else -> { + throw IllegalArgumentException("Unsupported input type: ${it.name()}.") + } + } + } + return isRemoteMonitor + } + + /** + * Uses the monitor inputs to determine whether the monitor makes calls to remote clusters. + * @param monitor The monitor to evaluate. + * @param clusterService Used to retrieve the name of the local cluster. + * @return TRUE if the monitor makes calls to remote clusters; otherwise returns FALSE. + */ + @JvmStatic + fun isRemoteMonitor(monitor: Monitor, clusterService: ClusterService): Boolean { + return isRemoteMonitor(monitor = monitor, localClusterName = clusterService.clusterName.value()) + } + + /** + * Parses the list of indexes into a map of CLUSTER_NAME to List. + * @param indexes A list of index names in ":" format. + * @param localClusterName The name of the local cluster. + * @return A map of CLUSTER_NAME to List + */ + @JvmStatic + fun separateClusterIndexes(indexes: List, localClusterName: String): HashMap> { + val output = hashMapOf>() + indexes.forEach { index -> + var clusterName = parseClusterName(index) + val indexName = parseIndexName(index) + + // If the index entry does not have a CLUSTER_NAME, it indicates the index is on the local cluster. + if (clusterName.isEmpty()) clusterName = localClusterName + + output.getOrPut(clusterName) { mutableListOf() }.add(indexName) + } + return output + } + + /** + * Parses the list of indexes into a map of CLUSTER_NAME to List. + * @param indexes A list of index names in ":" format. + * Local indexes can also be in "" format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A map of CLUSTER_NAME to List + */ + @JvmStatic + fun separateClusterIndexes(indexes: List, clusterService: ClusterService): HashMap> { + return separateClusterIndexes(indexes = indexes, localClusterName = clusterService.clusterName.value()) + } + + /** + * The [NodeClient] used by the plugin cannot execute searches against local indexes + * using format ":". That format only supports querying remote indexes. + * This function formats a list of indexes to be supplied directly to a [SearchRequest]. + * @param indexes A list of index names in ":" format. + * @param localClusterName The name of the local cluster. + * @return A list of indexes with any remote indexes in ":" format, + * and any local indexes in "" format. + */ + @JvmStatic + fun parseIndexesForRemoteSearch(indexes: List, localClusterName: String): List { + return indexes.map { + var index = it + val clusterName = parseClusterName(it) + if (clusterName.isNotEmpty() && clusterName == localClusterName) { + index = parseIndexName(it) + } + index + } + } + + /** + * The [NodeClient] used by the plugin cannot execute searches against local indexes + * using format ":". That format only supports querying remote indexes. + * This function formats a list of indexes to be supplied directly to a [SearchRequest]. + * @param indexes A list of index names in ":" format. + * @param clusterService Used to retrieve the name of the local cluster. + * @return A list of indexes with any remote indexes in ":" format, + * and any local indexes in "" format. + */ + @JvmStatic + fun parseIndexesForRemoteSearch(indexes: List, clusterService: ClusterService): List { + return parseIndexesForRemoteSearch(indexes = indexes, localClusterName = clusterService.clusterName.value()) + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local [NodeClient]. + * @param localClusterName The name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForCluster(clusterName: String, client: Client, localClusterName: String): Client { + return if (clusterName == localClusterName) client else client.getRemoteClusterClient(clusterName) + } + + /** + * Uses the clusterName to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param clusterName The name of the cluster to evaluate. + * @param client The local [NodeClient]. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForCluster(clusterName: String, client: Client, clusterService: ClusterService): Client { + return getClientForCluster(clusterName = clusterName, client = client, localClusterName = clusterService.clusterName.value()) + } + + /** + * Uses the index name to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @param client The local [NodeClient]. + * @param localClusterName The name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForIndex(index: String, client: Client, localClusterName: String): Client { + val clusterName = parseClusterName(index) + return if (clusterName.isNotEmpty() && clusterName != localClusterName) + client.getRemoteClusterClient(clusterName) else client + } + + /** + * Uses the index name to determine whether the target client is the local or a remote client, + * and returns the appropriate client. + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @param client The local [NodeClient]. + * @param clusterService Used to retrieve the name of the local cluster. + * @return The local [NodeClient] for the local cluster, or a remote client for a remote cluster. + */ + @JvmStatic + fun getClientForIndex(index: String, client: Client, clusterService: ClusterService): Client { + return getClientForIndex(index = index, client = client, localClusterName = clusterService.clusterName.value()) + } + + /** + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @return The cluster name if present; else an empty string. + */ + @JvmStatic + fun parseClusterName(index: String): String { + return if (index.contains(":")) index.split(":").getOrElse(0) { "" } + else "" + } + + /** + * @param index The name of the index to evaluate. + * Can be in either ":" or "" format. + * @return The index name. + */ + @JvmStatic + fun parseIndexName(index: String): String { + return if (index.contains(":")) index.split(":").getOrElse(1) { index } + else index + } + + /** + * If clusterName is provided, combines the inputs into ":" format. + * @param clusterName + * @param indexName + * @return The formatted string. + */ + @JvmStatic + fun formatClusterAndIndexName(clusterName: String, indexName: String): String { + return if (clusterName.isNotEmpty()) "$clusterName:$indexName" + else indexName + } + } +} diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json index 53fb5b0a2..76e5104cc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/alert_mapping.json @@ -169,6 +169,14 @@ "type": "text" } } + }, + "clusters": { + "type" : "text", + "fields" : { + "keyword" : { + "type" : "keyword" + } + } } } } \ No newline at end of file diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt index 72b7c0423..ff79afffb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorRunnerServiceIT.kt @@ -969,7 +969,7 @@ class MonitorRunnerServiceIT : AlertingRestTestCase() { // GIVEN val indices = (1..5).map { createTestIndex() }.toTypedArray() val pathParams = indices.joinToString(",") - val path = "/_cluster/health/" + val path = "/_cluster/health" val input = randomClusterMetricsInput( path = path, pathParams = pathParams diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt index 6851c471d..6ef15f8d8 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/model/WriteableTests.kt @@ -40,7 +40,10 @@ class WriteableTests : OpenSearchTestCase() { runResult.writeTo(out) val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes) val newRunResult = QueryLevelTriggerRunResult(sin) - assertEquals("Round tripping ActionRunResult doesn't work", runResult, newRunResult) + assertEquals(runResult.triggerName, newRunResult.triggerName) + assertEquals(runResult.triggered, newRunResult.triggered) + assertEquals(runResult.error, newRunResult.error) + assertEquals(runResult.actionResults, newRunResult.actionResults) } fun `test bucket-level triggerrunresult as stream`() {