diff --git a/alerting/build.gradle b/alerting/build.gradle index 0e920fee6..bdf842123 100644 --- a/alerting/build.gradle +++ b/alerting/build.gradle @@ -3,6 +3,7 @@ * SPDX-License-Identifier: Apache-2.0 */ +import com.github.jengelman.gradle.plugins.shadow.ShadowPlugin import java.util.concurrent.Callable import org.opensearch.gradle.test.RestIntegTestTask import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask @@ -114,6 +115,7 @@ dependencies { api project(":alerting-core") implementation "com.github.seancfoley:ipaddress:5.4.1" + implementation project(path: ":alerting-spi", configuration: 'shadow') testImplementation "org.antlr:antlr4-runtime:${versions.antlr4}" testImplementation "org.jetbrains.kotlin:kotlin-test:${kotlin_version}" diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt index d531d844d..d77957432 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt @@ -25,6 +25,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler import org.opensearch.alerting.core.schedule.JobScheduler import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings import org.opensearch.alerting.core.settings.ScheduledJobSettings +import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction import org.opensearch.alerting.resthandler.RestDeleteMonitorAction @@ -52,6 +53,7 @@ import org.opensearch.alerting.settings.AlertingSettings.Companion.DOC_LEVEL_MON import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings +import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction import org.opensearch.alerting.transport.TransportDeleteMonitorAction @@ -110,6 +112,7 @@ import org.opensearch.painless.spi.AllowlistLoader import org.opensearch.painless.spi.PainlessExtension import org.opensearch.percolator.PercolatorPluginExt import org.opensearch.plugins.ActionPlugin +import org.opensearch.plugins.ExtensiblePlugin import org.opensearch.plugins.ReloadablePlugin import org.opensearch.plugins.ScriptPlugin import org.opensearch.plugins.SearchPlugin @@ -162,6 +165,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R lateinit var alertIndices: AlertIndices lateinit var clusterService: ClusterService lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator + var monitorTypeToMonitorRunners: MutableMap = mutableMapOf() override fun getRestHandlers( settings: Settings, @@ -277,6 +281,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R .registerLockService(lockService) .registerConsumers() .registerDestinationSettings() + .registerRemoteMonitors(monitorTypeToMonitorRunners) scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService) docLevelMonitorQueries = DocLevelMonitorQueries(client, clusterService) scheduler = JobScheduler(threadPool, runner) @@ -409,4 +414,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R ) ) } + + override fun loadExtensions(loader: ExtensiblePlugin.ExtensionLoader) { + for (monitorExtension in loader.loadExtensions(RemoteMonitorRunnerExtension::class.java)) { + val monitorType = monitorExtension.getMonitorType() + val monitorRunner = monitorExtension.getMonitorRunner() + + if (!this.monitorTypeToMonitorRunners.containsKey(monitorType)) { + val monitorRegistry = RemoteMonitorRegistry(monitorType, monitorRunner) + this.monitorTypeToMonitorRunners[monitorType] = monitorRegistry + } + } + } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt index 8c7e28734..247f29ec0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorMetadataService.kt @@ -41,6 +41,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.ToXContent @@ -48,6 +49,7 @@ import org.opensearch.core.xcontent.XContentParser import org.opensearch.core.xcontent.XContentParserUtils import org.opensearch.index.seqno.SequenceNumbers import org.opensearch.transport.RemoteTransportException +import java.util.* private val log = LogManager.getLogger(MonitorMetadataService::class.java) @@ -185,10 +187,16 @@ object MonitorMetadataService : suspend fun recreateRunContext(metadata: MonitorMetadata, monitor: Monitor): MonitorMetadata { try { - val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) + val monitorIndex = if ( + monitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) (monitor.inputs[0] as DocLevelMonitorInput).indices[0] else null - val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) + val runContext = if ( + monitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) createFullRunContext(monitorIndex, metadata.lastRunContext as MutableMap>) else null return if (runContext != null) { @@ -208,10 +216,16 @@ object MonitorMetadataService : createWithRunContext: Boolean, workflowMetadataId: String? = null, ): MonitorMetadata { - val monitorIndex = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) + val monitorIndex = if ( + monitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) (monitor.inputs[0] as DocLevelMonitorInput).indices[0] else null - val runContext = if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext) + val runContext = if ( + monitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR && createWithRunContext + ) createFullRunContext(monitorIndex) else emptyMap() return MonitorMetadata( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt index 459743f9d..523c2be3a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerExecutionContext.kt @@ -9,6 +9,7 @@ import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.alerting.alerts.AlertIndices import org.opensearch.alerting.core.lock.LockService import org.opensearch.alerting.model.destination.DestinationContextFactory +import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.DestinationSettings import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings @@ -41,6 +42,7 @@ data class MonitorRunnerExecutionContext( var workflowService: WorkflowService? = null, var jvmStats: JvmStats? = null, var findingsToTriggeredQueries: Map>? = null, + var remoteMonitors: Map = mapOf(), @Volatile var retryPolicy: BackoffPolicy? = null, @Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 3ddff1b03..bab5b1064 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -11,6 +11,7 @@ import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager +import org.opensearch.OpenSearchStatusException import org.opensearch.action.bulk.BackoffPolicy import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING import org.opensearch.action.support.master.AcknowledgedResponse @@ -26,11 +27,15 @@ import org.opensearch.alerting.core.JobRunner import org.opensearch.alerting.core.ScheduledJobIndices import org.opensearch.alerting.core.lock.LockModel import org.opensearch.alerting.core.lock.LockService +import org.opensearch.alerting.model.ActionRunResult +import org.opensearch.alerting.model.InputRunResults import org.opensearch.alerting.model.MonitorRunResult +import org.opensearch.alerting.model.RemoteMonitorTriggerRunResult import org.opensearch.alerting.model.WorkflowRunResult import org.opensearch.alerting.model.destination.DestinationContextFactory import org.opensearch.alerting.opensearchapi.retry import org.opensearch.alerting.opensearchapi.suspendUntil +import org.opensearch.alerting.remote.monitors.RemoteMonitorRegistry import org.opensearch.alerting.script.TriggerExecutionContext import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT @@ -63,7 +68,9 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.core.action.ActionListener +import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.monitor.jvm.JvmStats import org.opensearch.script.Script @@ -156,6 +163,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon return this } + fun registerRemoteMonitors(monitorRegistry: Map): MonitorRunnerService { + this.monitorCtx.remoteMonitors = monitorRegistry + return this + } + // Must be called after registerClusterService and registerSettings in AlertingPlugin fun registerConsumers(): MonitorRunnerService { monitorCtx.retryPolicy = BackoffPolicy.constantBackoff( @@ -423,43 +435,88 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon CompositeWorkflowRunner.runWorkflow(workflow = job, monitorCtx, periodStart, periodEnd, dryrun, transportService) } val monitor = job as Monitor - val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}" - logger.info( - "Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType.name}, periodStart: $periodStart, " + - "periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId" - ) - val runResult = if (monitor.isBucketLevelMonitor()) { - BucketLevelMonitorRunner.runMonitor( - monitor, - monitorCtx, - periodStart, - periodEnd, - dryrun, - executionId = executionId, - transportService = transportService - ) - } else if (monitor.isDocLevelMonitor()) { - DocumentLevelMonitorRunner().runMonitor( - monitor, - monitorCtx, - periodStart, - periodEnd, - dryrun, - executionId = executionId, - transportService = transportService + + if (monitor.isMonitorOfStandardType()) { + val executionId = "${monitor.id}_${LocalDateTime.now(ZoneOffset.UTC)}_${UUID.randomUUID()}" + logger.info( + "Executing scheduled monitor - id: ${monitor.id}, type: ${monitor.monitorType}, periodStart: $periodStart, " + + "periodEnd: $periodEnd, dryrun: $dryrun, executionId: $executionId" ) + val runResult = if (monitor.isBucketLevelMonitor()) { + BucketLevelMonitorRunner.runMonitor( + monitor, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService + ) + } else if (monitor.isDocLevelMonitor()) { + DocumentLevelMonitorRunner().runMonitor( + monitor, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService + ) + } else { + QueryLevelMonitorRunner.runMonitor( + monitor, + monitorCtx, + periodStart, + periodEnd, + dryrun, + executionId = executionId, + transportService = transportService + ) + } + return runResult } else { - QueryLevelMonitorRunner.runMonitor( - monitor, - monitorCtx, - periodStart, - periodEnd, - dryrun, - executionId = executionId, - transportService = transportService - ) + if (monitorCtx.remoteMonitors.containsKey(monitor.monitorType)) { + val remoteRunResult = monitorCtx.remoteMonitors[monitor.monitorType]!!.monitorRunner.runMonitor( + monitor, + periodStart, + periodEnd, + dryrun, + transportService + ) + return MonitorRunResult( + monitor.name, + periodStart, + periodEnd, + remoteRunResult.error, + InputRunResults(remoteRunResult.results, remoteRunResult.error), + remoteRunResult.triggerResults.map { triggerResult -> + triggerResult.key to RemoteMonitorTriggerRunResult( + triggerResult.value.triggerName, + triggerResult.value.error, + triggerResult.value.actionResultsMap.map { actionResult -> + actionResult.key to actionResult.value.map { + it.key to ActionRunResult( + it.value.actionId, + it.value.actionName, + it.value.output, + it.value.throttled, + it.value.executionTime, + it.value.error + ) + }.associate { it.first to it.second }.toMutableMap() + }.associate { it.first to it.second }.toMutableMap() + ) + }.associate { it.first to it.second } + ) + } else { + return MonitorRunResult( + monitor.name, + periodStart, + periodEnd, + OpenSearchStatusException("Monitor Type ${monitor.monitorType} not known", RestStatus.BAD_REQUEST) + ) + } } - return runResult } // TODO: See if we can move below methods (or few of these) to a common utils diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt index b975af728..e60cbac33 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/QueryLevelMonitorRunner.kt @@ -19,6 +19,7 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.transport.TransportService import java.time.Instant +import java.util.* object QueryLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) @@ -68,7 +69,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { for (trigger in monitor.triggers) { val currentAlert = currentAlerts[trigger] val triggerCtx = QueryLevelTriggerExecutionContext(monitor, trigger as QueryLevelTrigger, monitorResult, currentAlert) - val triggerResult = when (monitor.monitorType) { + val triggerResult = when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { Monitor.MonitorType.QUERY_LEVEL_MONITOR -> monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { @@ -80,7 +81,7 @@ object QueryLevelMonitorRunner : MonitorRunner() { else monitorCtx.triggerService!!.runQueryLevelTrigger(monitor, trigger, triggerCtx) } else -> - throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType.name}.") + throw IllegalArgumentException("Unsupported monitor type: ${monitor.monitorType}.") } triggerResults[trigger.id] = triggerResult diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/RemoteMonitorTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/RemoteMonitorTriggerRunResult.kt new file mode 100644 index 000000000..f6fee226f --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/RemoteMonitorTriggerRunResult.kt @@ -0,0 +1,76 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.model + +import org.opensearch.core.common.io.stream.StreamInput +import org.opensearch.core.common.io.stream.StreamOutput +import org.opensearch.core.xcontent.ToXContent +import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.script.ScriptException +import java.io.IOException + +open class RemoteMonitorTriggerRunResult( + override var triggerName: String, + override var error: Exception? = null, + var actionResultsMap: MutableMap> = mutableMapOf() +) : TriggerRunResult(triggerName, error) { + + @Throws(IOException::class) + @Suppress("UNCHECKED_CAST") + constructor(sin: StreamInput) : this( + triggerName = sin.readString(), + error = sin.readException(), + actionResultsMap = readActionResults(sin) + ) + + override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + if (error is ScriptException) error = Exception((error as ScriptException).toJsonString(), error) + return builder.field("action_results", actionResultsMap as Map) + } + + override fun writeTo(out: StreamOutput) { + super.writeTo(out) + out.writeInt(actionResultsMap.size) + actionResultsMap.forEach { (alert, actionResults) -> + out.writeString(alert) + out.writeInt(actionResults.size) + actionResults.forEach { (id, result) -> + out.writeString(id) + result.writeTo(out) + } + } + } + + companion object { + @JvmStatic + @Throws(IOException::class) + fun readFrom(sin: StreamInput): TriggerRunResult { + return RemoteMonitorTriggerRunResult(sin) + } + + @JvmStatic + fun readActionResults(sin: StreamInput): MutableMap> { + val actionResultsMapReconstruct: MutableMap> = mutableMapOf() + val size = sin.readInt() + var idx = 0 + while (idx < size) { + val alert = sin.readString() + val actionResultsSize = sin.readInt() + val actionRunResultElem = mutableMapOf() + var i = 0 + while (i < actionResultsSize) { + val actionId = sin.readString() + val actionResult = ActionRunResult.readFrom(sin) + actionRunResultElem[actionId] = actionResult + ++i + } + actionResultsMapReconstruct[alert] = actionRunResultElem + ++idx + } + return actionResultsMapReconstruct + } + } +} diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteMonitorRegistry.kt b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteMonitorRegistry.kt new file mode 100644 index 000000000..4994e7d7b --- /dev/null +++ b/alerting/src/main/kotlin/org/opensearch/alerting/remote/monitors/RemoteMonitorRegistry.kt @@ -0,0 +1,10 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.remote.monitors + +import org.opensearch.alerting.spi.RemoteMonitorRunner + +class RemoteMonitorRegistry(val monitorType: String, val monitorRunner: RemoteMonitorRunner) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt index e4c7eb1da..69a38cbd0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestIndexMonitorAction.kt @@ -22,6 +22,7 @@ import org.opensearch.commons.alerting.model.DocumentLevelTrigger import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.QueryLevelTrigger import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.commons.utils.getInvalidNameChars import org.opensearch.commons.utils.isValidName import org.opensearch.core.rest.RestStatus @@ -42,6 +43,7 @@ import org.opensearch.rest.RestResponse import org.opensearch.rest.action.RestResponseListener import java.io.IOException import java.time.Instant +import java.util.Locale private val log = LogManager.getLogger(RestIndexMonitorAction::class.java) @@ -98,33 +100,38 @@ class RestIndexMonitorAction : BaseRestHandler() { validateDataSources(monitor) val monitorType = monitor.monitorType val triggers = monitor.triggers - when (monitorType) { - Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is QueryLevelTrigger) { - throw (IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor")) + if (monitor.isMonitorOfStandardType()) { + when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is QueryLevelTrigger) { + throw (IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for query level monitor")) + } } } - } - Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> { - triggers.forEach { - if (it !is BucketLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor") + + Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> { + triggers.forEach { + if (it !is BucketLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for bucket level monitor") + } } } - } - Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { - triggers.forEach { - if (it !is QueryLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor") + + Monitor.MonitorType.CLUSTER_METRICS_MONITOR -> { + triggers.forEach { + if (it !is QueryLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for cluster metrics monitor") + } } } - } - Monitor.MonitorType.DOC_LEVEL_MONITOR -> { - validateDocLevelQueryName(monitor) - triggers.forEach { - if (it !is DocumentLevelTrigger) { - throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") + + Monitor.MonitorType.DOC_LEVEL_MONITOR -> { + validateDocLevelQueryName(monitor) + triggers.forEach { + if (it !is DocumentLevelTrigger) { + throw IllegalArgumentException("Illegal trigger type, ${it.javaClass.name}, for document level monitor") + } } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt index 3f01f470c..084800c22 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteMonitorAction.kt @@ -35,6 +35,7 @@ import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.ConfigConstants import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.commons.authuser.User import org.opensearch.core.action.ActionListener import org.opensearch.core.rest.RestStatus @@ -42,6 +43,7 @@ import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import java.time.Instant +import java.util.* private val log = LogManager.getLogger(TransportExecuteMonitorAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) @@ -82,7 +84,7 @@ class TransportExecuteMonitorAction @Inject constructor( } try { log.info( - "Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType.name}, " + + "Executing monitor from API - id: ${monitor.id}, type: ${monitor.monitorType}, " + "periodStart: $periodStart, periodEnd: $periodEnd, dryrun: ${execMonitorRequest.dryrun}" ) val monitorRunResult = runner.runJob(monitor, periodStart, periodEnd, execMonitorRequest.dryrun, transportService) @@ -134,7 +136,10 @@ class TransportExecuteMonitorAction @Inject constructor( false -> (execMonitorRequest.monitor as Monitor).copy(user = user) } - if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if ( + monitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) { try { scope.launch { if (!docLevelMonitorQueries.docLevelQueryIndexExists(monitor.dataSources)) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt index 809fb78a6..b1f1ed668 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexMonitorAction.kt @@ -65,6 +65,7 @@ import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.SearchInput +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject import org.opensearch.core.action.ActionListener @@ -82,6 +83,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService import java.io.IOException import java.time.Duration +import java.util.Locale private val log = LogManager.getLogger(TransportIndexMonitorAction::class.java) private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) @@ -525,7 +527,10 @@ class TransportIndexMonitorAction @Inject constructor( throw t } try { - if (request.monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if ( + request.monitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(request.monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) { indexDocLevelMonitorQueries(request.monitor, indexResponse.id, metadata, request.refreshPolicy) } // When inserting queries in queryIndex we could update sourceToQueryIndexMapping @@ -683,7 +688,10 @@ class TransportIndexMonitorAction @Inject constructor( val (metadata, created) = MonitorMetadataService.getOrCreateMetadata(request.monitor) // Recreate runContext if metadata exists // Delete and insert all queries from/to queryIndex - if (created == false && currentMonitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if (!created && + currentMonitor.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(currentMonitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) { updatedMetadata = MonitorMetadataService.recreateRunContext(metadata, currentMonitor) client.suspendUntil { DeleteByQueryRequestBuilder(client, DeleteByQueryAction.INSTANCE) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt index f2eace7e9..d1b41405e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportIndexWorkflowAction.kt @@ -66,6 +66,7 @@ import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.ScheduledJob.Companion.SCHEDULED_JOBS_INDEX import org.opensearch.commons.alerting.model.SearchInput import org.opensearch.commons.alerting.model.Workflow +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.commons.authuser.User import org.opensearch.commons.utils.recreateObject import org.opensearch.core.action.ActionListener @@ -79,6 +80,7 @@ import org.opensearch.rest.RestRequest import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.tasks.Task import org.opensearch.transport.TransportService +import java.util.Locale import java.util.UUID import java.util.stream.Collectors @@ -400,7 +402,9 @@ class TransportIndexWorkflowAction @Inject constructor( log.warn("Metadata doc id:${monitorMetadata.id} exists, but it shouldn't!") } - if (monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if ( + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) { val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor) monitorMetadata = monitorMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping) } @@ -554,7 +558,9 @@ class TransportIndexWorkflowAction @Inject constructor( workflowMetadataId = workflowMetadata.id ) - if (created == false && monitor.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR) { + if (!created && + Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR + ) { var updatedMetadata = MonitorMetadataService.recreateRunContext(monitorMetadata, monitor) val oldMonitorMetadata = MonitorMetadataService.getMetadata(monitor) updatedMetadata = updatedMetadata.copy(sourceToQueryIndexMapping = oldMonitorMetadata!!.sourceToQueryIndexMapping) @@ -632,24 +638,28 @@ class TransportIndexWorkflowAction @Inject constructor( * Returns list of indices for the given monitor depending on it's type */ private fun getMonitorIndices(monitor: Monitor): List { - return when (monitor.monitorType) { - Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices - Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices } - Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { - if (isADMonitor(monitor)) monitor.inputs.flatMap { s -> (s as SearchInput).indices } - else { - val indices = mutableListOf() - for (input in monitor.inputs) { - when (input) { - is SearchInput -> indices.addAll(input.indices) - else -> indices + if (monitor.isMonitorOfStandardType()) { + return when (Monitor.MonitorType.valueOf(monitor.monitorType.uppercase(Locale.ROOT))) { + Monitor.MonitorType.DOC_LEVEL_MONITOR -> (monitor.inputs[0] as DocLevelMonitorInput).indices + Monitor.MonitorType.BUCKET_LEVEL_MONITOR -> monitor.inputs.flatMap { s -> (s as SearchInput).indices } + Monitor.MonitorType.QUERY_LEVEL_MONITOR -> { + if (isADMonitor(monitor)) monitor.inputs.flatMap { s -> (s as SearchInput).indices } + else { + val indices = mutableListOf() + for (input in monitor.inputs) { + when (input) { + is SearchInput -> indices.addAll(input.indices) + else -> indices + } } + indices } - indices } - } - else -> emptyList() + else -> emptyList() + } + } else { + return emptyList() } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt index 355945939..7d98731fb 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/AlertingUtils.kt @@ -26,7 +26,9 @@ import org.opensearch.commons.alerting.model.action.Action import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.ActionExecutionScope import org.opensearch.commons.alerting.util.isBucketLevelMonitor +import org.opensearch.commons.alerting.util.isMonitorOfStandardType import org.opensearch.script.Script +import java.util.Locale import kotlin.math.max private val logger = LogManager.getLogger("AlertingUtils") @@ -78,9 +80,13 @@ fun Destination.isAllowed(allowList: List): Boolean = allowList.contains fun Destination.isTestAction(): Boolean = this.type == DestinationType.TEST_ACTION -fun Monitor.isDocLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.DOC_LEVEL_MONITOR +fun Monitor.isDocLevelMonitor(): Boolean = + this.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.DOC_LEVEL_MONITOR -fun Monitor.isQueryLevelMonitor(): Boolean = this.monitorType == Monitor.MonitorType.QUERY_LEVEL_MONITOR +fun Monitor.isQueryLevelMonitor(): Boolean = + this.isMonitorOfStandardType() && + Monitor.MonitorType.valueOf(this.monitorType.uppercase(Locale.ROOT)) == Monitor.MonitorType.QUERY_LEVEL_MONITOR /** * Since buckets can have multi-value keys, this converts the bucket key values to a string that can be used diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt index 6eda9ec30..49cb863e0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/ADTestHelpers.kt @@ -494,7 +494,7 @@ fun randomADMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index 6a66579ef..8fa3833b1 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -95,7 +95,7 @@ fun randomQueryLevelMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -113,7 +113,7 @@ fun randomQueryLevelMonitorWithoutUser( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.QUERY_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = null, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -137,7 +137,7 @@ fun randomBucketLevelMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -162,7 +162,7 @@ fun randomBucketLevelMonitor( dataSources: DataSources ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.BUCKET_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources @@ -181,7 +181,7 @@ fun randomClusterMetricsMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.CLUSTER_METRICS_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -199,7 +199,7 @@ fun randomDocumentLevelMonitor( withMetadata: Boolean = false ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf() ) @@ -219,7 +219,7 @@ fun randomDocumentLevelMonitor( owner: String? = null ): Monitor { return Monitor( - name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR, enabled = enabled, inputs = inputs, + name = name, monitorType = Monitor.MonitorType.DOC_LEVEL_MONITOR.value, enabled = enabled, inputs = inputs, schedule = schedule, triggers = triggers, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user, uiMetadata = if (withMetadata) mapOf("foo" to "bar") else mapOf(), dataSources = dataSources, owner = owner ) diff --git a/alerting/src/test/resources/bwc/alerting/1.1.0.0/opensearch-alerting-1.1.0.0.zip b/alerting/src/test/resources/bwc/alerting/1.1.0.0/opensearch-alerting-1.1.0.0.zip new file mode 100644 index 000000000..29afea2cd Binary files /dev/null and b/alerting/src/test/resources/bwc/alerting/1.1.0.0/opensearch-alerting-1.1.0.0.zip differ diff --git a/sample-remote-monitor-plugin/build.gradle b/sample-remote-monitor-plugin/build.gradle new file mode 100644 index 000000000..19e681b02 --- /dev/null +++ b/sample-remote-monitor-plugin/build.gradle @@ -0,0 +1,178 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +apply plugin: 'opensearch.opensearchplugin' +apply plugin: 'opensearch.testclusters' +apply plugin: 'opensearch.java-rest-test' + +import org.opensearch.gradle.test.RestIntegTestTask +import org.opensearch.gradle.testclusters.StandaloneRestIntegTestTask +import org.apache.tools.ant.taskdefs.condition.Os + +import java.util.concurrent.Callable + + +opensearchplugin { + name 'sample-remote-monitor-plugin' + description 'Sample plugin that extends OpenSearch Alerting plugin' + classname 'org.opensearch.alerting.SampleRemoteMonitorPlugin' + extendedPlugins = ['opensearch-alerting'] +} + +ext { + projectSubstitutions = [:] + licenseFile = rootProject.file('LICENSE.txt') + noticeFile = rootProject.file('NOTICE.txt') +} + +repositories { + mavenLocal() + mavenCentral() + maven { url "https://aws.oss.sonatype.org/content/repositories/snapshots" } +} + +configurations { + zipArchive +} + +dependencies { + compileOnly project(path: ":alerting-spi", configuration: 'shadow') + compileOnly "org.opensearch:common-utils:${common_utils_version}@jar" + compileOnly "com.cronutils:cron-utils:9.1.7" + compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + compileOnly 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1' + compileOnly "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}" + // Needed for integ tests + zipArchive group: 'org.opensearch.plugin', name:'alerting', version: "${opensearch_build}" +} + +def es_tmp_dir = rootProject.file('build/private/es_tmp').absoluteFile +es_tmp_dir.mkdirs() + +File repo = file("$buildDir/testclusters/repo") +def _numNodes = findProperty('numNodes') as Integer ?: 1 + +licenseHeaders.enabled = true +validateNebulaPom.enabled = false +testingConventions.enabled = false +loggerUsageCheck.enabled = false + +javaRestTest.dependsOn(rootProject.assemble) +javaRestTest { + systemProperty 'tests.security.manager', 'false' +} +testClusters.javaRestTest { + testDistribution = 'INTEG_TEST' +} + +task integTest(type: RestIntegTestTask) { + description = "Run tests against a cluster" + testClassesDirs = sourceSets.test.output.classesDirs + classpath = sourceSets.test.runtimeClasspath +} +tasks.named("check").configure { dependsOn(integTest) } + +integTest { + if (project.hasProperty('excludeTests')) { + project.properties['excludeTests']?.replaceAll('\\s', '')?.split('[,;]')?.each { + exclude "${it}" + } + } + systemProperty 'tests.security.manager', 'false' + systemProperty 'java.io.tmpdir', es_tmp_dir.absolutePath + + systemProperty "https", System.getProperty("https") + systemProperty "user", System.getProperty("user") + systemProperty "password", System.getProperty("password") + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can use longer timeouts for + // requests. The 'doFirst' delays reading the debug setting on the cluster till execution time. + doFirst { + // Tell the test JVM if the cluster JVM is running under a debugger so that tests can + // use longer timeouts for requests. + def isDebuggingCluster = getDebug() || System.getProperty("test.debug") != null + systemProperty 'cluster.debug', isDebuggingCluster + // Set number of nodes system property to be used in tests + systemProperty 'cluster.number_of_nodes', "${_numNodes}" + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() + } + } + + // The -Dcluster.debug option makes the cluster debuggable; this makes the tests debuggable + if (System.getProperty("test.debug") != null) { + jvmArgs '-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=8000' + } +} +//project.getTasks().getByName('bundlePlugin').dependsOn(rootProject.project(":alerting").tasks.getByName('build')) +Zip bundle = (Zip) project.getTasks().getByName("bundlePlugin"); +//Zip rootBundle = (Zip) rootProject.project(":alerting").getTasks().getByName("bundlePlugin"); +integTest.dependsOn(bundle) +integTest.getClusters().forEach{c -> { +// c.plugin(rootProject.project(":alerting").getObjects().fileProperty().value(rootBundle.getArchiveFile())) + c.plugin(project.getObjects().fileProperty().value(bundle.getArchiveFile())) +}} + +testClusters.integTest { + testDistribution = 'ARCHIVE' + + // Cluster shrink exception thrown if we try to set numberOfNodes to 1, so only apply if > 1 + if (_numNodes > 1) numberOfNodes = _numNodes + // When running integration tests it doesn't forward the --debug-jvm to the cluster anymore + // i.e. we have to use a custom property to flag when we want to debug OpenSearch JVM + // since we also support multi node integration tests we increase debugPort per node + if (System.getProperty("cluster.debug") != null) { + def debugPort = 5005 + nodes.forEach { node -> + node.jvmArgs("-agentlib:jdwp=transport=dt_socket,server=n,suspend=y,address=*:${debugPort}") + debugPort += 1 + } + } + setting 'path.repo', repo.absolutePath + plugin(provider({ + new RegularFile() { + @Override + File getAsFile() { + return configurations.zipArchive.asFileTree.matching { + include '**/alerting*' + }.singleFile + } + } + })) + nodes.each { node -> + def plugins = node.plugins + def firstPlugin = plugins.get(0) + plugins.remove(0) + plugins.add(firstPlugin) + } +} + +run { + doFirst { + // There seems to be an issue when running multi node run or integ tasks with unicast_hosts + // not being written, the waitForAllConditions ensures it's written + getClusters().forEach { cluster -> + cluster.waitForAllConditions() + } + } + useCluster testClusters.integTest +} + +// As of ES 7.7 the sample-extension-plugin is being added to the list of plugins for the testCluster during build before +// the job-scheduler plugin is causing build failures. +// The job-scheduler zip is added explicitly above but the sample-extension-plugin is added implicitly at some time during evaluation. +// Will need to do a deep dive to find out exactly what task adds the sample-extension-plugin and add job-scheduler there but a temporary hack is to +// reorder the plugins list after evaluation but prior to task execution when the plugins are installed. +afterEvaluate { + testClusters.javaRestTest.nodes.each { node -> + def nodePlugins = node.plugins + def firstPlugin = nodePlugins.get(0) + if (firstPlugin.provider == project.bundlePlugin.archiveFile) { + nodePlugins.remove(0) + nodePlugins.add(firstPlugin) + } + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorPlugin.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorPlugin.java new file mode 100644 index 000000000..1c1df9bfe --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorPlugin.java @@ -0,0 +1,82 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.alerting.spi.RemoteMonitorRunner; +import org.opensearch.alerting.spi.RemoteMonitorRunnerExtension; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.env.Environment; +import org.opensearch.env.NodeEnvironment; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.repositories.RepositoriesService; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; +import org.opensearch.script.ScriptService; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.watcher.ResourceWatcherService; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.function.Supplier; + +public class SampleRemoteMonitorPlugin extends Plugin implements ActionPlugin, RemoteMonitorRunnerExtension { + + private static final Logger log = LogManager.getLogger(SampleRemoteMonitorPlugin.class); + + @Override + public Collection createComponents( + Client client, + ClusterService clusterService, + ThreadPool threadPool, + ResourceWatcherService resourceWatcherService, + ScriptService scriptService, + NamedXContentRegistry xContentRegistry, + Environment environment, + NodeEnvironment nodeEnvironment, + NamedWriteableRegistry namedWriteableRegistry, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier repositoriesServiceSupplier + ) { + SampleRemoteMonitorRunner monitorRunner = SampleRemoteMonitorRunner.getMonitorRunner(); + return Collections.emptyList(); + } + + @Override + public String getMonitorType() { + return "sample_remote_monitor"; + } + + @Override + public RemoteMonitorRunner getMonitorRunner() { + return SampleRemoteMonitorRunner.getMonitorRunner(); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return Collections.singletonList(new SampleRemoteMonitorRestHandler()); + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java new file mode 100644 index 000000000..4c7c31a73 --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRestHandler.java @@ -0,0 +1,103 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting; + +import org.opensearch.action.support.WriteRequest; +import org.opensearch.client.node.NodeClient; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.commons.alerting.action.AlertingActions; +import org.opensearch.commons.alerting.action.IndexMonitorRequest; +import org.opensearch.commons.alerting.action.IndexMonitorResponse; +import org.opensearch.commons.alerting.model.DataSources; +import org.opensearch.commons.alerting.model.IntervalSchedule; +import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.index.seqno.SequenceNumbers; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; + +import java.io.IOException; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class SampleRemoteMonitorRestHandler extends BaseRestHandler { + + @Override + public String getName() { + return "sample-remote-monitor-rest-handler"; + } + + @Override + public List routes() { + return Collections.unmodifiableList( + Arrays.asList(new Route(RestRequest.Method.POST, "_plugins/_sample_remote_monitor/monitor")) + ); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest restRequest, NodeClient client) throws IOException { + Monitor monitor = new Monitor( + Monitor.NO_ID, + Monitor.NO_VERSION, + "sample_remote_monitor", + true, + new IntervalSchedule(5, ChronoUnit.MINUTES, null), + Instant.now(), + Instant.now(), + "sample_remote_monitor", + null, + 0, + List.of(), + List.of(), + Map.of(), + new DataSources(), + "sample-remote-monitor-plugin" + ); + IndexMonitorRequest indexMonitorRequest = new IndexMonitorRequest( + Monitor.NO_ID, + SequenceNumbers.UNASSIGNED_SEQ_NO, + SequenceNumbers.UNASSIGNED_PRIMARY_TERM, + WriteRequest.RefreshPolicy.IMMEDIATE, + RestRequest.Method.POST, + monitor, + null + ); + + return restChannel -> { + client.doExecute( + AlertingActions.INDEX_MONITOR_ACTION_TYPE, + indexMonitorRequest, + new ActionListener<>() { + @Override + public void onResponse(IndexMonitorResponse indexMonitorResponse) { + try { + RestResponse restResponse = new BytesRestResponse( + RestStatus.OK, + indexMonitorResponse.toXContent(JsonXContent.contentBuilder(), ToXContent.EMPTY_PARAMS) + ); + restChannel.sendResponse(restResponse); + } catch (IOException e) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + } + + @Override + public void onFailure(Exception e) { + restChannel.sendResponse(new BytesRestResponse(RestStatus.INTERNAL_SERVER_ERROR, e.getMessage())); + } + } + ); + }; + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRunner.java b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRunner.java new file mode 100644 index 000000000..71491d9ad --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/java/org/opensearch/alerting/SampleRemoteMonitorRunner.java @@ -0,0 +1,53 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.alerting.spi.RemoteMonitorRunResult; +import org.opensearch.alerting.spi.RemoteMonitorRunner; +import org.opensearch.commons.alerting.model.Monitor; +import org.opensearch.transport.TransportService; + +import java.time.Instant; +import java.util.List; +import java.util.Map; + +public class SampleRemoteMonitorRunner implements RemoteMonitorRunner { + + private static final Logger log = LogManager.getLogger(SampleRemoteMonitorRunner.class); + + private static SampleRemoteMonitorRunner INSTANCE; + + public static SampleRemoteMonitorRunner getMonitorRunner() { + if (INSTANCE != null) { + return INSTANCE; + } + synchronized (SampleRemoteMonitorRunner.class) { + if (INSTANCE != null) { + return INSTANCE; + } + INSTANCE = new SampleRemoteMonitorRunner(); + return INSTANCE; + } + } + + @Override + public RemoteMonitorRunResult runMonitor( + Monitor monitor, + Instant periodStart, + Instant periodEnd, + boolean dryrun, + TransportService transportService + ) { + log.info("hit here"); + return new RemoteMonitorRunResult( + List.of(), + null, + Map.of() + ); + } +} \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/main/resources/META-INF/services/org.opensearch.alerting.spi.RemoteMonitorRunnerExtension b/sample-remote-monitor-plugin/src/main/resources/META-INF/services/org.opensearch.alerting.spi.RemoteMonitorRunnerExtension new file mode 100644 index 000000000..16ebe5897 --- /dev/null +++ b/sample-remote-monitor-plugin/src/main/resources/META-INF/services/org.opensearch.alerting.spi.RemoteMonitorRunnerExtension @@ -0,0 +1,6 @@ +# +# Copyright OpenSearch Contributors +# SPDX-License-Identifier: Apache-2.0 +# + +org.opensearch.alerting.SampleRemoteMonitorPlugin \ No newline at end of file diff --git a/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java new file mode 100644 index 000000000..c3a6ec444 --- /dev/null +++ b/sample-remote-monitor-plugin/src/test/java/org/opensearch/alerting/SampleRemoteMonitorIT.java @@ -0,0 +1,61 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting; + +import org.apache.hc.core5.http.Header; +import org.apache.hc.core5.http.HttpEntity; +import org.junit.Assert; +import org.opensearch.client.Request; +import org.opensearch.client.RequestOptions; +import org.opensearch.client.Response; +import org.opensearch.client.RestClient; +import org.opensearch.client.WarningsHandler; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.test.rest.OpenSearchRestTestCase; + +import java.io.IOException; +import java.util.Map; + +public class SampleRemoteMonitorIT extends OpenSearchRestTestCase { + + public void testE2E() throws IOException { + Response response = makeRequest(client(), "POST", "_plugins/_sample_remote_monitor/monitor", Map.of(), null); + Assert.assertEquals("Unable to create a watcher job", RestStatus.OK, RestStatus.fromCode(response.getStatusLine().getStatusCode())); + + Map responseJson = JsonXContent.jsonXContent.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getEntity().getContent() + ).map(); + Assert.assertEquals(1, 1); + } + + protected Response makeRequest( + RestClient client, + String method, + String endpoint, + Map params, + HttpEntity entity, + Header... headers + ) throws IOException { + Request request = new Request(method, endpoint); + RequestOptions.Builder options = RequestOptions.DEFAULT.toBuilder(); + options.setWarningsHandler(WarningsHandler.PERMISSIVE); + + for (Header header : headers) { + options.addHeader(header.getName(), header.getValue()); + } + request.setOptions(options.build()); + request.addParameters(params); + if (entity != null) { + request.setEntity(entity); + } + return client.performRequest(request); + } +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index bd847942f..bf5b63e34 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,5 +6,11 @@ rootProject.name = 'opensearch-alerting' include 'alerting' include 'core' +include 'spi' project(":core").name = 'alerting-core' +project(":spi").name = 'alerting-spi' + +include 'sample-remote-monitor-plugin' +project(":sample-remote-monitor-plugin").name = "alerting-sample-remote-monitor-plugin" +startParameter.excludedTaskNames=["publishPluginZipPublicationToMavenLocal", "publishPluginZipPublicationToStagingRepository"] diff --git a/spi/build.gradle b/spi/build.gradle new file mode 100644 index 000000000..1da8d06e4 --- /dev/null +++ b/spi/build.gradle @@ -0,0 +1,16 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +apply plugin: 'java' +apply plugin: 'opensearch.java-rest-test' +apply plugin: 'org.jetbrains.kotlin.jvm' +apply plugin: 'jacoco' +apply plugin: 'com.github.johnrengelman.shadow' + +dependencies { + compileOnly "org.opensearch:opensearch:${opensearch_version}" + compileOnly "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}" + compileOnly "org.opensearch:common-utils:${common_utils_version}@jar" +} \ No newline at end of file diff --git a/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunResult.kt b/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunResult.kt new file mode 100644 index 000000000..f5259b3a9 --- /dev/null +++ b/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunResult.kt @@ -0,0 +1,29 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.spi + +import java.time.Instant + +class RemoteActionRunResult( + val actionId: String, + val actionName: String, + val output: Map, + val throttled: Boolean = false, + val executionTime: Instant? = null, + val error: Exception? = null +) + +class RemoteMonitorTriggerRunResult( + val triggerName: String, + val error: Exception? = null, + val actionResultsMap: MutableMap> +) + +class RemoteMonitorRunResult( + val results: List>, + val error: Exception?, + val triggerResults: Map = mapOf() +) \ No newline at end of file diff --git a/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunner.kt b/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunner.kt new file mode 100644 index 000000000..2792df4c9 --- /dev/null +++ b/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunner.kt @@ -0,0 +1,21 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.spi + +import org.opensearch.commons.alerting.model.Monitor +import org.opensearch.transport.TransportService +import java.time.Instant + +interface RemoteMonitorRunner { + + fun runMonitor( + monitor: Monitor, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean, + transportService: TransportService + ): RemoteMonitorRunResult +} \ No newline at end of file diff --git a/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunnerExtension.kt b/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunnerExtension.kt new file mode 100644 index 000000000..a432c59f3 --- /dev/null +++ b/spi/src/main/kotlin/org/opensearch/alerting/spi/RemoteMonitorRunnerExtension.kt @@ -0,0 +1,13 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.alerting.spi + +interface RemoteMonitorRunnerExtension { + + fun getMonitorType(): String + + fun getMonitorRunner(): RemoteMonitorRunner +} \ No newline at end of file