Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

alias in rollup target_index field #445

Merged
merged 20 commits into from
Nov 3, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ import org.opensearch.indexmanagement.rollup.model.RollupJobValidationResult
import org.opensearch.indexmanagement.rollup.settings.LegacyOpenDistroRollupSettings
import org.opensearch.indexmanagement.rollup.settings.RollupSettings
import org.opensearch.indexmanagement.rollup.util.RollupFieldValueExpressionResolver
import org.opensearch.indexmanagement.rollup.util.getRollupJobs
import org.opensearch.indexmanagement.rollup.util.isRollupIndex
import org.opensearch.indexmanagement.rollup.util.isTargetIndexAlias
import org.opensearch.indexmanagement.util.IndexUtils.Companion._META
import org.opensearch.indexmanagement.util.IndexUtils.Companion.getFieldFromMappings
import org.opensearch.transport.RemoteTransportException
Expand All @@ -61,12 +63,20 @@ class RollupMapperService(
targetIndexResolvedName: String,
hasLegacyPlugin: Boolean
): RollupJobValidationResult {
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return if (targetIndexIsValidAlias(rollup, targetIndexResolvedName)) {
prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
} else {
RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
/**
* Target Index is valid alias if either all backing indices have this job in _meta
* or there isn't any rollup job present in _meta
*/
var isValidAlias = isTargetIndexValidAlias(rollup, targetIndexResolvedName)
petardz marked this conversation as resolved.
Show resolved Hide resolved
if (rollup.isTargetIndexAlias() && isValidAlias) {
// During first run on alias, backing index is not prepared. We should set it up here. (settings, mappings, _meta)
if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return prepareTargetIndex(rollup, targetIndexResolvedName, hasLegacyPlugin)
}
} else if (rollup.isTargetIndexAlias() && !isValidAlias) {
return RollupJobValidationResult.Failure("Target index alias [${rollup.targetIndex}] validation failed")
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
} else if (!isRollupIndex(targetIndexResolvedName, clusterService.state())) {
return RollupJobValidationResult.Invalid("Target index [$targetIndexResolvedName] is a non rollup index")
}
return when (val jobExistsResult = jobExistsInRollupIndex(rollup, targetIndexResolvedName)) {
is RollupJobValidationResult.Valid -> jobExistsResult
Expand All @@ -76,31 +86,31 @@ class RollupMapperService(
}

@Suppress("ReturnCount")
suspend fun targetIndexIsValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean {
suspend fun isTargetIndexValidAlias(rollup: Rollup, targetIndexResolvedName: String): Boolean {

if (!RollupFieldValueExpressionResolver.indexAliasUtils.hasAlias(targetIndexResolvedName)) {
return false
}
// All other backing indices have to have this rollup job in _META field
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(targetIndexResolvedName)

val rollupJobs = clusterService.state().metadata.index(targetIndexResolvedName).getRollupJobs()
if (rollupJobs != null &&
(rollupJobs.size > 1 || rollupJobs[0].id != rollup.id)
) {
logger.error("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
petardz marked this conversation as resolved.
Show resolved Hide resolved
return false
}

// All other backing indices have to have this rollup job in _META field and it has to be the only one!
val backingIndices = RollupFieldValueExpressionResolver.indexAliasUtils.getBackingIndicesForAlias(rollup.targetIndex)
backingIndices?.forEach {
if (it.index.name != targetIndexResolvedName) {
khushbr marked this conversation as resolved.
Show resolved Hide resolved
when (jobExistsInRollupIndex(rollup, it.index.name)) {
is RollupJobValidationResult.Invalid, is RollupJobValidationResult.Failure -> return false
else -> {}
val rollupJobs = clusterService.state().metadata.index(it.index.name).getRollupJobs()
khushbr marked this conversation as resolved.
Show resolved Hide resolved
if (rollupJobs == null || rollupJobs.size > 1 || rollupJobs[0].id != rollup.id) {
logger.error("If target_index is alias, all backing indicies must have only this rollup job present in _meta!")
return false
}
}
}
val mappings = getMappings(targetIndexResolvedName)
if (mappings is GetMappingsResult.Failure) {
logger.error("Failed to get mappings for target index: $targetIndexResolvedName")
return false
} else if (mappings is GetMappingsResult.Success &&
mappings.response.mappings()?.get(targetIndexResolvedName)?.sourceAsMap()?.isNotEmpty() == true
) {
logger.error("If target_index is alias, backing index must be empty: $targetIndexResolvedName")
return false
}
return true
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ fun isRollupIndex(index: String, clusterState: ClusterState): Boolean {
return false
}

/**
 * Reports whether this rollup's [targetIndex] names an alias.
 *
 * The decision is delegated to `isAlias` on the alias utilities held by
 * [RollupFieldValueExpressionResolver].
 *
 * @return the result of `indexAliasUtils.isAlias` for this rollup's target index.
 */
fun Rollup.isTargetIndexAlias(): Boolean =
    RollupFieldValueExpressionResolver.indexAliasUtils.isAlias(this.targetIndex)

fun Rollup.getRollupSearchRequest(metadata: RollupMetadata): SearchRequest {
val query = if (metadata.continuous != null) {
RangeQueryBuilder(this.getDateHistogram().sourceField)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -890,7 +890,7 @@ class RollupRunnerIT : RollupRestTestCase() {
}

fun `test rollup action with alias as target_index with multiple backing indices successfully`() {
generateNYCTaxiData("source_runner_sixth_2")
generateNYCTaxiData("source_runner_sixth_29932")

// Create index with alias, without mappings
val indexAlias = "alias_as_target_index_2"
Expand All @@ -914,7 +914,7 @@ class RollupRunnerIT : RollupRestTestCase() {
jobLastUpdatedTime = Instant.now(),
jobEnabledTime = Instant.now(),
description = "basic change of page size",
sourceIndex = "source_runner_sixth_2",
sourceIndex = "source_runner_sixth_29932",
targetIndex = indexAlias,
metadataID = null,
roles = emptyList(),
Expand Down Expand Up @@ -980,6 +980,205 @@ class RollupRunnerIT : RollupRestTestCase() {
assertTrue("Did not process any doc during rollup", rollupMetadata.stats.documentsProcessed > 0)
}

/**
 * An alias used as a rollup target stops being usable once a second job has run against one of its
 * backing indices.
 *
 * Scenario:
 * 1. A backing index is created with the alias as its write index; job1 targets the alias and finishes.
 * 2. job2 targets the first backing index directly and finishes as well.
 * 3. The alias is rolled over and job1 is re-enabled. Its next run is expected to end with status
 *    FAILED and the "Target index alias [...] validation failed" reason.
 */
fun `test rollup action with alias as target_index with multiple backing indices failed`() {
    generateNYCTaxiData("source_runner_sixth_2123")

    // Create index with alias, without mappings
    val indexAlias = "alias_as_target_index_failed"
    val backingIndex1 = "backing_target_index1_f-000001"
    val backingIndex2 = "backing_target_index1_f-000002"
    val builtSettings = Settings.builder()
        .put(INDEX_NUMBER_OF_REPLICAS, "1")
        .put(INDEX_NUMBER_OF_SHARDS, "1")
        .build()
    val aliases = "\"$indexAlias\": { \"is_write_index\": true }"
    createIndex(backingIndex1, builtSettings, null, aliases)

    refreshAllIndices()

    val job1 = Rollup(
        id = "rollup_with1_alias_1",
        schemaVersion = 1L,
        enabled = true,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic change of page size",
        sourceIndex = "source_runner_sixth_2123",
        targetIndex = indexAlias,
        metadataID = null,
        roles = emptyList(),
        pageSize = 1000,
        delay = 0,
        continuous = false,
        dimensions = listOf(
            DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"),
            Terms("RatecodeID", "RatecodeID"),
            Terms("PULocationID", "PULocationID")
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "passenger_count",
                targetField = "passenger_count",
                metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
            )
        )
    ).let { createRollup(it, it.id) }

    // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META
    updateRollupStartTime(job1)

    waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex1)) }

    val finishedJob1 = waitFor {
        val rollupJob = getRollup(rollupId = job1.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val jobMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, jobMetadata.status)
        assertTrue("Rollup is not disabled", !rollupJob.enabled)
        rollupJob
    }
    val finishedJob1Metadata = getRollupMetadata(finishedJob1.metadataID!!)
    assertTrue("Did not process any doc during rollup", finishedJob1Metadata.stats.documentsProcessed > 0)

    // Run job #2 directly on the first backing index (not on the alias)
    val job2 = job1.copy(id = "some_other_job_999", targetIndex = backingIndex1)
        .let { createRollup(it, it.id) }

    // Job2 First run, it should add itself to _meta in the same index job1 did.
    updateRollupStartTime(job2)

    val finishedJob2 = waitFor {
        val rollupJob = getRollup(rollupId = job2.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val jobMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, jobMetadata.status)
        assertTrue("Rollup is not disabled", !rollupJob.enabled)
        rollupJob
    }
    val finishedJob2Metadata = getRollupMetadata(finishedJob2.metadataID!!)
    assertTrue("Did not process any doc during rollup", finishedJob2Metadata.stats.documentsProcessed > 0)

    // do rollover on alias
    val rolloverResponse = client().makeRequest("POST", "/$indexAlias/_rollover")
    assertEquals(RestStatus.OK, rolloverResponse.restStatus())
    waitFor { assertTrue("index was not created after rollover", indexExists(backingIndex2)) }

    refreshAllIndices()

    // restart job #1
    client().makeRequest(
        "PUT",
        "$ROLLUP_JOBS_BASE_URI/${finishedJob1.id}?if_seq_no=${finishedJob1.seqNo}&if_primary_term=${finishedJob1.primaryTerm}",
        emptyMap(), job1.copy(enabled = true).toHttpEntity()
    )
    // Second run of job1: this time the run is expected to fail alias validation
    updateRollupStartTime(job1)

    val failedJob1 = waitFor {
        val rollupJob = getRollup(rollupId = job1.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val jobMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not failed", RollupMetadata.Status.FAILED, jobMetadata.status)
        rollupJob
    }

    val failedJob1Metadata = getRollupMetadata(failedJob1.metadataID!!)

    assertEquals("Target index alias [${failedJob1.targetIndex}] validation failed", failedJob1Metadata.failureReason)
}

/**
 * A second rollup job must not be able to reuse an alias that another job has already written to.
 *
 * Scenario:
 * 1. A backing index is created with the alias as its write index; job1 targets the alias and finishes.
 * 2. job2 (a copy of job1 with a different id) targets the same alias. Its first run is expected to end
 *    with status FAILED and the "Target index alias [...] validation failed" reason.
 */
fun `test rollup action with alias as target_index reuse failed`() {
    generateNYCTaxiData("source_runner_sixth_2209")

    // Create index with alias, without mappings
    val indexAlias = "alias_as_target_index_failed_1"
    val backingIndex1 = "backing-000001"
    val builtSettings = Settings.builder()
        .put(INDEX_NUMBER_OF_REPLICAS, "1")
        .put(INDEX_NUMBER_OF_SHARDS, "1")
        .build()
    val aliases = "\"$indexAlias\": { \"is_write_index\": true }"
    createIndex(backingIndex1, builtSettings, null, aliases)

    refreshAllIndices()

    val job1 = Rollup(
        id = "rollup_with_alias_11",
        schemaVersion = 1L,
        enabled = true,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic change of page size",
        sourceIndex = "source_runner_sixth_2209",
        targetIndex = indexAlias,
        metadataID = null,
        roles = emptyList(),
        pageSize = 1000,
        delay = 0,
        continuous = false,
        dimensions = listOf(
            DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1s"),
            Terms("RatecodeID", "RatecodeID"),
            Terms("PULocationID", "PULocationID")
        ),
        metrics = listOf(
            RollupMetrics(
                sourceField = "passenger_count",
                targetField = "passenger_count",
                metrics = listOf(Sum(), Min(), Max(), ValueCount(), Average())
            )
        )
    ).let { createRollup(it, it.id) }

    // First run, backing index is empty: no mappings, no rollup_index setting, no rollupjobs in _META
    updateRollupStartTime(job1)

    waitFor { assertTrue("Target rollup index was not created", indexExists(backingIndex1)) }

    val finishedJob1 = waitFor {
        val rollupJob = getRollup(rollupId = job1.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val jobMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not finished", RollupMetadata.Status.FINISHED, jobMetadata.status)
        assertTrue("Rollup is not disabled", !rollupJob.enabled)
        rollupJob
    }
    val finishedJob1Metadata = getRollupMetadata(finishedJob1.metadataID!!)
    assertTrue("Did not process any doc during rollup", finishedJob1Metadata.stats.documentsProcessed > 0)

    // Run job #2 on same target_index alias
    val job2 = job1.copy(id = "some_other_job_9991", targetIndex = indexAlias)
        .let { createRollup(it, it.id) }

    // Job2 First run, it should fail because job1 already wrote to backing index
    updateRollupStartTime(job2)

    val failedJob2 = waitFor {
        val rollupJob = getRollup(rollupId = job2.id)
        assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID)
        val jobMetadata = getRollupMetadata(rollupJob.metadataID!!)
        assertEquals("Rollup is not failed", RollupMetadata.Status.FAILED, jobMetadata.status)
        assertTrue("Rollup is not disabled", !rollupJob.enabled)
        rollupJob
    }
    val failedJob2Metadata = getRollupMetadata(failedJob2.metadataID!!)
    // job2 targets the same alias as job1, so the expected message is built from job2's own target
    assertEquals("Target index alias [${failedJob2.targetIndex}] validation failed", failedJob2Metadata.failureReason)
}

// TODO: Test scenarios:
// - Source index deleted after first execution
// * If this is with a source index pattern and the underlying indices are recreated but with different data
Expand Down