Skip to content

Commit

Permalink
refactor and ITs
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
petardz committed Aug 10, 2022
1 parent 9adfb37 commit 4e7d173
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,30 +57,15 @@ class RollupMapperService(
// confirm it does not conflict with existing jobs and is a valid job
@Suppress("ReturnCount")
private suspend fun validateAndAttemptToUpdateTargetIndex(
rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {

if (!isRollupIndex(targetIndexResolvedName, clusterService.state()) &&
RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) {

val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
when (val jobExistsResult = jobExistsInRollupIndex(rollup, it.index.name)) {
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return jobExistsResult
}
}
}
val mappings = getMappings(targetIndexResolvedName)
if (mappings is GetMappingsResult.Failure) {
return RollupJobValidationResult.Failure("Failed to get mappings for target index: $targetIndexResolvedName")
} else if (mappings is GetMappingsResult.Success &&
mappings.response.mappings()?.get(targetIndexResolvedName)?.sourceAsMap().isNullOrEmpty() == false) {
return RollupJobValidationResult.Failure("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
}
return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
} else {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
rollup: Rollup,
targetIndexResolvedName: String,
hasLegacyPlugin: Boolean
): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return if (targetIndexIsValidAlias(rollup, targetIndexResolvedName)) {
prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
} else {
RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}
}
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
Expand All @@ -90,6 +75,34 @@ class RollupMapperService(
}
}

@Suppress("ReturnCount")
suspend fun targetIndexIsValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean {

if (!RollupFieldValueExpressionResolver.hasAlias(targetIndexResolvedName)) {
return false
}
// All other backing indices have to have this rollup job in _META field
val backingIndices = RollupFieldValueExpressionResolver.getBackingIndicesForAlias(targetIndexResolvedName)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
when (jobExistsInRollupIndex(rollup, it.index.name)) {
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return false
}
}
}
val mappings = getMappings(targetIndexResolvedName)
if (mappings is GetMappingsResult.Failure) {
logger.error("Failed to get mappings for target index: $targetIndexResolvedName")
return false
} else if (mappings is GetMappingsResult.Success &&
mappings.response.mappings()?.get(targetIndexResolvedName)?.sourceAsMap()?.isNotEmpty() == true
) {
logger.error("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
return false
}
return true
}

// This creates the target index if it doesn't already else validate the target index is rollup index
// If the target index mappings doesn't contain rollup job attempts to update the mappings.
// TODO: error handling
Expand Down Expand Up @@ -130,8 +143,9 @@ class RollupMapperService(
val resp: AcknowledgedResponse = client.admin().indices().suspendUntil {
updateSettings(UpdateSettingsRequest(settings, targetIndexResolvedName), it)
}
return !resp.isAcknowledged
return resp.isAcknowledged
}
@Suppress("ReturnCount")
suspend fun prepareTargetIndex(rollup: Rollup, targetIndexResolvedName: String, hasLegacyPlugin: Boolean): RollupJobValidationResult {
var errorMessage = ""
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ package org.opensearch.indexmanagement.rollup.runner

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.opensearch.common.settings.Settings
import org.opensearch.indexmanagement.IndexManagementPlugin
import org.opensearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI
import org.opensearch.indexmanagement.common.model.dimension.DateHistogram
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_REPLICAS
import org.opensearch.indexmanagement.indexstatemanagement.util.INDEX_NUMBER_OF_SHARDS
import org.opensearch.indexmanagement.makeRequest
import org.opensearch.indexmanagement.rollup.RollupRestTestCase
import org.opensearch.indexmanagement.rollup.model.Rollup
Expand Down Expand Up @@ -730,6 +733,81 @@ class RollupRunnerIT : RollupRestTestCase() {
assertTrue("Did not spend time searching", rollupMetadata.stats.searchTimeInMillis > 0L)
}

fun `test rollup action with alias as target_index successfully`() {
generateNYCTaxiData("source_runner_sixth")

// Create index with alias, without mappings
val indexAlias = "alias_as_target_index"
val backingIndex = "backing_target_index"
val builtSettings = Settings.builder().let {
it.put(INDEX_NUMBER_OF_REPLICAS, "1")
it.put(INDEX_NUMBER_OF_SHARDS, "1")
it
}.build()
val aliases = "\"$indexAlias\": { \"is_write_index\": true }"
createIndex(backingIndex, builtSettings, null, aliases)

refreshAllIndices()

val rollup = Rollup(
id = "page_size_runner_sixth",
schemaVersion = 1L,
enabled = true,
jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic change of page size",
sourceIndex = "source_runner_sixth",
targetIndex = indexAlias,
metadataID = null,
roles = emptyList(),
pageSize = 1,
delay = 0,
continuous = false,
dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s")),
metrics = listOf(
RollupMetrics(
sourceField = "passenger_count",
targetField = "passenger_count",
metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
)
)
).let { createRollup(it, it.id) }

updateRollupStartTime(rollup)

waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex)) }

var startedRollup = waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
rollupJob
}

// restart job
client().makeRequest(
"PUT",
"$ROLLUP_JOBS_BASE_URI/${startedRollup.id}?if_seq_no=${startedRollup.seqNo}&if_primary_term=${startedRollup.primaryTerm}",
emptyMap(), rollup.copy(enabled = true).toHttpEntity()
)

startedRollup = waitFor {
val rollupJob = getRollup(rollupId = rollup.id)
assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!)
assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, rollupMetadata.status)
rollupJob
}

val rollupMetadataID = startedRollup.metadataID!!
val rollupMetadata = getRollupMetadata(rollupMetadataID)

// Randomly choosing 100.. if it didn't work we'd either fail hitting the timeout in waitFor or we'd have thousands of pages processed
assertTrue("Did not have less than 100 pages processed", rollupMetadata.stats.documentsProcessed > 0)
}

// TODO: Test scenarios:
// - Source index deleted after first execution
// * If this is with a source index pattern and the underlying indices are recreated but with different data
Expand Down

0 comments on commit 4e7d173

Please sign in to comment.