Skip to content

Commit

Permalink
doc-level monitor fan-out approach (#1523)
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Apr 19, 2024
1 parent fd8febb commit 274b125
Show file tree
Hide file tree
Showing 38 changed files with 2,520 additions and 1,036 deletions.
23 changes: 19 additions & 4 deletions alerting/src/main/kotlin/org/opensearch/alerting/AlertingPlugin.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.alerting

import org.opensearch.action.ActionRequest
import org.opensearch.alerting.action.DocLevelMonitorFanOutAction
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteWorkflowAction
import org.opensearch.alerting.action.GetDestinationsAction
Expand Down Expand Up @@ -54,6 +55,7 @@ import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportDocLevelMonitorFanOutAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
import org.opensearch.alerting.transport.TransportExecuteWorkflowAction
import org.opensearch.alerting.transport.TransportGetAlertsAction
Expand Down Expand Up @@ -223,7 +225,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(DocLevelMonitorFanOutAction.INSTANCE, TransportDocLevelMonitorFanOutAction::class.java)
)
}

Expand Down Expand Up @@ -257,6 +260,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
val alertService = AlertService(client, xContentRegistry, alertIndices)
val triggerService = TriggerService(scriptService)
runner = MonitorRunnerService
.registerClusterService(clusterService)
.registerClient(client)
Expand All @@ -267,8 +272,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerThreadPool(threadPool)
.registerAlertIndices(alertIndices)
.registerInputService(InputService(client, scriptService, namedWriteableRegistry, xContentRegistry, clusterService, settings))
.registerTriggerService(TriggerService(scriptService))
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerTriggerService(triggerService)
.registerAlertService(alertService)
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
Expand Down Expand Up @@ -298,7 +303,16 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

DeleteMonitorService.initialize(client)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
return listOf(
sweeper,
scheduler,
runner,
scheduledJobIndices,
docLevelMonitorQueries,
destinationMigrationCoordinator,
alertService,
triggerService
)
}

override fun getSettings(): List<Setting<*>> {
Expand Down Expand Up @@ -329,6 +343,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
AlertingSettings.ALERT_HISTORY_RETENTION_PERIOD,
AlertingSettings.ALERTING_MAX_MONITORS,
AlertingSettings.PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
AlertingSettings.DOC_LEVEL_MONITOR_FAN_OUT_NODES,
DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
AlertingSettings.PERCOLATE_QUERY_MAX_NUM_DOCS_IN_MEMORY,
AlertingSettings.REQUEST_TIMEOUT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import org.opensearch.search.aggregations.AggregatorFactories
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.transport.TransportService
import java.time.Instant
import java.util.UUID

Expand All @@ -68,7 +69,8 @@ object BucketLevelMonitorRunner : MonitorRunner() {
periodEnd: Instant,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
executionId: String
executionId: String,
transportService: TransportService
): MonitorRunResult<BucketLevelTriggerRunResult> {
val roles = MonitorRunnerService.getRolesForMonitor(monitor)
logger.debug("Running monitor: ${monitor.name} with roles: $roles Thread: ${Thread.currentThread().name}")
Expand Down
Loading

0 comments on commit 274b125

Please sign in to comment.