Skip to content

Commit

Permalink
fixed rollupFieldValueExpressionResolverTests
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz committed Aug 10, 2022
1 parent e0d03eb commit 7a6bade
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,15 +78,16 @@ class RollupMapperService(
@Suppress("ReturnCount")
suspend fun targetIndexIsValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean {

if (!RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) {
if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
return false
}
// All other backing indices have to have this rollup job in _META field
val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName)
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(targetIndexResolvedName)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
when (jobExistsInRollupIndex(rollup, it.index.name)) {
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return false
else -> {}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ object RollupFieldValueExpressionResolver {

private lateinit var scriptService: ScriptService
private lateinit var clusterService: ClusterService
lateinit var indexAliasUtils: IndexAliasUtils
fun resolve(rollup: Rollup, fieldValue: String): String {
val script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, fieldValue, mapOf())

Expand All @@ -34,36 +35,39 @@ object RollupFieldValueExpressionResolver {
.newInstance(script.params + mapOf("ctx" to contextMap))
.execute()

if (isAlias(compiledValue)) {
compiledValue = getWriteIndexNameForAlias(compiledValue)
if (indexAliasUtils.isAlias(compiledValue)) {
compiledValue = indexAliasUtils.getWriteIndexNameForAlias(compiledValue)
}

return if (compiledValue.isNullOrBlank()) fieldValue else compiledValue
}

fun registerScriptService(scriptService: ScriptService) {
fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
this.scriptService = scriptService
this.clusterService = clusterService
this.indexAliasUtils = IndexAliasUtils(clusterService)
}
fun hasAlias(index: String): Boolean {
val aliases = clusterService.state().metadata().indices.get(index)?.aliases
if (aliases != null) {
return aliases.size() > 0

class IndexAliasUtils(val clusterService: ClusterService) {

fun hasAlias(index: String): Boolean {
val aliases = this.clusterService.state().metadata().indices.get(index)?.aliases
if (aliases != null) {
return aliases.size() > 0
}
return false
}
return false
}
fun isAlias(index: String): Boolean {
return clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
}
fun getWriteIndexNameForAlias(alias: String): String? {
return clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
return clusterService.state().metadata().indicesLookup?.get(alias)?.indices
}
fun isAlias(index: String): Boolean {
return this.clusterService.state().metadata().indicesLookup?.get(index) is IndexAbstraction.Alias
}

fun registerServices(scriptService: ScriptService, clusterService: ClusterService) {
this.scriptService = scriptService
this.clusterService = clusterService
fun getWriteIndexNameForAlias(alias: String): String? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.writeIndex?.index?.name
}

fun getBackingIndicesForAlias(alias: String): MutableList<IndexMetadata>? {
return this.clusterService.state().metadata().indicesLookup?.get(alias)?.indices
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -734,7 +734,7 @@ class RollupRunnerIT : RollupRestTestCase() {
}

fun `test rollup action with alias as target_index successfully`() {
generateNYCTaxiData("source_runner_sixth")
generateNYCTaxiData("source_runner_sixth_eleventh")

// Create index with alias, without mappings
val indexAlias = "alias_as_target_index"
Expand All @@ -750,14 +750,14 @@ class RollupRunnerIT : RollupRestTestCase() {
refreshAllIndices()

val rollup = Rollup(
id = "page_size_runner_sixth",
id = "runner_with_alias_as_target",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic change of page size",
sourceIndex = "source_runner_sixth",
sourceIndex = "source_runner_sixth_eleventh",
targetIndex = indexAlias,
metadataID = null,
roles = emptyList(),
Expand All @@ -774,6 +774,7 @@ class RollupRunnerIT : RollupRestTestCase() {
)
).let { createRollup(it, it.id) }

// First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META
updateRollupStartTime(rollup)

waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex)) }
Expand All @@ -785,13 +786,18 @@ class RollupRunnerIT : RollupRestTestCase() {
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
rollupJob
}
var rollupMetadataID = startedRollup.metadataID!!
var rollupMetadata = getRollupMetadata(rollupMetadataID)
assertEquals("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0)

// restart job
client().makeRequest(
"PUT",
"$ROLLUP_JOBS_BASE_URI/${startedRollup.id}?if_seq_no=${startedRollup.seqNo}&if_primary_term=${startedRollup.primaryTerm}",
emptyMap(), rollup.copy(enabled = true).toHttpEntity()
)
// Second run, backing index is setup just like any other rollup index
updateRollupStartTime(rollup)

startedRollup = waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
Expand All @@ -801,11 +807,10 @@ class RollupRunnerIT : RollupRestTestCase() {
rollupJob
}

val rollupMetadataID = startedRollup.metadataID!!
val rollupMetadata = getRollupMetadata(rollupMetadataID)
rollupMetadataID = startedRollup.metadataID!!
rollupMetadata = getRollupMetadata(rollupMetadataID)

// Randomly choosing 100.. if it didn't work we'd either fail hitting the timeout in waitFor or we'd have thousands of pages processed
assertTrue("Did not have less than 100 pages processed", rollupMetadata.stats.documentsProcessed > 0)
assertEquals("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0)
}

// TODO: Test scenarios:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import com.nhaarman.mockitokotlin2.mock
import com.nhaarman.mockitokotlin2.whenever
import com.nhaarman.mockitokotlin2.doReturn
import org.junit.Before
import org.opensearch.cluster.service.ClusterService
import org.opensearch.indexmanagement.rollup.randomRollup
import org.opensearch.ingest.TestTemplateService
import org.opensearch.script.ScriptService
Expand All @@ -20,19 +21,31 @@ import org.opensearch.test.OpenSearchTestCase
class RollupFieldValueExpressionResolverTests : OpenSearchTestCase() {

private val scriptService: ScriptService = mock()

private val clusterService: ClusterService = mock()
private val indexAliasUtils: RollupFieldValueExpressionResolver.IndexAliasUtils = mock()
@Before
fun settings() {
RollupFieldValueExpressionResolver.registerScriptService(scriptService)
RollupFieldValueExpressionResolver.registerServices(scriptService, clusterService)
clusterService.state()
}

fun `test resolving successfully`() {
fun `test resolving no alias successfully`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123"))
whenever(indexAliasUtils.hasAlias(any())).doReturn(false)
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
assertEquals("test_index_123", targetIndexResolvedName)
}

fun `test resolving with alias successfully`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory("test_index_123"))
whenever(indexAliasUtils.hasAlias(any())).doReturn(true)
whenever(indexAliasUtils.getWriteIndexNameForAlias(any())).doReturn("backing_index")
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
val targetIndexResolvedName = RollupFieldValueExpressionResolver.resolve(rollup, rollup.targetIndex)
assertEquals("backing_index", targetIndexResolvedName)
}

fun `test resolving failed returned passed value`() {
whenever(scriptService.compile(any(), eq(TemplateScript.CONTEXT))).doReturn(TestTemplateService.MockTemplateScript.Factory(""))
val rollup = randomRollup().copy(sourceIndex = "test_index_123", targetIndex = "{{ctx.source_index}}")
Expand Down

0 comments on commit 7a6bade

Please sign in to comment.