Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Introducing new ISM Action to create rollups #371

Merged
merged 15 commits into from
Jan 7, 2021
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.ActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.RollupActionConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.rollup.AttemptCreateRollupJobStep
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.rollup.WaitForRollupCompletionStep
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.ISMRollup
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class RollupAction(
clusterService: ClusterService,
client: Client,
ismRollup: ISMRollup,
thalurur marked this conversation as resolved.
Show resolved Hide resolved
managedIndexMetaData: ManagedIndexMetaData,
config: RollupActionConfig
) : Action(ActionConfig.ActionType.ROLLUP, config, managedIndexMetaData) {

private val attemptCreateRollupJobStep = AttemptCreateRollupJobStep(clusterService, client, ismRollup, managedIndexMetaData)
private val waitForRollupCompletionStep = WaitForRollupCompletionStep(clusterService, client, managedIndexMetaData)

override fun getSteps(): List<Step> = listOf(attemptCreateRollupJobStep, waitForRollupCompletionStep)

@Suppress("ReturnCount")
override fun getStepToExecute(): Step {
// If stepMetaData is null, return the first step
val stepMetaData = managedIndexMetaData.stepMetaData ?: return attemptCreateRollupJobStep

// If the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptCreateRollupJobStep.name -> waitForRollupCompletionStep
else -> attemptCreateRollupJobStep
}
}

return when (stepMetaData.name) {
AttemptCreateRollupJobStep.name -> attemptCreateRollupJobStep
else -> waitForRollupCompletionStep
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ abstract class ActionConfig(
NOTIFICATION("notification"),
SNAPSHOT("snapshot"),
INDEX_PRIORITY("index_priority"),
ALLOCATION("allocation");
ALLOCATION("allocation"),
ROLLUP("rollup");

override fun toString(): String {
return type
Expand Down Expand Up @@ -105,6 +106,7 @@ abstract class ActionConfig(
ActionType.SNAPSHOT.type -> SnapshotActionConfig(sin)
ActionType.INDEX_PRIORITY.type -> IndexPriorityActionConfig(sin)
ActionType.ALLOCATION.type -> AllocationActionConfig(sin)
ActionType.ROLLUP.type -> RollupActionConfig(sin)
else -> throw IllegalArgumentException("Invalid field: [${type.type}] found in Action.")
}

Expand Down Expand Up @@ -142,6 +144,7 @@ abstract class ActionConfig(
ActionType.SNAPSHOT.type -> actionConfig = SnapshotActionConfig.parse(xcp, index)
ActionType.INDEX_PRIORITY.type -> actionConfig = IndexPriorityActionConfig.parse(xcp, index)
ActionType.ALLOCATION.type -> actionConfig = AllocationActionConfig.parse(xcp, index)
ActionType.ROLLUP.type -> actionConfig = RollupActionConfig.parse(xcp, index)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Action.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.action.RollupAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.ISMRollup
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.io.stream.StreamInput
import org.elasticsearch.common.io.stream.StreamOutput
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.script.ScriptService
import java.io.IOException

class RollupActionConfig(
val ismRollup: ISMRollup,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.ROLLUP, index) {

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
super.toXContent(builder, params)
.startObject(ActionType.ROLLUP.type)
.field(ISM_ROLLUP_FIELD, ismRollup)
.endObject()
.endObject()
return builder
}

override fun toAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData
): Action = RollupAction(clusterService, client, ismRollup, managedIndexMetaData, this)

override fun isFragment(): Boolean = super<ToXContentObject>.isFragment()

@Throws(IOException::class)
constructor(sin: StreamInput) : this(ismRollup = ISMRollup(sin), index = sin.readInt())

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
ismRollup.writeTo(out)
out.writeInt(actionIndex)
}

companion object {
const val ISM_ROLLUP_FIELD = "ism_rollup"
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
var ismRollup: ISMRollup? = null

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, actionIndex: Int): RollupActionConfig {
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
ISM_ROLLUP_FIELD -> ismRollup = ISMRollup.parse(xcp)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in RollupActionConfig.")
}
}

return RollupActionConfig(
ismRollup = requireNotNull(ismRollup) { "RollupActionConfig rollup is null" },
index = actionIndex)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,23 @@ import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken
/** Properties that will persist across steps of a single Action. Will be stored in the [ActionMetaData]. */
data class ActionProperties(
val maxNumSegments: Int? = null,
val snapshotName: String? = null
val snapshotName: String? = null,
val rollupId: String? = null,
val hasRollupFailed: Boolean? = null
thalurur marked this conversation as resolved.
Show resolved Hide resolved
) : Writeable, ToXContentFragment {

override fun writeTo(out: StreamOutput) {
out.writeOptionalInt(maxNumSegments)
out.writeOptionalString(snapshotName)
out.writeOptionalString(rollupId)
out.writeOptionalBoolean(hasRollupFailed)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
if (maxNumSegments != null) builder.field(Properties.MAX_NUM_SEGMENTS.key, maxNumSegments)
if (snapshotName != null) builder.field(Properties.SNAPSHOT_NAME.key, snapshotName)
if (rollupId != null) builder.field(Properties.ROLLUP_ID.key, rollupId)
if (hasRollupFailed != null) builder.field(Properties.HAS_ROLLUP_FAILED.key, hasRollupFailed)
return builder
}

Expand All @@ -48,13 +54,17 @@ data class ActionProperties(
fun fromStreamInput(si: StreamInput): ActionProperties {
val maxNumSegments: Int? = si.readOptionalInt()
val snapshotName: String? = si.readOptionalString()
val rollupId: String? = si.readOptionalString()
val hasRollupFailed: Boolean? = si.readOptionalBoolean()

return ActionProperties(maxNumSegments, snapshotName)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed)
}

fun parse(xcp: XContentParser): ActionProperties {
var maxNumSegments: Int? = null
var snapshotName: String? = null
var rollupId: String? = null
var hasRollupFailed: Boolean? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand All @@ -64,12 +74,19 @@ data class ActionProperties(
when (fieldName) {
Properties.MAX_NUM_SEGMENTS.key -> maxNumSegments = xcp.intValue()
Properties.SNAPSHOT_NAME.key -> snapshotName = xcp.text()
Properties.ROLLUP_ID.key -> rollupId = xcp.text()
Properties.HAS_ROLLUP_FAILED.key -> hasRollupFailed = xcp.booleanValue()
}
}

return ActionProperties(maxNumSegments, snapshotName)
return ActionProperties(maxNumSegments, snapshotName, rollupId, hasRollupFailed)
}
}

enum class Properties(val key: String) { MAX_NUM_SEGMENTS("max_num_segments"), SNAPSHOT_NAME("snapshot_name") }
enum class Properties(val key: String) {
MAX_NUM_SEGMENTS("max_num_segments"),
SNAPSHOT_NAME("snapshot_name"),
ROLLUP_ID("rollup_id"),
HAS_ROLLUP_FAILED("has_rollup_failed")
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.rollup

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.ActionProperties
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.index.IndexRollupAction
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.index.IndexRollupRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.start.StartRollupAction
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.start.StartRollupRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.ISMRollup
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.support.WriteRequest
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.index.engine.VersionConflictEngineException
import java.lang.Exception

class AttemptCreateRollupJobStep(
val clusterService: ClusterService,
val client: Client,
val ismRollup: ISMRollup,
managedIndexMetaData: ManagedIndexMetaData
) : Step(name, managedIndexMetaData) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null
private var rollupId: String? = null
private var previousRunRollupId: String? = null
private var hasPreviousRollupAttemptFailed: Boolean? = null

override fun isIdempotent() = true

override suspend fun execute(): Step {
previousRunRollupId = managedIndexMetaData.actionMetaData?.actionProperties?.rollupId
hasPreviousRollupAttemptFailed = managedIndexMetaData.actionMetaData?.actionProperties?.hasRollupFailed

// Creating a rollup job
val rollup = ismRollup.toRollup(indexName)
rollupId = rollup.id
logger.info("Attempting to create a rollup job $rollupId for index $indexName")

val indexRollupRequest = IndexRollupRequest(rollup, WriteRequest.RefreshPolicy.IMMEDIATE)

try {
val response = withContext(Dispatchers.IO) { client.execute(IndexRollupAction.INSTANCE, indexRollupRequest).actionGet() }
thalurur marked this conversation as resolved.
Show resolved Hide resolved
logger.info("Received status ${response.status.status} on trying to create rollup job $rollupId")

stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(rollupId!!, indexName))
} catch (e: VersionConflictEngineException) {
val message = getFailedJobExistsMessage(rollupId!!, indexName)
logger.info(message)
if (rollupId == previousRunRollupId && hasPreviousRollupAttemptFailed != null && hasPreviousRollupAttemptFailed!!) {
thalurur marked this conversation as resolved.
Show resolved Hide resolved
withContext(Dispatchers.IO) {
startRollupJob()
}
} else {
stepStatus = StepStatus.COMPLETED
info = mapOf("info" to message)
}
} catch (e: Exception) {
dbbaughe marked this conversation as resolved.
Show resolved Hide resolved
val message = getFailedMessage(rollupId!!, indexName)
logger.error(message, e)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message, "cause" to "${e.message}")
}

return this
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
val currentActionMetaData = currentMetaData.actionMetaData
return currentMetaData.copy(
actionMetaData = currentActionMetaData?.copy(actionProperties = ActionProperties(rollupId = rollupId)),
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

private fun startRollupJob() {
logger.info("Attempting to re-start the job $rollupId")
try {
val startRollupRequest = StartRollupRequest(rollupId!!)
client.execute(StartRollupAction.INSTANCE, startRollupRequest).actionGet()
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessRestartMessage(rollupId!!))
} catch (e: Exception) {
val message = getFailedToStartMessage(rollupId!!)
logger.error(message, e)
stepStatus = StepStatus.CONDITION_NOT_MET
thalurur marked this conversation as resolved.
Show resolved Hide resolved
info = mapOf("message" to message)
}
}

companion object {
thalurur marked this conversation as resolved.
Show resolved Hide resolved
const val name = "attempt_create_rollup"
fun getFailedMessage(rollupId: String, index: String) = "Failed to create the rollup job [$rollupId] for index [$index]"
fun getFailedJobExistsMessage(rollupId: String, index: String) = "Rollup job [$rollupId] already exists for index [$index]"
thalurur marked this conversation as resolved.
Show resolved Hide resolved
fun getFailedToStartMessage(rollupId: String) = "Failed to start the rollup job [$rollupId]"
fun getSuccessMessage(rollupId: String, index: String) = "Successfully created the rollup job [$rollupId] for index [$index]"
fun getSuccessRestartMessage(rollupId: String) = "Successfully restarted the rollup job [$rollupId]"
}
}
Loading