Skip to content

Commit

Permalink
added support for mustache scripting of rollup.target_index field (#435)
Browse files Browse the repository at this point in the history
* added support for mustache scripting of rollup.target_index field

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* defekt fixes

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* tests

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* small refactor/improvements

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added wildcard check when creating rollup job; removed resolving targetIndex on Rollup init; added test for wildcards

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* lint fixes

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* moved target_index validation in getRollup resp handler

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added using toMap()

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* removed catch block

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* exception fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* linter fix

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* fixed IT fail

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>

* added Exception catch block

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
(cherry picked from commit e858ab2)
  • Loading branch information
petardz authored and github-actions[bot] committed Aug 5, 2022
1 parent a1526f8 commit 78663e7
Show file tree
Hide file tree
Showing 9 changed files with 266 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
Expand Down Expand Up @@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollup

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchException
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.index.engine.VersionConflictEngineException
Expand Down Expand Up @@ -62,7 +63,9 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
}
} catch (e: RemoteTransportException) {
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: RemoteTransportException) {
} catch (e: OpenSearchException) {
processFailure(rollup.id, indexName, e)
} catch (e: Exception) {
processFailure(rollup.id, indexName, e)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_COUNT
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_MILLIS
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getInitialDocValues
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
Expand Down Expand Up @@ -123,7 +124,8 @@ class RollupIndexer(
}
}
mapOfKeyValues.putAll(aggResults)
val indexRequest = IndexRequest(job.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
val indexRequest = IndexRequest(targetIndexResolvedName)
.id(documentId)
.source(mapOfKeyValues, XContentType.JSON)
requests.add(indexRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
Expand All @@ -52,14 +53,14 @@ class RollupMapperService(
// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult {
if (!isRollupIndex(rollup.targetIndex, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}

return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) {
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup)
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}
Expand All @@ -69,14 +70,15 @@ class RollupMapperService(
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
if (indexExists(job.targetIndex)) {
return validateAndAttemptToUpdateTargetIndex(job)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
} else {
val errorMessage = "Failed to create target index [${job.targetIndex}]"
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
val response = createTargetIndex(job, hasLegacyPlugin)
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
if (response.isAcknowledged) {
updateRollupIndexMappings(job)
updateRollupIndexMappings(job, targetIndexResolvedName)
} else {
RollupJobValidationResult.Failure(errorMessage)
}
Expand All @@ -94,13 +96,13 @@ class RollupMapperService(
}
}

private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val request = CreateIndexRequest(job.targetIndex)
val request = CreateIndexRequest(targetIndexName)
.settings(settings)
.mapping(IndexManagementIndices.rollupTargetMappings)
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
Expand Down Expand Up @@ -204,19 +206,19 @@ class RollupMapperService(
return field != null
}

private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) {
private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) {
is GetMappingsResult.Success -> getMappingsResult.response
is GetMappingsResult.Failure ->
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
}

val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex]
val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]

return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
RollupJobValidationResult.Valid
} else {
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]")
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
}
}

Expand Down Expand Up @@ -254,8 +256,8 @@ class RollupMapperService(
// where they can both get the same mapping state and only add their own job, meaning one
// of the jobs won't be added to the target index _meta
@Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job"
private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job"
try {
val response = withContext(Dispatchers.IO) {
val resp: AcknowledgedResponse = client.suspendUntil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.commons.authuser.User
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.parseRollup
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.util.IndexUtils
Expand Down Expand Up @@ -91,6 +92,14 @@ class TransportIndexRollupAction @Inject constructor(
if (response.isAcknowledged) {
log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
if (request.opType() == DocWriteRequest.OpType.CREATE) {
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
putRollup()
} else {
getRollup()
Expand Down Expand Up @@ -128,6 +137,14 @@ class TransportIndexRollupAction @Inject constructor(
if (modified.isNotEmpty()) {
return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST))
}
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
putRollup()
}

Expand Down Expand Up @@ -172,5 +189,10 @@ class TransportIndexRollupAction @Inject constructor(
}
)
}

private fun validateTargetIndexName(): Boolean {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return targetIndexResolvedName.contains("*") == false && targetIndexResolvedName.contains("?") == false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
Expand All @@ -50,7 +51,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
private val log = LogManager.getLogger(javaClass)

override fun checkBlock(request: UpdateRollupMappingRequest, state: ClusterState): ClusterBlockException? {
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(request.rollup.targetIndex))
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(targetIndexResolvedName))
}

@Suppress("ReturnCount", "LongMethod")
Expand All @@ -59,7 +61,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
state: ClusterState,
listener: ActionListener<AcknowledgedResponse>
) {
val index = state.metadata.index(request.rollup.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
val index = state.metadata.index(targetIndexResolvedName)
if (index == null) {
log.debug("Could not find index [$index]")
return listener.onFailure(IllegalStateException("Could not find index [$index]"))
Expand Down Expand Up @@ -113,7 +116,7 @@ class TransportUpdateRollupMappingAction @Inject constructor(
}

// TODO: Does schema_version get overwritten?
val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).source(metaMappings)
val putMappingRequest = PutMappingRequest(targetIndexResolvedName).source(metaMappings)
client.admin().indices().putMapping(
putMappingRequest,
object : ActionListener<AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.toMap
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript

object RollupFieldValueExpressionResolver {

private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)

private lateinit var scriptService: ScriptService

fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
.toMap()
.filterKeys { key -> key in validTopContextFields }

val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

return if (compiledValue.isBlank()) fieldValue else compiledValue
}

fun registerScriptService(scriptService: ScriptService) {
this.scriptService = scriptService
}
}
Loading

0 comments on commit 78663e7

Please sign in to comment.