From e858ab26e3afb7b3dd3a1fd62afbc4d459d11f5b Mon Sep 17 00:00:00 2001 From: Petar Dzepina Date: Fri, 5 Aug 2022 19:52:52 +0200 Subject: [PATCH] added support for mustache scripting of rollup.target_index field (#435) * added support for mustache scripting of rollup.target_index field Signed-off-by: Petar Dzepina * defekt fixes Signed-off-by: Petar Dzepina * tests Signed-off-by: Petar Dzepina * small refactor/improvements Signed-off-by: Petar Dzepina * added wildcard check when creating rollup job; removed resolving targetIndex on Rollup init; added test for wildcards Signed-off-by: Petar Dzepina * lint fixes Signed-off-by: Petar Dzepina * moved target_index validation in getRollup resp handler Signed-off-by: Petar Dzepina * added using toMap() Signed-off-by: Petar Dzepina * removed catch block Signed-off-by: Petar Dzepina * exception fix Signed-off-by: Petar Dzepina * linter fix Signed-off-by: Petar Dzepina * fixed IT fail Signed-off-by: Petar Dzepina * added Exception catch block Signed-off-by: Petar Dzepina --- .../indexmanagement/IndexManagementPlugin.kt | 2 + .../step/rollup/AttemptCreateRollupJobStep.kt | 5 +- .../indexmanagement/rollup/RollupIndexer.kt | 4 +- .../rollup/RollupMapperService.kt | 38 +++--- .../index/TransportIndexRollupAction.kt | 22 +++ .../TransportUpdateRollupMappingAction.kt | 9 +- .../RollupFieldValueExpressionResolver.kt | 40 ++++++ .../action/RollupActionIT.kt | 127 ++++++++++++++++++ ...RollupFieldValueExpressionResolverTests.kt | 42 ++++++ 9 files changed, 266 insertions(+), 23 deletions(-) create mode 100644 src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt create mode 100644 src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt diff --git a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt index cc36d13d4..116a5b055 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/IndexManagementPlugin.kt @@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings +import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler @@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin this.indexNameExpressionResolver = indexNameExpressionResolver val skipFlag = SkipExecution(client, clusterService) + RollupFieldValueExpressionResolver.registerScriptService(scriptService) val rollupRunner = RollupRunner .registerClient(client) .registerClusterService(clusterService) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt index e4a6c6392..f86d0a3b5 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/indexstatemanagement/step/rollup/AttemptCreateRollupJobStep.kt @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollup import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper +import org.opensearch.OpenSearchException import org.opensearch.action.support.WriteRequest import org.opensearch.action.support.master.AcknowledgedResponse import org.opensearch.index.engine.VersionConflictEngineException @@ -62,7 +63,9 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name) } } catch (e: RemoteTransportException) { processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception) - } catch (e: RemoteTransportException) { + } catch (e: OpenSearchException) { + processFailure(rollup.id, indexName, e) + } catch (e: Exception) { processFailure(rollup.id, indexName, e) } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt index 8c3c5fb93..5e4a7a9ee 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupIndexer.kt @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupStats import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_COUNT import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_MILLIS +import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.rollup.util.getInitialDocValues import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize @@ -123,7 +124,8 @@ class RollupIndexer( } } mapOfKeyValues.putAll(aggResults) - val indexRequest = IndexRequest(job.targetIndex) + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex) + val indexRequest = IndexRequest(targetIndexResolvedName) .id(documentId) .source(mapOfKeyValues, XContentType.JSON) requests.add(indexRequest) diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt index ab7c5b195..0830d60bb 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/RollupMapperService.kt @@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings import org.opensearch.indexmanagement.rollup.settings.RollupSettings +import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.rollup.util.isRollupIndex import org.opensearch.indexmanagement.util.IndexUtils.Companion._META import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings @@ -52,14 +53,14 @@ class RollupMapperService( // If the index already exists we need to verify it's a rollup index, // confirm it does not conflict with existing jobs and is a valid job @Suppress("ReturnCount") - private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult { - if (!isRollupIndex(rollup.targetIndex, clusterService.state())) { - return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index") + private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult { + if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) { + return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index") } - return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) { + return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) { is RollupJobValidationResult.Valid -> jobExistsResult - is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup) + is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName) is RollupJobValidationResult.Failure -> jobExistsResult } } @@ -69,14 +70,15 @@ class RollupMapperService( // TODO: error handling @Suppress("ReturnCount") suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult { - if (indexExists(job.targetIndex)) { - return validateAndAttemptToUpdateTargetIndex(job) + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex) + if (indexExists(targetIndexResolvedName)) { + return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName) } else { - val errorMessage = "Failed to create target index [${job.targetIndex}]" + val errorMessage = "Failed to create target index [$targetIndexResolvedName]" return try { - val response = createTargetIndex(job, hasLegacyPlugin) + val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin) if (response.isAcknowledged) { - updateRollupIndexMappings(job) + updateRollupIndexMappings(job, targetIndexResolvedName) } else { RollupJobValidationResult.Failure(errorMessage) } @@ -94,13 +96,13 @@ class RollupMapperService( } } - private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse { + private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse { val settings = if (hasLegacyPlugin) { Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build() } else { Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build() } - val request = CreateIndexRequest(job.targetIndex) + val request = CreateIndexRequest(targetIndexName) .settings(settings) .mapping(IndexManagementIndices.rollupTargetMappings) // TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest @@ -204,19 +206,19 @@ class RollupMapperService( return field != null } - private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult { - val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) { + private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult { + val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) { is GetMappingsResult.Success -> getMappingsResult.response is GetMappingsResult.Failure -> return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause) } - val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex] + val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName] return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) { RollupJobValidationResult.Valid } else { - RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]") + RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]") } } @@ -254,8 +256,8 @@ class RollupMapperService( // where they can both get the same mapping state and only add their own job, meaning one // of the jobs won't be added to the target index _meta @Suppress("BlockingMethodInNonBlockingContext", "ReturnCount") - private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult { - val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job" + private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult { + val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job" try { val response = withContext(Dispatchers.IO) { val resp: AcknowledgedResponse = client.suspendUntil { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt index ce18a94d6..251ab7767 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/index/TransportIndexRollupAction.kt @@ -28,6 +28,7 @@ import org.opensearch.commons.authuser.User import org.opensearch.indexmanagement.IndexManagementIndices import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.rollup.util.parseRollup import org.opensearch.indexmanagement.settings.IndexManagementSettings import org.opensearch.indexmanagement.util.IndexUtils @@ -91,6 +92,14 @@ class TransportIndexRollupAction @Inject constructor( if (response.isAcknowledged) { log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.") if (request.opType() == DocWriteRequest.OpType.CREATE) { + if (!validateTargetIndexName()) { + return actionListener.onFailure( + OpenSearchStatusException( + "target_index value is invalid: ${request.rollup.targetIndex}", + RestStatus.BAD_REQUEST + ) + ) + } putRollup() } else { getRollup() @@ -128,6 +137,14 @@ class TransportIndexRollupAction @Inject constructor( if (modified.isNotEmpty()) { return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST)) } + if (!validateTargetIndexName()) { + return actionListener.onFailure( + OpenSearchStatusException( + "target_index value is invalid: ${request.rollup.targetIndex}", + RestStatus.BAD_REQUEST + ) + ) + } putRollup() } @@ -172,5 +189,10 @@ class TransportIndexRollupAction @Inject constructor( } ) } + + private fun validateTargetIndexName(): Boolean { + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex) + return targetIndexResolvedName.contains("*") == false && targetIndexResolvedName.contains("?") == false + } } } diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt index 74d94d41f..667bdc82a 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/action/mapping/TransportUpdateRollupMappingAction.kt @@ -25,6 +25,7 @@ import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentHelper import org.opensearch.common.xcontent.XContentType import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE +import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver import org.opensearch.indexmanagement.util.IndexUtils.Companion._META import org.opensearch.threadpool.ThreadPool import org.opensearch.transport.TransportService @@ -50,7 +51,8 @@ class TransportUpdateRollupMappingAction @Inject constructor( private val log = LogManager.getLogger(javaClass) override fun checkBlock(request: UpdateRollupMappingRequest, state: ClusterState): ClusterBlockException? { - return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(request.rollup.targetIndex)) + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex) + return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(targetIndexResolvedName)) } @Suppress("ReturnCount", "LongMethod") @@ -59,7 +61,8 @@ class TransportUpdateRollupMappingAction @Inject constructor( state: ClusterState, listener: ActionListener ) { - val index = state.metadata.index(request.rollup.targetIndex) + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex) + val index = state.metadata.index(targetIndexResolvedName) if (index == null) { log.debug("Could not find index [$index]") return listener.onFailure(IllegalStateException("Could not find index [$index]")) @@ -113,7 +116,7 @@ class TransportUpdateRollupMappingAction @Inject constructor( } // TODO: Does schema_version get overwritten? - val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).source(metaMappings) + val putMappingRequest = PutMappingRequest(targetIndexResolvedName).source(metaMappings) client.admin().indices().putMapping( putMappingRequest, object : ActionListener { diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt new file mode 100644 index 000000000..c28db5704 --- /dev/null +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolver.kt @@ -0,0 +1,40 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.util + +import org.opensearch.common.xcontent.XContentFactory +import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE +import org.opensearch.indexmanagement.opensearchapi.toMap +import org.opensearch.indexmanagement.rollup.model.Rollup +import org.opensearch.script.Script +import org.opensearch.script.ScriptService +import org.opensearch.script.ScriptType +import org.opensearch.script.TemplateScript + +object RollupFieldValueExpressionResolver { + + private val validTopContextFields = setOf(Rollup.SOURCE_INDEX_FIELD) + + private lateinit var scriptService: ScriptService + + fun resolve(rollup: Rollup, fieldValue: String): String { + val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf()) + + val contextMap = rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE) + .toMap() + .filterKeys { key -> key in validTopContextFields } + + val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT) + .newInstance(script.params + mapOf("ctx" to contextMap)) + .execute() + + return if (compiledValue.isBlank()) fieldValue else compiledValue + } + + fun registerScriptService(scriptService: ScriptService) { + this.scriptService = scriptService + } +} diff --git a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt index 7b6c06f21..19ae0239b 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/indexstatemanagement/action/RollupActionIT.kt @@ -28,6 +28,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.Sum import org.opensearch.indexmanagement.rollup.model.metric.ValueCount import org.opensearch.indexmanagement.rollup.toJsonString import org.opensearch.indexmanagement.waitFor +import java.lang.Thread.sleep import java.time.Instant import java.time.temporal.ChronoUnit import java.util.Locale @@ -146,6 +147,74 @@ class RollupActionIT : IndexStateManagementRestTestCase() { assertIndexRolledUp(indexName, policyID, rollup) } + fun `test data stream rollup action with scripted targetIndex`() { + val dataStreamName = "${testIndexName}_data_stream" + val policyID = "${testIndexName}_rollup_policy" + + sleep(10000) + + val rollup = ISMRollup( + description = "data stream rollup", + targetIndex = "rollup_{{ctx.source_index}}", + pageSize = 100, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", + targetField = "passenger_count", + metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average()) + ), + RollupMetrics( + sourceField = "total_amount", + targetField = "total_amount", + metrics = listOf(Max(), Min()) + ) + ) + ) + + // Create an ISM policy to rollup backing indices of a data stream. + val actionConfig = RollupAction(rollup, 0) + val states = listOf(State("rollup", listOf(actionConfig), listOf())) + val policy = Policy( + id = policyID, + description = "data stream rollup policy", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ismTemplate = listOf(ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS))) + ) + createPolicy(policy, policyID) + + val sourceIndexMappingString = "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " + + "\"keyword\" }, \"PULocationID\": { \"type\": \"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " + + "{ \"type\": \"double\" }}" + + // Create an index template for a data stream with the given source index mapping. + client().makeRequest( + "PUT", + "/_index_template/rollup-data-stream-template", + StringEntity( + "{ " + + "\"index_patterns\": [ \"$dataStreamName\" ], " + + "\"data_stream\": { \"timestamp_field\": { \"name\": \"tpep_pickup_datetime\" } }, " + + "\"template\": { \"mappings\": { $sourceIndexMappingString } } }", + ContentType.APPLICATION_JSON + ) + ) + client().makeRequest("PUT", "/_data_stream/$dataStreamName") + + // Ensure rollup works on backing indices of a data stream. + val indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L) + assertIndexRolledUp(indexName, policyID, rollup) + assertIndexExists("rollup_$indexName") + } + fun `test rollup action failure`() { val indexName = "${testIndexName}_index_failure" val policyID = "${testIndexName}_policy_failure" @@ -215,6 +284,64 @@ class RollupActionIT : IndexStateManagementRestTestCase() { } } + fun `test rollup action create failure due to wildcards in target_index`() { + val indexName = "${testIndexName}_index_failure" + val policyID = "${testIndexName}_policy_failure" + val rollup = ISMRollup( + description = "basic search test", + targetIndex = "target_with_wildcard*", + pageSize = 100, + dimensions = listOf( + DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"), + Terms("RatecodeID", "RatecodeID"), + Terms("PULocationID", "PULocationID") + ), + metrics = listOf( + RollupMetrics( + sourceField = "passenger_count", targetField = "passenger_count", + metrics = listOf( + Sum(), Min(), Max(), + ValueCount(), Average() + ) + ) + ) + ) + val rollupId = rollup.toRollup(indexName).id + val actionConfig = RollupAction(rollup, 0) + val states = listOf( + State("rollup", listOf(actionConfig), listOf()) + ) + val sourceIndexMappingString = "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " + + "\"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " + + "{ \"type\": \"double\" }}" + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID, mapping = sourceIndexMappingString) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so the job will initialize the policy + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Change the start time so we attempt to create rollup step will execute + updateManagedIndexConfigStartTime(managedIndexConfig) + waitFor { + assertEquals( + AttemptCreateRollupJobStep.getFailedMessage(rollupId, indexName), + getExplainManagedIndexMetaData(indexName).info?.get("message") + ) + } + } + fun `test rollup action failure and retry failed step`() { val indexName = "${testIndexName}_index_retry" val policyID = "${testIndexName}_policy_retry" diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt new file mode 100644 index 000000000..fff2f2b4d --- /dev/null +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/util/RollupFieldValueExpressionResolverTests.kt @@ -0,0 +1,42 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.indexmanagement.rollup.util + +import com.nhaarman.mockitokotlin2.any +import com.nhaarman.mockitokotlin2.eq +import com.nhaarman.mockitokotlin2.mock +import com.nhaarman.mockitokotlin2.whenever +import com.nhaarman.mockitokotlin2.doReturn +import org.junit.Before +import org.opensearch.indexmanagement.rollup.randomRollup +import org.opensearch.ingest.TestTemplateService +import org.opensearch.script.ScriptService +import org.opensearch.script.TemplateScript +import org.opensearch.test.OpenSearchTestCase + +class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() { + + private val scriptService: ScriptService = mock() + + @Before + fun settings() { + RollupFieldValueExpressionResolver.registerScriptService(scriptService) + } + + fun `test resolving successfully`() { + whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123")) + val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) + assertEquals("test_index_123", targetIndexResolvedName) + } + + fun `test resolving failed returned passed value`() { + whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("")) + val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}") + val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex) + assertEquals("{{ctx.source_index}}", targetIndexResolvedName) + } +}