Skip to content

Commit

Permalink
alias in rollup target_index field (opensearch-project#445)
Browse files Browse the repository at this point in the history
* added support for mustache scripting of rollup.target_index field (opensearch-project#435)
* tests
* small refactor/improvements
* added wildcard check when creating rollup job; removed resolving targetIndex on Rollup init; added test for wildcards
* lint fixes
* moved target_index validation in getRollup resp handler
* removed catch block
* fixed IT fail
* added Exception catch block

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz committed Nov 3, 2022
1 parent 1856295 commit 70cf4ea
Show file tree
Hide file tree
Showing 7 changed files with 645 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Histogram
Expand All @@ -33,7 +36,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getRollupJobs
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.rollup.util.isTargetIndexAlias
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.transport.RemoteTransportException
Expand All @@ -50,29 +55,105 @@ class RollupMapperService(

private val logger = LogManager.getLogger(javaClass)

// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
/**
* If the index already exists we need to verify it's a rollup index,
* confirm it does not conflict with existing jobs and is a valid job.
* If
*
* @param rollup Rollup job we're currently executing
* @param targetIndexResolvedName concrete index name
* @param hasLegacyPlugin flag to indicate if we're running legacy plugin
* @return RollupJobValidationResult indicating success or failure with appropriate error message included.
*/
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
private suspend fun validateAndAttemptToUpdateTargetIndex(
rollup: Rollup,
targetIndexResolvedName: String,
hasLegacyPlugin: Boolean
): RollupJobValidationResult {
if (rollup.isTargetIndexAlias()) {
val aliasValidationResult = validateTargetIndexAlias(rollup, targetIndexResolvedName)
if (aliasValidationResult !is RollupJobValidationResult.Valid) {
return aliasValidationResult
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
}
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}

return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}

/**
* Target Index is valid alias if either all backing indices have this job in _meta
* or there isn't any rollup job present in _meta
*/
@Suppress("ReturnCount")
suspend fun validateTargetIndexAlias(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {

var errorMessage: String

if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
logger.error("[${rollup.targetIndex}] is not an alias!")
return RollupJobValidationResult.Failure("[${rollup.targetIndex}] is not an alias!")
}

val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
if (rollupJobs != null &&
(rollupJobs.size > 1 || rollupJobs[0].id != rollup.id)
) {
errorMessage = "More than one rollup job present on the backing index, cannot add alias for target index: [$targetIndexResolvedName]"
logger.error(errorMessage)
return RollupJobValidationResult.Failure(errorMessage)
}

// All other backing indices have to have this rollup job in _META field and it has to be the only one!
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
val allRollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
val validationResult = validateNonWriteBackingIndex(it.index.name, rollup, allRollupJobs)
if (validationResult !is RollupJobValidationResult.Valid) {
return validationResult
}
}
}
return RollupJobValidationResult.Valid
}

suspend fun validateNonWriteBackingIndex(backingIndex: String, currentRollupJob: Rollup, rollupJobs: List<Rollup>?): RollupJobValidationResult {
var errorMessage = ""
if (rollupJobs == null) {
errorMessage = "Backing index [$backingIndex] has to have owner rollup job with id:[${currentRollupJob.id}]"
} else if (rollupJobs.size == 1 && rollupJobs[0].id != currentRollupJob.id) {
errorMessage = "Backing index [$backingIndex] has to have owner rollup job with id:[${currentRollupJob.id}]"
} else if (rollupJobs.size > 1) {
errorMessage = "Backing index [$backingIndex] has multiple rollup job owners"
}
if (errorMessage.isNotEmpty()) {
logger.error(errorMessage)
return RollupJobValidationResult.Failure(errorMessage)
}
return RollupJobValidationResult.Valid
}

// This creates the target index if it doesn't already else validate the target index is rollup index
// If the target index mappings doesn't contain rollup job attempts to update the mappings.
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
val validationResult = validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
when (validationResult) {
is RollupJobValidationResult.Failure -> logger.error(validationResult.message)
is RollupJobValidationResult.Invalid -> logger.error(validationResult.reason)
}
return validationResult
} else {
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
Expand All @@ -96,6 +177,53 @@ class RollupMapperService(
}
}

suspend fun addRollupSettingToIndex(targetIndexResolvedName: String, hasLegacyPlugin: Boolean): Boolean {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it)
}
return resp.isAcknowledged
}
@Suppress("ReturnCount")
suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
var errorMessage = ""
try {
// 1. First we need to add index.plugins.rollup_index setting to index
if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false) {
logger.error("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
}

// 2. Put rollup target_index mappings
val putMappingRequest: PutMappingRequest =
PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
putMapping(putMappingRequest, it)
}
if (!respMappings.isAcknowledged) {
return RollupJobValidationResult.Invalid("Failed to put initial rollup mappings for target index [$targetIndexResolvedName]")
}
// 3. Add this rollup job to target_index's _meta
errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]"
updateRollupIndexMappings(rollup, targetIndexResolvedName)
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.error(errorMessage, unwrappedException)
RollupJobValidationResult.Failure(errorMessage, unwrappedException)
} catch (e: OpenSearchSecurityException) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure("$errorMessage - missing required cluster permissions: ${e.localizedMessage}", e)
} catch (e: Exception) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure(errorMessage, e)
}
return RollupJobValidationResult.Valid
}

private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RollupSettings {
"index.plugins.rollup_index",
LegacyOpenDistroRollupSettings.ROLLUP_INDEX,
Setting.Property.IndexScope,
Setting.Property.InternalIndex
Setting.Property.Dynamic
)

val ROLLUP_INGEST_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.toMap
Expand All @@ -19,22 +22,58 @@ object RollupFieldValueExpressionResolver {
private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)

private lateinit var scriptService: ScriptService

private lateinit var clusterService: ClusterService
lateinit var indexAliasUtils: IndexAliasUtils
fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
.toMap()
.filterKeys { key -> key in validTopContextFields }

val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
var compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

return if (compiledValue.isBlank()) fieldValue else compiledValue
if (indexAliasUtils.isAlias(compiledValue)) {
compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue)
}

return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
}

fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = IndexAliasUtils(clusterService)
}

fun registerScriptService(scriptService: ScriptService) {
fun registerServices(scriptService: ScriptService, clusterService: ClusterService, indexAliasUtils: IndexAliasUtils) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = indexAliasUtils
}

open class IndexAliasUtils(val clusterService: ClusterService) {

open fun hasAlias(index: String): Boolean {
val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases
if (aliases != null) {
return aliases.size() > 0
}
return false
}

open fun isAlias(index: String): Boolean {
return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
}

open fun getWriteIndexNameForAlias(alias: String): String? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

open fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ fun isRollupIndex(index: String, clusterState: ClusterState): Boolean {
return false
}

fun Rollup.isTargetIndexAlias(): Boolean {
return RollupFieldValueExpressionResolver.indexAliasUtils.isAlias(targetIndex)
}

fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest {
val query = if (metadata.continuous != null) {
RangeQueryBuilder(this.getDateHistogram().sourceField)
Expand Down
Loading

0 comments on commit 70cf4ea

Please sign in to comment.