Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added support for mustache scripting of rollup.target_index field #435

Merged
merged 14 commits into from
Aug 5, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ import org.opensearch.indexmanagement.rollup.resthandler.RestStartRollupAction
import org.opensearch.indexmanagement.rollup.resthandler.RestStopRollupAction
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestCreateSMPolicyHandler
import org.opensearch.indexmanagement.snapshotmanagement.api.resthandler.RestDeleteSMPolicyHandler
Expand Down Expand Up @@ -370,6 +371,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
this.indexNameExpressionResolver = indexNameExpressionResolver

val skipFlag = SkipExecution(client, clusterService)
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
val rollupRunner = RollupRunner
.registerClient(client)
.registerClusterService(clusterService)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_COUNT
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_MILLIS
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getInitialDocValues
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
Expand Down Expand Up @@ -123,7 +124,8 @@ class RollupIndexer(
}
}
mapOfKeyValues.putAll(aggResults)
val indexRequest = IndexRequest(job.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
val indexRequest = IndexRequest(targetIndexResolvedName)
.id(documentId)
.source(mapOfKeyValues, XContentType.JSON)
requests.add(indexRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
Expand All @@ -52,14 +53,14 @@ class RollupMapperService(
// If the index already exists we need to verify it's a rollup index,
// confirm it does not conflict with existing jobs and is a valid job
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup): RollupJobValidationResult {
if (!isRollupIndex(rollup.targetIndex, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [${rollup.targetIndex}] is a non rollup index")
private suspend fun validateAndAttemptToUpdateTargetIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}

return when (val jobExistsResult = jobExistsInRollupIndex(rollup)) {
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup)
is RollupJobValidationResult.Invalid -> updateRollupIndexMappings(rollup, targetIndexResolvedName)
is RollupJobValidationResult.Failure -> jobExistsResult
}
}
Expand All @@ -69,14 +70,15 @@ class RollupMapperService(
// TODO: error handling
@Suppress("ReturnCount")
suspend fun attemptCreateRollupTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): RollupJobValidationResult {
if (indexExists(job.targetIndex)) {
return validateAndAttemptToUpdateTargetIndex(job)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
if (indexExists(targetIndexResolvedName)) {
return validateAndAttemptToUpdateTargetIndex(job, targetIndexResolvedName)
} else {
val errorMessage = "Failed to create target index [${job.targetIndex}]"
val errorMessage = "Failed to create target index [$targetIndexResolvedName]"
return try {
val response = createTargetIndex(job, hasLegacyPlugin)
val response = createTargetIndex(targetIndexResolvedName, hasLegacyPlugin)
if (response.isAcknowledged) {
updateRollupIndexMappings(job)
updateRollupIndexMappings(job, targetIndexResolvedName)
} else {
RollupJobValidationResult.Failure(errorMessage)
}
Expand All @@ -94,13 +96,13 @@ class RollupMapperService(
}
}

private suspend fun createTargetIndex(job: Rollup, hasLegacyPlugin: Boolean): CreateIndexResponse {
private suspend fun createTargetIndex(targetIndexName: String, hasLegacyPlugin: Boolean): CreateIndexResponse {
val settings = if (hasLegacyPlugin) {
Settings.builder().put(LegacyOpenDistroRollupSettings.ROLLUP_INDEX.key, true).build()
} else {
Settings.builder().put(RollupSettings.ROLLUP_INDEX.key, true).build()
}
val request = CreateIndexRequest(job.targetIndex)
val request = CreateIndexRequest(targetIndexName)
.settings(settings)
.mapping(IndexManagementIndices.rollupTargetMappings)
// TODO: Perhaps we can do better than this for mappings... as it'll be dynamic for rest
Expand Down Expand Up @@ -204,19 +206,19 @@ class RollupMapperService(
return field != null
}

private suspend fun jobExistsInRollupIndex(rollup: Rollup): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(rollup.targetIndex)) {
private suspend fun jobExistsInRollupIndex(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val res = when (val getMappingsResult = getMappings(targetIndexResolvedName)) {
is GetMappingsResult.Success -> getMappingsResult.response
is GetMappingsResult.Failure ->
return RollupJobValidationResult.Failure(getMappingsResult.message, getMappingsResult.cause)
}

val indexMapping: MappingMetadata = res.mappings[rollup.targetIndex]
val indexMapping: MappingMetadata = res.mappings[targetIndexResolvedName]

return if (((indexMapping.sourceAsMap?.get(_META) as Map<*, *>?)?.get(ROLLUPS) as Map<*, *>?)?.containsKey(rollup.id) == true) {
RollupJobValidationResult.Valid
} else {
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [${rollup.targetIndex}]")
RollupJobValidationResult.Invalid("Rollup job [${rollup.id}] does not exist in rollup index [$targetIndexResolvedName]")
}
}

Expand Down Expand Up @@ -254,8 +256,8 @@ class RollupMapperService(
// where they can both get the same mapping state and only add their own job, meaning one
// of the jobs won't be added to the target index _meta
@Suppress("BlockingMethodInNonBlockingContext", "ReturnCount")
private suspend fun updateRollupIndexMappings(rollup: Rollup): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [${rollup.targetIndex}] with rollup job"
private suspend fun updateRollupIndexMappings(rollup: Rollup, targetIndexResolvedName: String): RollupJobValidationResult {
val errorMessage = "Failed to update mappings of target index [$targetIndexResolvedName] with rollup job"
try {
val response = withContext(Dispatchers.IO) {
val resp: AcknowledgedResponse = client.suspendUntil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -59,7 +60,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
state: ClusterState,
listener: ActionListener<AcknowledgedResponse>
) {
val index = state.metadata.index(request.rollup.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
petardz marked this conversation as resolved.
Show resolved Hide resolved
val index = state.metadata.index(targetIndexResolvedName)
if (index == null) {
log.debug("Could not find index [$index]")
return listener.onFailure(IllegalStateException("Could not find index [$index]"))
Expand Down Expand Up @@ -113,7 +115,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
}

// TODO: Does schema_version get overwritten?
val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).source(metaMappings)
val targetIndexResovledName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
petardz marked this conversation as resolved.
Show resolved Hide resolved
val putMappingRequest = PutMappingRequest(targetIndexResovledName).source(metaMappings)
client.admin().indices().putMapping(
putMappingRequest,
object : ActionListener<AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.rollup.util

import org.opensearch.common.bytes.BytesReference
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.script.Script
import org.opensearch.script.ScriptService
import org.opensearch.script.ScriptType
import org.opensearch.script.TemplateScript

object RollupFieldValueExpressionResolver {

private val validTopContextFields = setOf("source_index")
petardz marked this conversation as resolved.
Show resolved Hide resolved

private lateinit var scriptService: ScriptService

fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

val contextMap = XContentHelper.convertToMap(
BytesReference.bytes(rollup.toXContent(XContentFactory.jsonBuilder(), XCONTENT_WITHOUT_TYPE)),
false,
XContentType.JSON
).v2().filterKeys { key ->
petardz marked this conversation as resolved.
Show resolved Hide resolved
key in validTopContextFields
}

val compiledValue = scriptService.compile(script, TemplateScript.CONTEXT)
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

return if (compiledValue.isBlank()) fieldValue else compiledValue
}

fun registerScriptService(scriptService: ScriptService) {
this.scriptService = scriptService
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,76 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
assertIndexRolledUp(indexName, policyID, rollup)
}

fun `test data stream rollup action with scripted targetIndex`() {
val dataStreamName = "${testIndexName}_data_stream"
val policyID = "${testIndexName}_rollup_policy"

val rollup = ISMRollup(
description = "data stream rollup",
targetIndex = "rollup_{{ctx.source_index}}",
pageSize = 100,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count",
targetField = "passenger_count",
metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
),
RollupMetrics(
sourceField = "total_amount",
targetField = "total_amount",
metrics = listOf(Max(), Min())
)
)
)

// Create an ISM policy to rollup backing indices of a data stream.
val actionConfig = RollupAction(rollup, 0)
val states = listOf(State("rollup", listOf(actionConfig), listOf()))
val policy = Policy(
id = policyID,
description = "data stream rollup policy",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states,
ismTemplate = listOf(ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS)))
)
createPolicy(policy, policyID)

val sourceIndexMappingString = "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " +
"\"keyword\" }, \"PULocationID\": { \"type\": \"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " +
"{ \"type\": \"double\" }}"

// Create an index template for a data stream with the given source index mapping.
client().makeRequest(
"PUT",
"/_index_template/rollup-data-stream-template",
StringEntity(
"{ " +
"\"index_patterns\": [ \"$dataStreamName\" ], " +
"\"data_stream\": { \"timestamp_field\": { \"name\": \"tpep_pickup_datetime\" } }, " +
"\"template\": { \"mappings\": { $sourceIndexMappingString } } }",
ContentType.APPLICATION_JSON
)
)
client().makeRequest("PUT", "/_data_stream/$dataStreamName")

// Ensure rollup works on backing indices of a data stream.
val indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L)
assertIndexRolledUp(indexName, policyID, rollup)

val catIndex = (cat("indices/rollup_$indexName?format=json&bytes=b") as List<Map<String, Any>>)
petardz marked this conversation as resolved.
Show resolved Hide resolved
.find { it["index"] == "rollup_$indexName" }

assertNotNull(catIndex)
}

fun `test rollup action failure`() {
val indexName = "${testIndexName}_index_failure"
val policyID = "${testIndexName}_policy_failure"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.rollup.util

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import com.nhaarman.mockitokotlin2.doReturn
import org.junit.Before
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.ingest.TestTemplateService
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.test.OpenSearchTestCase

class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() {

private val scriptService: ScriptService = mock()

@Before
fun settings() {
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
}

fun `test resolving successfully`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123"))
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
assertEquals("test_index_123", targetIndexResolvedName)
}

fun `test resolving failed returned passed value`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory(""))
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
assertEquals("{{ctx.source_index}}", targetIndexResolvedName)
}
}