Skip to content

Commit

Permalink
Adds cluster setting to configure index state management jitter (#153)
Browse files Browse the repository at this point in the history
* Adds jitter cluster setting, sets jitter to 0 for ISM tests

Signed-off-by: Clay Downs <downsrob@amazon.com>
  • Loading branch information
downsrob committed Oct 29, 2021
1 parent b30d3c3 commit ab12279
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JITTER
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.METADATA_SERVICE_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD
Expand Down Expand Up @@ -145,6 +146,7 @@ class ManagedIndexCoordinator(
@Volatile private var retryPolicy =
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))
@Volatile private var jobInterval = JOB_INTERVAL.get(settings)
@Volatile private var jobJitter = JITTER.get(settings)

@Volatile private var isMaster = false

Expand All @@ -158,6 +160,9 @@ class ManagedIndexCoordinator(
clusterService.clusterSettings.addSettingsUpdateConsumer(JOB_INTERVAL) {
jobInterval = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(JITTER) {
jobJitter = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_STATE_MANAGEMENT_ENABLED) {
indexStateManagementEnabled = it
if (!indexStateManagementEnabled) disable() else enable()
Expand Down Expand Up @@ -328,7 +333,8 @@ class ManagedIndexCoordinator(
indexUuid,
policy.id,
jobInterval,
policy
policy,
jobJitter
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ data class ManagedIndexConfig(
val policySeqNo: Long?,
val policyPrimaryTerm: Long?,
val policy: Policy?,
val changePolicy: ChangePolicy?
val changePolicy: ChangePolicy?,
val jobJitter: Double?
) : ScheduledJobParameter {

init {
Expand All @@ -79,6 +80,10 @@ data class ManagedIndexConfig(

override fun getLockDurationSeconds(): Long = 3600L // 1 hour

override fun getJitter(): Double? {
return jobJitter
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder
.startObject()
Expand All @@ -95,6 +100,7 @@ data class ManagedIndexConfig(
.field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm)
.field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE)
.field(CHANGE_POLICY_FIELD, changePolicy)
.field(JITTER, jobJitter)
builder.endObject()
return builder.endObject()
}
Expand All @@ -114,6 +120,7 @@ data class ManagedIndexConfig(
const val POLICY_SEQ_NO_FIELD = "policy_seq_no"
const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term"
const val CHANGE_POLICY_FIELD = "change_policy"
const val JITTER = "jitter"

@Suppress("ComplexMethod", "LongMethod")
@JvmStatic
Expand All @@ -137,6 +144,7 @@ data class ManagedIndexConfig(
var enabled = true
var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO
var jitter: Double? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down Expand Up @@ -164,6 +172,9 @@ data class ManagedIndexConfig(
CHANGE_POLICY_FIELD -> {
changePolicy = if (xcp.currentToken() == Token.VALUE_NULL) null else ChangePolicy.parse(xcp)
}
JITTER -> {
jitter = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.doubleValue()
}
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.")
}
}
Expand Down Expand Up @@ -192,7 +203,8 @@ data class ManagedIndexConfig(
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
changePolicy = changePolicy
changePolicy = changePolicy,
jobJitter = jitter
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ManagedIndexSettings {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_METADATA_SERVICE_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5
const val DEFAULT_JITTER = 0.6
private val ALLOW_LIST_ALL = ActionConfig.ActionType.values().toList().map { it.type }
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()
Expand Down Expand Up @@ -179,5 +180,14 @@ class ManagedIndexSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val JITTER: Setting<Double> = Setting.doubleSetting(
"plugins.index_state_management.jitter",
DEFAULT_JITTER,
0.0,
1.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,16 @@ class TransportAddPolicyAction @Inject constructor(
) {

@Volatile private var jobInterval = ManagedIndexSettings.JOB_INTERVAL.get(settings)
@Volatile private var jobJitter = ManagedIndexSettings.JITTER.get(settings)
@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.JOB_INTERVAL) {
jobInterval = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.JITTER) {
jobJitter = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) {
filterByEnabled = it
}
Expand Down Expand Up @@ -329,7 +333,7 @@ class TransportAddPolicyAction @Inject constructor(
val bulkReq = BulkRequest().timeout(TimeValue.timeValueMillis(bulkReqTimeout))
indicesToAdd.forEach { (uuid, name) ->
bulkReq.add(
managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user))
managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ import java.net.InetAddress
import java.time.Instant
import java.time.temporal.ChronoUnit

@Suppress("LongParameterList")
fun managedIndexConfigIndexRequest(
index: String,
uuid: String,
policyID: String,
jobInterval: Int,
policy: Policy? = null
policy: Policy? = null,
jobJitter: Double?
): IndexRequest {
val managedIndexConfig = ManagedIndexConfig(
jobName = index,
Expand All @@ -95,7 +97,8 @@ fun managedIndexConfigIndexRequest(
policy = policy,
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
changePolicy = null
changePolicy = null,
jobJitter = jobJitter
)

return IndexRequest(INDEX_MANAGEMENT_INDEX)
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,9 @@
}
}
}
},
"jitter": {
"type": "double"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
ManagedIndexSettings.SNAPSHOT_DENY_LIST,
ManagedIndexSettings.JITTER,
RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
RollupSettings.ROLLUP_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package org.opensearch.indexmanagement.indexstatemanagement

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.junit.Before
import org.opensearch.OpenSearchParseException
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
Expand All @@ -54,6 +55,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
Expand All @@ -73,6 +75,11 @@ import java.time.Duration
import java.time.Instant

abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
@Before
fun disableIndexStateManagementJitter() {
// jitter would add a test-breaking delay to the integration tests
updateIndexStateManagementJitterSetting(0.0)
}

protected val isMixedNodeRegressionTest = System.getProperty("cluster.mixed", "false")!!.toBoolean()

Expand Down Expand Up @@ -357,4 +364,8 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateIndexStateManagementJitterSetting(value: Double?) {
updateClusterSetting(ManagedIndexSettings.JITTER.key, value.toString(), false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.http.HttpHeaders
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.junit.Before
import org.opensearch.OpenSearchParseException
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -95,6 +96,12 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
val explainResponseOpendistroPolicyIdSetting = "index.opendistro.index_state_management.policy_id"
val explainResponseOpenSearchPolicyIdSetting = "index.plugins.index_state_management.policy_id"

@Before
protected fun disableIndexStateManagementJitter() {
// jitter would add a test-breaking delay to the integration tests
updateIndexStateManagementJitterSetting(0.0)
}

protected fun createPolicy(
policy: Policy,
policyId: String = OpenSearchTestCase.randomAlphaOfLength(10),
Expand Down Expand Up @@ -268,6 +275,10 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

protected fun updateIndexStateManagementJitterSetting(value: Double) {
updateClusterSetting(ManagedIndexSettings.JITTER.key, value.toString(), false)
}

protected fun updateIndexSetting(
index: String,
key: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
fun cleanClusterSetting() {
// need to clean up otherwise will throw error
updateClusterSetting(ManagedIndexSettings.METADATA_SERVICE_ENABLED.key, null, false)
updateIndexStateManagementJitterSetting(null)
}

@AwaitsFix(bugUrl = "https://github.com/opensearch-project/index-management/issues/176")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ fun randomManagedIndexConfig(
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
policy: Policy? = randomPolicy(),
changePolicy: ChangePolicy? = randomChangePolicy()
changePolicy: ChangePolicy? = randomChangePolicy(),
jitter: Double? = 0.0
): ManagedIndexConfig {
return ManagedIndexConfig(
jobName = name,
Expand All @@ -289,7 +290,8 @@ fun randomManagedIndexConfig(
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
changePolicy = changePolicy
changePolicy = changePolicy,
jobJitter = jitter
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() {
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
settingSet.add(ManagedIndexSettings.SWEEP_PERIOD)
settingSet.add(ManagedIndexSettings.JITTER)
settingSet.add(ManagedIndexSettings.JOB_INTERVAL)
settingSet.add(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED)
settingSet.add(ManagedIndexSettings.METADATA_SERVICE_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,20 +107,41 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() {
val managedIndexConfig = getExistingManagedIndexConfig(indexName)

assertEquals(
"Created managed index did not default to ${ManagedIndexSettings.DEFAULT_JOB_INTERVAL}minutes",
"Created managed index did not default to ${ManagedIndexSettings.DEFAULT_JOB_INTERVAL} minutes",
ManagedIndexSettings.DEFAULT_JOB_INTERVAL, (managedIndexConfig.jobSchedule as IntervalSchedule).interval
)

// init policy
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID) }
waitFor {
assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID)
val currInterval = (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval
assertEquals("Managed index was not created with default job interval", ManagedIndexSettings.DEFAULT_JOB_INTERVAL, currInterval)
}

// change cluster job interval setting to 2 (minutes)
updateClusterSetting(ManagedIndexSettings.JOB_INTERVAL.key, "2")
val newJobInterval = 2
updateClusterSetting(ManagedIndexSettings.JOB_INTERVAL.key, newJobInterval.toString())

// fast forward to next execution where at the end we should change the job interval time
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { (getManagedIndexConfigByDocId(managedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval == 2 }
// Create a new index and policy to check if they have the updated interval
val newIndexName = indexName + "new"
val newCreatedPolicy = createRandomPolicy()
createIndex(newIndexName, newCreatedPolicy.id)

val newManagedIndexConfig = getExistingManagedIndexConfig(newIndexName)

assertEquals(
"New managed index did not have updated job schedule interval",
newJobInterval, (newManagedIndexConfig.jobSchedule as IntervalSchedule).interval
)

// init new policy
updateManagedIndexConfigStartTime(newManagedIndexConfig)
waitFor {
assertEquals(newCreatedPolicy.id, getManagedIndexConfigByDocId(newManagedIndexConfig.id)?.policyID)
val currInterval = (getManagedIndexConfigByDocId(newManagedIndexConfig.id)?.jobSchedule as? IntervalSchedule)?.interval
assertEquals("Failed to update ManagedIndexConfig interval", newJobInterval, currInterval)
}
}

fun `test allow list fails execution`() {
Expand Down Expand Up @@ -171,4 +192,42 @@ class ManagedIndexRunnerIT : IndexStateManagementRestTestCase() {
updateManagedIndexConfigStartTime(managedIndexConfig)
waitFor { assertEquals("Attempted to execute action=read_only which is not allowed.", getExplainManagedIndexMetaData(indexName).info?.get("message")) }
}

fun `test jitter changing`() {
val indexName = "jitter_index_"

val createdPolicy = createRandomPolicy()
createIndex(indexName, createdPolicy.id)

val managedIndexConfig = getExistingManagedIndexConfig(indexName)
assertEquals(
"Created managed index did not default to 0.0", 0.0, managedIndexConfig.jitter
)

waitFor {
assertEquals(createdPolicy.id, getManagedIndexConfigByDocId(managedIndexConfig.id)?.policyID)
val currJitter = getManagedIndexConfigByDocId(managedIndexConfig.id)?.jitter
assertEquals("Managed index was not created with 0.0 jitter", 0.0, currJitter)
}

// change jitter to 0.5
val newJitter = 0.5
updateIndexStateManagementJitterSetting(newJitter)

// Create a new index and policy to check if they have the updated jitter
val newIndexName = indexName + "new"
val newCreatedPolicy = createRandomPolicy()
createIndex(newIndexName, newCreatedPolicy.id)

val newManagedIndexConfig = getExistingManagedIndexConfig(newIndexName)
assertEquals(
"New managed index did not have updated jitter", newJitter, newManagedIndexConfig.jitter
)

waitFor {
assertEquals(newCreatedPolicy.id, getManagedIndexConfigByDocId(newManagedIndexConfig.id)?.policyID)
val currJitter = getManagedIndexConfigByDocId(newManagedIndexConfig.id)?.jitter
assertEquals("Failed to update ManagedIndexConfig jitter", newJitter, currJitter)
}
}
}
Loading

0 comments on commit ab12279

Please sign in to comment.