Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alias in rollup target_index field #445

Merged
merged 20 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest
import org.opensearch.action.admin.indices.create.CreateIndexResponse
import org.opensearch.action.admin.indices.mapping.get.GetMappingsRequest
import org.opensearch.action.admin.indices.mapping.get.GetMappingsResponse
import org.opensearch.action.admin.indices.mapping.put.PutMappingRequest
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest
import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.client.Client
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.metadata.MappingMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.common.model.dimension.Histogram
Expand All @@ -33,7 +36,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getRollupJobs
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.rollup.util.isTargetIndexAlias
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.transport.RemoteTransportException
Expand All @@ -53,26 +58,70 @@ class RollupMapperService(
// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
private suspend fun validateAndAttemptToUpdateTargetIndex(
rollup: Rollup,
targetIndexResolvedName: String,
Comment on lines +70 to +71
Copy link
Contributor

@khushbr khushbr Oct 31, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A couple of documentation chages:

  1. Let us add KDoc for description of parameters and return values. Explain the various terminology and any assumptions we are making with their operation.
  2. rename rollup to rollupJob
  3. rename targetIndexResolvedName

Copy link
Contributor Author

@petardz petardz Nov 2, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. done
  2. "rollup" is used throughout all Rollup related files. We should probably rename all or none.
  3. What do you suggest? This is called "resolved name" since it can be resolved to concrete index name from mustache script or from alias

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I was thinking of calling it plainly as concreteIndex ?

hasLegacyPlugin: Boolean
): RollupJobValidationResult {
/**
* Target Index is valid alias if either all backing indices have this job in _meta
* or there isn't any rollup job present in _meta
*/
var isValidAlias = isTargetIndexValidAlias(rollup, targetIndexResolvedName)
petardz marked this conversation as resolved.
Show resolved Hide resolved
if (rollup.isTargetIndexAlias() && isValidAlias) {
// During first run on alias, backing index is not prepared. We should set it up here. (settings, mappings, _meta)
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
}
} else if (rollup.isTargetIndexAlias() && !isValidAlias) {
return RollupJobValidationResult.Failure("Target index alias [${rollup.targetIndex}] validation failed")
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}

return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}

@Suppress("ReturnCount")
suspend fun isTargetIndexValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean {

if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
return false
}

val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
if (rollupJobs != null &&
(rollupJobs.size > 1 || rollupJobs[0].id != rollup.id)
) {
logger.error("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
petardz marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// All other backing indices have to have this rollup job in _META field and it has to be the only one!
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
khushbr marked this conversation as resolved.
Show resolved Hide resolved
val rollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
khushbr marked this conversation as resolved.
Show resolved Hide resolved
if (rollupJobs == null || rollupJobs.size > 1 || rollupJobs[0].id != rollup.id) {
logger.error("If target_index is alias, all backing indicies must have only this rollup job present in _meta!")
return false
}
}
}
return true
}

// This creates the target index if it doesn't already else validate the target index is rollup index
// If the target index mappings doesn't contain rollup job attempts to update the mappings.
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName, hasLegacyPlugin)
} else {
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
Expand All @@ -96,6 +145,52 @@ class RollupMapperService(
}
}

suspend fun addRollupSettingToIndex(targetIndexResolvedName: String, hasLegacyPlugin: Boolean): Boolean {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the builder have the retries built it ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean on updateSettings call on client? If yes, then exception would be thrown and job would fail. We don't have any retry in place for these kind of errors.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a conversation outside this PR, but I am assuming we do not retry the updateSettings call as we expect the next cycle of ISM execution to take care of any transient errors ?

} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it)
}
return resp.isAcknowledged
}
@Suppress("ReturnCount")
suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
var errorMessage = ""
try {
// 1. First we need to add index.plugins.rollup_index setting to index
if (addRollupSettingToIndex(targetIndexResolvedName, hasLegacyPlugin) == false) {
return RollupJobValidationResult.Invalid("Failed to update rollup settings for target index: [$targetIndexResolvedName]")
khushbr marked this conversation as resolved.
Show resolved Hide resolved
}

// 2. Put rollup mappings
val putMappingRequest: PutMappingRequest =
PutMappingRequest(targetIndexResolvedName).source(IndexManagementIndices.rollupTargetMappings, XContentType.JSON)
val respMappings: AcknowledgedResponse = client.admin().indices().suspendUntil {
putMapping(putMappingRequest, it)
}
if (!respMappings.isAcknowledged) {
return RollupJobValidationResult.Invalid("Failed to put initial rollup mappings for target index [$targetIndexResolvedName]")
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code has been repeated multiple times in repo (refer this), can we instead re-use the checkAndUpdateIndexMapping() helper function in IndexUtils.kt

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkAndUpdateIndexMapping() wouldn't work because of shouldUpdate call

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldUpdateIndex() will fail because at this point there are no Index mapping. Would it make sense to add a mappingCheck as param, default value as true to checkAndUpdateIndexMapping() and then re-use it ?

// 3.
khushbr marked this conversation as resolved.
Show resolved Hide resolved
errorMessage = "Failed to update mappings for target index [$targetIndexResolvedName]"
updateRollupIndexMappings(rollup, targetIndexResolvedName)
} catch (e: RemoteTransportException) {
val unwrappedException = ExceptionsHelper.unwrapCause(e) as Exception
logger.error(errorMessage, unwrappedException)
RollupJobValidationResult.Failure(errorMessage, unwrappedException)
} catch (e: OpenSearchSecurityException) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure("$errorMessage - missing required cluster permissions: ${e.localizedMessage}", e)
} catch (e: Exception) {
logger.error("$errorMessage because ", e)
RollupJobValidationResult.Failure(errorMessage, e)
}
return RollupJobValidationResult.Valid
}

private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class RollupSettings {
"index.plugins.rollup_index",
LegacyOpenDistroRollupSettings.ROLLUP_INDEX,
Setting.Property.IndexScope,
Setting.Property.InternalIndex
Setting.Property.Dynamic
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
)

val ROLLUP_INGEST_BACKOFF_MILLIS: Setting<TimeValue> = Setting.positiveTimeSetting(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.opensearchapi.toMap
Expand All @@ -19,22 +22,58 @@ object RollupFieldValueExpressionResolver {
private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD)

private lateinit var scriptService: ScriptService

private lateinit var clusterService: ClusterService
lateinit var indexAliasUtils: IndexAliasUtils
fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)
.toMap()
.filterKeys { key -> key in validTopContextFields }

val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
var compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

return if (compiledValue.isBlank()) fieldValue else compiledValue
if (indexAliasUtils.isAlias(compiledValue)) {
compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue)
}

return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
}

fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = IndexAliasUtils(clusterService)
}

fun registerScriptService(scriptService: ScriptService) {
fun registerServices(scriptService: ScriptService, clusterService: ClusterService, indexAliasUtils: IndexAliasUtils) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = indexAliasUtils
}

open class IndexAliasUtils(val clusterService: ClusterService) {

open fun hasAlias(index: String): Boolean {
val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases
if (aliases != null) {
return aliases.size() > 0
}
return false
}

open fun isAlias(index: String): Boolean {
return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
}

open fun getWriteIndexNameForAlias(alias: String): String? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

open fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ fun isRollupIndex(index: String, clusterState: ClusterState): Boolean {
return false
}

fun Rollup.isTargetIndexAlias(): Boolean {
return RollupFieldValueExpressionResolver.indexAliasUtils.isAlias(targetIndex)
}

fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest {
val query = if (metadata.continuous != null) {
RangeQueryBuilder(this.getDateHistogram().sourceField)
Expand Down
Loading