Skip to content

Commit

Permalink
fix for MapperException[the [enabled] parameter can't be updated for …
Browse files Browse the repository at this point in the history
…the object mapping [metadata.source_to_query_index_mapping] (#1432)
  • Loading branch information
sbcd90 committed Feb 16, 2024
1 parent 719db46 commit 5b3e1a2
Showing 1 changed file with 42 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchSecurityException
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.DocWriteResponse
import org.opensearch.action.admin.indices.get.GetIndexRequest
Expand Down Expand Up @@ -78,35 +79,51 @@ object MonitorMetadataService :
@Suppress("ComplexMethod", "ReturnCount")
suspend fun upsertMetadata(metadata: MonitorMetadata, updating: Boolean): MonitorMetadata {
try {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(metadata.toXContent(XContentFactory.jsonBuilder(), ToXContent.MapParams(mapOf("with_type" to "true"))))
.id(metadata.id)
.routing(metadata.monitorId)
.setIfSeqNo(metadata.seqNo)
.setIfPrimaryTerm(metadata.primaryTerm)
.timeout(indexTimeout)
if (clusterService.state().routingTable.hasIndex(ScheduledJob.SCHEDULED_JOBS_INDEX)) {
val indexRequest = IndexRequest(ScheduledJob.SCHEDULED_JOBS_INDEX)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(
metadata.toXContent(
XContentFactory.jsonBuilder(),
ToXContent.MapParams(mapOf("with_type" to "true"))
)
)
.id(metadata.id)
.routing(metadata.monitorId)
.setIfSeqNo(metadata.seqNo)
.setIfPrimaryTerm(metadata.primaryTerm)
.timeout(indexTimeout)

if (updating) {
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE)
}
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
when (response.result) {
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
val failureReason = "The upsert metadata call failed with a ${response.result?.lowercase} result"
log.error(failureReason)
throw AlertingException(failureReason, RestStatus.INTERNAL_SERVER_ERROR, IllegalStateException(failureReason))
if (updating) {
indexRequest.id(metadata.id).setIfSeqNo(metadata.seqNo).setIfPrimaryTerm(metadata.primaryTerm)
} else {
indexRequest.opType(DocWriteRequest.OpType.CREATE)
}
DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
val response: IndexResponse = client.suspendUntil { index(indexRequest, it) }
when (response.result) {
DocWriteResponse.Result.DELETED, DocWriteResponse.Result.NOOP, DocWriteResponse.Result.NOT_FOUND, null -> {
val failureReason =
"The upsert metadata call failed with a ${response.result?.lowercase} result"
log.error(failureReason)
throw AlertingException(
failureReason,
RestStatus.INTERNAL_SERVER_ERROR,
IllegalStateException(failureReason)
)
}

DocWriteResponse.Result.CREATED, DocWriteResponse.Result.UPDATED -> {
log.debug("Successfully upserted MonitorMetadata:${metadata.id} ")
}
}
return metadata.copy(
seqNo = response.seqNo,
primaryTerm = response.primaryTerm
)
} else {
val failureReason = "Job index ${ScheduledJob.SCHEDULED_JOBS_INDEX} does not exist to update monitor metadata"
throw OpenSearchStatusException(failureReason, RestStatus.INTERNAL_SERVER_ERROR)
}
return metadata.copy(
seqNo = response.seqNo,
primaryTerm = response.primaryTerm
)
} catch (e: Exception) {
throw AlertingException.wrap(e)
}
Expand Down

0 comments on commit 5b3e1a2

Please sign in to comment.