Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Set the cancelAfterTimeInterval parameter on SearchRequest object in all MonitorRunners. (#1366) #1504

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -453,6 +455,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
Expand All @@ -41,6 +42,7 @@ import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.routing.Preference
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
Expand Down Expand Up @@ -116,7 +118,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}

try {
validate(monitor)
} catch (e: Exception) {
Expand Down Expand Up @@ -881,7 +882,9 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
.size(monitorCtx.docLevelMonitorShardFetchSize)
)
.preference(Preference.PRIMARY_FIRST.type())

request.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
if (monitorCtx.fetchOnlyQueryFieldNames && fieldsToFetch.isNotEmpty()) {
request.source().fetchSource(false)
for (field in fieldsToFetch) {
Expand Down Expand Up @@ -936,7 +939,12 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
"$monitorInputIndices against query index $queryIndices"
)
var response: SearchResponse

try {
searchRequest.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)

response = monitorCtx.client!!.suspendUntil {
monitorCtx.client!!.execute(SearchAction.INSTANCE, searchRequest, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var cancelAfterTimeInterval: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
Expand Down Expand Up @@ -153,6 +154,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
ALERT_BACKOFF_MILLIS.get(monitorCtx.settings),
ALERT_BACKOFF_COUNT.get(monitorCtx.settings)
)

monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings)

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count ->
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
Expand All @@ -169,6 +173,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
monitorCtx.cancelAfterTimeInterval = it
}
monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) {
monitorCtx.allowList = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,6 @@ class TransportGetFindingsSearchAction @Inject constructor(
)
}
searchSourceBuilder.query(queryBuilder).trackTotalHits(true)

client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.destination.Destination
Expand All @@ -24,6 +26,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down Expand Up @@ -149,6 +152,16 @@ fun defaultToPerExecutionAction(
return false
}

fun getCancelAfterTimeInterval(): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes
if (givenInterval == -1L) {
return givenInterval
}
return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes)
}

/**
* Mustache template supports iterating through a list using a `{{#listVariable}}{{/listVariable}}` block.
* https://mustache.github.io/mustache.5.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class AlertServiceTests : OpenSearchTestCase() {
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
threadPool = Mockito.mock(ThreadPool::class.java)
clusterService = Mockito.mock(ClusterService::class.java)

settings = Settings.builder().build()
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down
Loading