Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
index mapping schema versioning (#61)
Browse files Browse the repository at this point in the history
index mapping schema versioning
  • Loading branch information
ylwu-amzn committed May 31, 2019
1 parent 7ed67a2 commit b0e2913
Show file tree
Hide file tree
Showing 16 changed files with 462 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_COUNT
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.MOVE_ALERTS_BACKOFF_MILLIS
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import org.apache.logging.log4j.LogManager
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
Expand Down Expand Up @@ -180,8 +181,8 @@ class MonitorRunner(

var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd)
val currentAlerts = try {
alertIndices.createAlertIndex()
alertIndices.createInitialHistoryIndex()
alertIndices.createOrUpdateAlertIndex()
alertIndices.createOrUpdateInitialHistoryIndex()
loadCurrentAlerts(monitor)
} catch (e: Exception) {
// We can't save ERROR alerts to the index here as we don't know if there are existing ACTIVE alerts
Expand Down Expand Up @@ -252,18 +253,21 @@ class MonitorRunner(
val updatedHistory = currentAlert?.errorHistory.update(alertError)
return if (alertError == null && !result.triggered) {
currentAlert?.copy(state = COMPLETED, endTime = currentTime, errorMessage = null,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
} else if (alertError == null && currentAlert?.isAcknowledged() == true) {
null
} else if (currentAlert != null) {
val alertState = if (alertError == null) ACTIVE else ERROR
currentAlert.copy(state = alertState, lastNotificationTime = currentTime, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
} else {
val alertState = if (alertError == null) ACTIVE else ERROR
Alert(monitor = ctx.monitor, trigger = ctx.trigger, startTime = currentTime,
lastNotificationTime = currentTime, state = alertState, errorMessage = alertError?.message,
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults)
errorHistory = updatedHistory, actionExecutionResults = updatedActionExecutionResults,
schemaVersion = IndexUtils.alertIndexSchemaVersion)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,16 @@ import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.
import com.amazon.opendistroforelasticsearch.alerting.settings.AlertingSettings.Companion.REQUEST_TIMEOUT
import org.apache.logging.log4j.LogManager
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils
import org.elasticsearch.ResourceAlreadyExistsException
import org.elasticsearch.action.admin.indices.alias.Alias
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.IndicesAdminClient
import org.elasticsearch.cluster.ClusterChangedEvent
import org.elasticsearch.cluster.ClusterStateListener
Expand Down Expand Up @@ -151,16 +154,23 @@ class AlertIndices(
return alertIndexInitialized && historyIndexInitialized
}

suspend fun createAlertIndex() {
suspend fun createOrUpdateAlertIndex() {
if (!alertIndexInitialized) {
alertIndexInitialized = createIndex(ALERT_INDEX)
if (alertIndexInitialized) IndexUtils.alertIndexUpdated()
} else {
if (!IndexUtils.alertIndexUpdated) updateIndexMapping(ALERT_INDEX)
}
alertIndexInitialized
}

suspend fun createInitialHistoryIndex() {
suspend fun createOrUpdateInitialHistoryIndex() {
if (!historyIndexInitialized) {
historyIndexInitialized = createIndex(HISTORY_INDEX_PATTERN, HISTORY_WRITE_INDEX)
if (historyIndexInitialized)
IndexUtils.lastUpdatedHistoryIndex = IndexUtils.getIndexNameWithAlias(clusterService.state(), HISTORY_WRITE_INDEX)
} else {
updateIndexMapping(HISTORY_WRITE_INDEX, true)
}
historyIndexInitialized
}
Expand All @@ -184,6 +194,36 @@ class AlertIndices(
}
}

private suspend fun updateIndexMapping(index: String, alias: Boolean = false) {
val clusterState = clusterService.state()
val mapping = alertMapping()
var targetIndex = index
if (alias) {
targetIndex = IndexUtils.getIndexNameWithAlias(clusterState, index)
}

if (targetIndex == IndexUtils.lastUpdatedHistoryIndex) {
return
}

var putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex).type(MAPPING_TYPE)
.source(mapping, XContentType.JSON)
val updateResponse: AcknowledgedResponse = client.suspendUntil { client.putMapping(putMappingRequest, it) }
if (updateResponse.isAcknowledged) {
logger.info("Index mapping of $targetIndex is updated")
setIndexUpdateFlag(index, targetIndex)
} else {
logger.info("Failed to update index mapping of $targetIndex")
}
}

private fun setIndexUpdateFlag(index: String, targetIndex: String) {
when (index) {
ALERT_INDEX -> IndexUtils.alertIndexUpdated()
HISTORY_WRITE_INDEX -> IndexUtils.lastUpdatedHistoryIndex = targetIndex
}
}

fun rolloverHistoryIndex(): Boolean {
if (!historyIndexInitialized) {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.amazon.opendistroforelasticsearch.alerting.model
import com.amazon.opendistroforelasticsearch.alerting.alerts.AlertError
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.elasticsearch.common.lucene.uid.Versions
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
Expand All @@ -29,6 +30,7 @@ import java.time.Instant
data class Alert(
val id: String = NO_ID,
val version: Long = NO_VERSION,
val schemaVersion: Int = NO_SCHEMA_VERSION,
val monitorId: String,
val monitorName: String,
val monitorVersion: Long,
Expand Down Expand Up @@ -59,11 +61,12 @@ data class Alert(
state: State = State.ACTIVE,
errorMessage: String? = null,
errorHistory: List<AlertError> = mutableListOf(),
actionExecutionResults: List<ActionExecutionResult> = mutableListOf()
actionExecutionResults: List<ActionExecutionResult> = mutableListOf(),
schemaVersion: Int = NO_SCHEMA_VERSION
) : this(monitorId = monitor.id, monitorName = monitor.name, monitorVersion = monitor.version,
triggerId = trigger.id, triggerName = trigger.name, state = state, startTime = startTime,
lastNotificationTime = lastNotificationTime, errorMessage = errorMessage, errorHistory = errorHistory,
severity = trigger.severity, actionExecutionResults = actionExecutionResults)
severity = trigger.severity, actionExecutionResults = actionExecutionResults, schemaVersion = schemaVersion)

enum class State {
ACTIVE, ACKNOWLEDGED, COMPLETED, ERROR, DELETED
Expand All @@ -74,6 +77,7 @@ data class Alert(
companion object {

const val ALERT_ID_FIELD = "id"
const val SCHEMA_VERSION_FIELD = "schema_version"
const val ALERT_VERSION_FIELD = "version"
const val MONITOR_ID_FIELD = "monitor_id"
const val MONITOR_VERSION_FIELD = "monitor_version"
Expand All @@ -98,6 +102,7 @@ data class Alert(
fun parse(xcp: XContentParser, id: String = NO_ID, version: Long = NO_VERSION): Alert {

lateinit var monitorId: String
var schemaVersion = NO_SCHEMA_VERSION
lateinit var monitorName: String
var monitorVersion: Long = Versions.NOT_FOUND
lateinit var triggerId: String
Expand All @@ -119,6 +124,7 @@ data class Alert(

when (fieldName) {
MONITOR_ID_FIELD -> monitorId = xcp.text()
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
MONITOR_VERSION_FIELD -> monitorVersion = xcp.longValue()
TRIGGER_ID_FIELD -> triggerId = xcp.text()
Expand All @@ -145,7 +151,7 @@ data class Alert(
}
}

return Alert(id = id, version = version, monitorId = requireNotNull(monitorId),
return Alert(id = id, version = version, schemaVersion = schemaVersion, monitorId = requireNotNull(monitorId),
monitorName = requireNotNull(monitorName), monitorVersion = monitorVersion,
triggerId = requireNotNull(triggerId), triggerName = requireNotNull(triggerName),
state = requireNotNull(state), startTime = requireNotNull(startTime), endTime = endTime,
Expand All @@ -158,6 +164,7 @@ data class Alert(
override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
return builder.startObject()
.field(MONITOR_ID_FIELD, monitorId)
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(MONITOR_VERSION_FIELD, monitorVersion)
.field(MONITOR_NAME_FIELD, monitorName)
.field(TRIGGER_ID_FIELD, triggerId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.alerting.util._ID
import com.amazon.opendistroforelasticsearch.alerting.util._VERSION
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.elasticsearch.common.CheckedFunction
import org.elasticsearch.common.ParseField
import org.elasticsearch.common.xcontent.NamedXContentRegistry
Expand All @@ -47,6 +48,7 @@ data class Monitor(
override val schedule: Schedule,
override val lastUpdateTime: Instant,
override val enabledTime: Instant?,
val schemaVersion: Int = NO_SCHEMA_VERSION,
val inputs: List<Input>,
val triggers: List<Trigger>,
val uiMetadata: Map<String, Any>
Expand Down Expand Up @@ -82,6 +84,7 @@ data class Monitor(
builder.startObject()
if (params.paramAsBoolean("with_type", false)) builder.startObject(type)
builder.field(TYPE_FIELD, type)
.field(SCHEMA_VERSION_FIELD, schemaVersion)
.field(NAME_FIELD, name)
.field(ENABLED_FIELD, enabled)
.optionalTimeField(ENABLED_TIME_FIELD, enabledTime)
Expand All @@ -99,6 +102,7 @@ data class Monitor(
companion object {
const val MONITOR_TYPE = "monitor"
const val TYPE_FIELD = "type"
const val SCHEMA_VERSION_FIELD = "schema_version"
const val NAME_FIELD = "name"
const val ENABLED_FIELD = "enabled"
const val SCHEDULE_FIELD = "schedule"
Expand Down Expand Up @@ -126,6 +130,7 @@ data class Monitor(
var enabledTime: Instant? = null
var uiMetadata: Map<String, Any> = mapOf()
var enabled = true
var schemaVersion = NO_SCHEMA_VERSION
val triggers: MutableList<Trigger> = mutableListOf()
val inputs: MutableList<Input> = mutableListOf()

Expand All @@ -135,6 +140,7 @@ data class Monitor(
xcp.nextToken()

when (fieldName) {
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
NAME_FIELD -> name = xcp.text()
ENABLED_FIELD -> enabled = xcp.booleanValue()
SCHEDULE_FIELD -> schedule = Schedule.parse(xcp)
Expand Down Expand Up @@ -171,6 +177,7 @@ data class Monitor(
requireNotNull(schedule) { "Monitor schedule is null" },
lastUpdateTime ?: Instant.now(),
enabledTime,
schemaVersion,
inputs.toList(),
triggers.toList(),
uiMetadata)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import com.amazon.opendistroforelasticsearch.alerting.elasticapi.convertToMap
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.instant
import com.amazon.opendistroforelasticsearch.alerting.elasticapi.optionalTimeField
import com.amazon.opendistroforelasticsearch.alerting.util.DestinationType
import com.amazon.opendistroforelasticsearch.alerting.util.IndexUtils.Companion.NO_SCHEMA_VERSION
import org.apache.logging.log4j.LogManager
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.XContentBuilder
Expand All @@ -40,6 +41,7 @@ import java.util.Locale
data class Destination(
val id: String = NO_ID,
val version: Long = NO_VERSION,
val schemaVersion: Int = NO_SCHEMA_VERSION,
val type: DestinationType,
val name: String,
val lastUpdateTime: Instant,
Expand All @@ -53,6 +55,7 @@ data class Destination(
if (params.paramAsBoolean("with_type", false)) builder.startObject(DESTINATION)
builder.field(TYPE_FIELD, type.value)
.field(NAME_FIELD, name)
.field(SCHEMA_VERSION, schemaVersion)
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
.field(type.value, constructResponseForDestinationType(type))
if (params.paramAsBoolean("with_type", false)) builder.endObject()
Expand All @@ -69,6 +72,7 @@ data class Destination(
const val NAME_FIELD = "name"
const val NO_ID = ""
const val NO_VERSION = 1L
const val SCHEMA_VERSION = "schema_version"
const val LAST_UPDATE_TIME_FIELD = "last_update_time"
const val CHIME = "chime"
const val SLACK = "slack"
Expand All @@ -88,6 +92,7 @@ data class Destination(
var chime: Chime? = null
var customWebhook: CustomWebhook? = null
var lastUpdateTime: Instant? = null
var schemaVersion = NO_SCHEMA_VERSION

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -116,13 +121,17 @@ data class Destination(
TEST_ACTION -> {
// This condition is for integ tests to avoid parsing
}
SCHEMA_VERSION -> {
schemaVersion = xcp.intValue()
}
else -> {
xcp.skipChildren()
}
}
}
return Destination(id,
version,
schemaVersion,
DestinationType.valueOf(type.toUpperCase(Locale.ROOT)),
requireNotNull(name) { "Destination name is null" },
lastUpdateTime ?: Instant.now(),
Expand Down

0 comments on commit b0e2913

Please sign in to comment.