Skip to content

Commit

Permalink
added support for mustache scripting of rollup.target_index field (op…
Browse files Browse the repository at this point in the history
…ensearch-project#435)

* tests
* small refactor/improvements
* added wildcard check when creating rollup job; removed resolving targetIndex on Rollup init; added test for wildcards
* lint fixes
* moved target_index validation in getRollup resp handler
* removed catch block
* exception fix
* linter fix
* fixed IT fail
* added Exception catch block

Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz committed Aug 10, 2022
1 parent 7d4bace commit b88b996
Show file tree
Hide file tree
Showing 6 changed files with 204 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.rollup

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchException
import org.opensearch.action.support.WriteRequest
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.index.engine.VersionConflictEngineException
Expand Down Expand Up @@ -62,7 +63,9 @@ class AttemptCreateRollupJobStep(private val action: RollupAction) : Step(name)
}
} catch (e: RemoteTransportException) {
processFailure(rollup.id, indexName, ExceptionsHelper.unwrapCause(e) as Exception)
} catch (e: RemoteTransportException) {
} catch (e: OpenSearchException) {
processFailure(rollup.id, indexName, e)
} catch (e: Exception) {
processFailure(rollup.id, indexName, e)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.model.RollupStats
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_COUNT
import org.opensearch.indexmanagement.rollup.settings.RollupSettings.Companion.ROLLUP_INGEST_BACKOFF_MILLIS
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getInitialDocValues
import org.opensearch.indexmanagement.util.IndexUtils.Companion.ODFE_MAGIC_NULL
import org.opensearch.indexmanagement.util.IndexUtils.Companion.hashToFixedSize
Expand Down Expand Up @@ -123,7 +124,8 @@ class RollupIndexer(
}
}
mapOfKeyValues.putAll(aggResults)
val indexRequest = IndexRequest(job.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(job, job.targetIndex)
val indexRequest = IndexRequest(targetIndexResolvedName)
.id(documentId)
.source(mapOfKeyValues, XContentType.JSON)
requests.add(indexRequest)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.commons.authuser.User
import org.opensearch.indexmanagement.IndexManagementIndices
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import org.opensearch.indexmanagement.rollup.model.Rollup
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.parseRollup
import org.opensearch.indexmanagement.settings.IndexManagementSettings
import org.opensearch.indexmanagement.util.IndexUtils
Expand Down Expand Up @@ -91,6 +92,14 @@ class TransportIndexRollupAction @Inject constructor(
if (response.isAcknowledged) {
log.info("Successfully created or updated $INDEX_MANAGEMENT_INDEX with newest mappings.")
if (request.opType() == DocWriteRequest.OpType.CREATE) {
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
putRollup()
} else {
getRollup()
Expand Down Expand Up @@ -128,6 +137,14 @@ class TransportIndexRollupAction @Inject constructor(
if (modified.isNotEmpty()) {
return actionListener.onFailure(OpenSearchStatusException("Not allowed to modify $modified", RestStatus.BAD_REQUEST))
}
if (!validateTargetIndexName()) {
return actionListener.onFailure(
OpenSearchStatusException(
"target_index value is invalid: ${request.rollup.targetIndex}",
RestStatus.BAD_REQUEST
)
)
}
putRollup()
}

Expand Down Expand Up @@ -172,5 +189,10 @@ class TransportIndexRollupAction @Inject constructor(
}
)
}

private fun validateTargetIndexName(): Boolean {
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return targetIndexResolvedName.contains("*") == false && targetIndexResolvedName.contains("?") == false
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentHelper
import org.opensearch.common.xcontent.XContentType
import org.opensearch.indexmanagement.indexstatemanagement.util.XCONTENT_WITHOUT_TYPE
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.threadpool.ThreadPool
import org.opensearch.transport.TransportService
Expand All @@ -50,7 +51,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
private val log = LogManager.getLogger(javaClass)

override fun checkBlock(request: UpdateRollupMappingRequest, state: ClusterState): ClusterBlockException? {
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(request.rollup.targetIndex))
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
return state.blocks.indicesBlockedException(ClusterBlockLevel.METADATA_WRITE, arrayOf(targetIndexResolvedName))
}

@Suppress("ReturnCount", "LongMethod")
Expand All @@ -59,7 +61,8 @@ class TransportUpdateRollupMappingAction @Inject constructor(
state: ClusterState,
listener: ActionListener<AcknowledgedResponse>
) {
val index = state.metadata.index(request.rollup.targetIndex)
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(request.rollup, request.rollup.targetIndex)
val index = state.metadata.index(targetIndexResolvedName)
if (index == null) {
log.debug("Could not find index [$index]")
return listener.onFailure(IllegalStateException("Could not find index [$index]"))
Expand Down Expand Up @@ -113,7 +116,7 @@ class TransportUpdateRollupMappingAction @Inject constructor(
}

// TODO: Does schema_version get overwritten?
val putMappingRequest = PutMappingRequest(request.rollup.targetIndex).source(metaMappings)
val putMappingRequest = PutMappingRequest(targetIndexResolvedName).source(metaMappings)
client.admin().indices().putMapping(
putMappingRequest,
object : ActionListener<AcknowledgedResponse> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.indexmanagement.rollup.model.metric.Sum
import org.opensearch.indexmanagement.rollup.model.metric.ValueCount
import org.opensearch.indexmanagement.rollup.toJsonString
import org.opensearch.indexmanagement.waitFor
import java.lang.Thread.sleep
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale
Expand Down Expand Up @@ -146,6 +147,74 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
assertIndexRolledUp(indexName, policyID, rollup)
}

fun `test data stream rollup action with scripted targetIndex`() {
val dataStreamName = "${testIndexName}_data_stream"
val policyID = "${testIndexName}_rollup_policy"

sleep(10000)

val rollup = ISMRollup(
description = "data stream rollup",
targetIndex = "rollup_{{ctx.source_index}}",
pageSize = 100,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count",
targetField = "passenger_count",
metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
),
RollupMetrics(
sourceField = "total_amount",
targetField = "total_amount",
metrics = listOf(Max(), Min())
)
)
)

// Create an ISM policy to rollup backing indices of a data stream.
val actionConfig = RollupAction(rollup, 0)
val states = listOf(State("rollup", listOf(actionConfig), listOf()))
val policy = Policy(
id = policyID,
description = "data stream rollup policy",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states,
ismTemplate = listOf(ISMTemplate(listOf(dataStreamName), 100, Instant.now().truncatedTo(ChronoUnit.MILLIS)))
)
createPolicy(policy, policyID)

val sourceIndexMappingString = "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " +
"\"keyword\" }, \"PULocationID\": { \"type\": \"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " +
"{ \"type\": \"double\" }}"

// Create an index template for a data stream with the given source index mapping.
client().makeRequest(
"PUT",
"/_index_template/rollup-data-stream-template",
StringEntity(
"{ " +
"\"index_patterns\": [ \"$dataStreamName\" ], " +
"\"data_stream\": { \"timestamp_field\": { \"name\": \"tpep_pickup_datetime\" } }, " +
"\"template\": { \"mappings\": { $sourceIndexMappingString } } }",
ContentType.APPLICATION_JSON
)
)
client().makeRequest("PUT", "/_data_stream/$dataStreamName")

// Ensure rollup works on backing indices of a data stream.
val indexName = DataStream.getDefaultBackingIndexName(dataStreamName, 1L)
assertIndexRolledUp(indexName, policyID, rollup)
assertIndexExists("rollup_$indexName")
}

fun `test rollup action failure`() {
val indexName = "${testIndexName}_index_failure"
val policyID = "${testIndexName}_policy_failure"
Expand Down Expand Up @@ -215,6 +284,64 @@ class RollupActionIT : IndexStateManagementRestTestCase() {
}
}

fun `test rollup action create failure due to wildcards in target_index`() {
val indexName = "${testIndexName}_index_failure"
val policyID = "${testIndexName}_policy_failure"
val rollup = ISMRollup(
description = "basic search test",
targetIndex = "target_with_wildcard*",
pageSize = 100,
dimensions = listOf(
DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h"),
Terms("RatecodeID", "RatecodeID"),
Terms("PULocationID", "PULocationID")
),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count", targetField = "passenger_count",
metrics = listOf(
Sum(), Min(), Max(),
ValueCount(), Average()
)
)
)
)
val rollupId = rollup.toRollup(indexName).id
val actionConfig = RollupAction(rollup, 0)
val states = listOf(
State("rollup", listOf(actionConfig), listOf())
)
val sourceIndexMappingString = "\"properties\": {\"tpep_pickup_datetime\": { \"type\": \"date\" }, \"RatecodeID\": { \"type\": " +
"\"keyword\" }, \"passenger_count\": { \"type\": \"integer\" }, \"total_amount\": " +
"{ \"type\": \"double\" }}"
val policy = Policy(
id = policyID,
description = "$testIndexName description",
schemaVersion = 1L,
lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS),
errorNotification = randomErrorNotification(),
defaultState = states[0].name,
states = states
)
createPolicy(policy, policyID)
createIndex(indexName, policyID, mapping = sourceIndexMappingString)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)

// Change the start time so the job will initialize the policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) }

// Change the start time so we attempt to create rollup step will execute
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor {
assertEquals(
AttemptCreateRollupJobStep.getFailedMessage(rollupId, indexName),
getExplainManagedIndexMetaData(indexName).info?.get("message")
)
}
}

fun `test rollup action failure and retry failed step`() {
val indexName = "${testIndexName}_index_retry"
val policyID = "${testIndexName}_policy_retry"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.indexmanagement.rollup.util

import com.nhaarman.mockitokotlin2.any
import com.nhaarman.mockitokotlin2.eq
import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import com.nhaarman.mockitokotlin2.doReturn
import org.junit.Before
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.ingest.TestTemplateService
import org.opensearch.script.ScriptService
import org.opensearch.script.TemplateScript
import org.opensearch.test.OpenSearchTestCase

class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() {

private val scriptService: ScriptService = mock()

@Before
fun settings() {
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
}

fun `test resolving successfully`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123"))
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
assertEquals("test_index_123", targetIndexResolvedName)
}

fun `test resolving failed returned passed value`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory(""))
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
assertEquals("{{ctx.source_index}}", targetIndexResolvedName)
}
}

0 comments on commit b88b996

Please sign in to comment.