fix(peering): Log error metrics more better (#3647)
* fix(peering): Log error metrics more better

Not all exceptions were caught, so some would leak out of the `PeeringAgent` without being logged.
Additionally, added a retry around the most egregious SQL query in case it fails due to a timeout.

* fixup! fix(peering): Log error metrics more better

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
marchello2000 and mergify[bot] committed May 3, 2020
1 parent f7d7650 commit d464708
Showing 2 changed files with 92 additions and 59 deletions.
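As the commit message notes, the fix has two parts: each peering step now catches its own exceptions, logs them, and bumps an error metric, and the heaviest SQL query is retried. Below is a minimal, self-contained sketch of the first part (illustrative stand-ins only, not the actual orca classes; the real change is in the diff that follows):

// Illustrative stand-ins; the real PeeringMetrics/PeeringAgent live in orca's peering package.
class PeeringMetricsSketch {
  fun incrementNumErrors(executionType: String) = println("peering.numErrors +1 for $executionType")
}

class PeeringAgentSketch(private val peeringMetrics: PeeringMetricsSketch) {
  // Mirrors the new peerExecutions(): exceptions are caught, logged, and counted
  // instead of leaking out of tick(), so one failing step no longer aborts the rest.
  fun peerExecutions(executionType: String) {
    try {
      peerCompletedExecutions(executionType) // may throw, e.g. on a SQL timeout
      println("peered $executionType")
    } catch (e: Exception) {
      println("Failed to peer $executionType: ${e.message}") // log.error(...) in the real agent
      peeringMetrics.incrementNumErrors(executionType)
    }
  }

  private fun peerCompletedExecutions(executionType: String) {
    if (executionType == "PIPELINE") throw RuntimeException("simulated SQL timeout")
  }
}

fun main() {
  val agent = PeeringAgentSketch(PeeringMetricsSketch())
  agent.peerExecutions("PIPELINE") // fails, is logged and counted
  agent.peerExecutions("ORCHESTRATION") // still runs in the same tick
}

With this shape, a failure in one execution type is recorded but no longer prevents the other types (or the deletion pass) from running in the same tick.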
MySqlRawAccess.kt
@@ -3,6 +3,10 @@ package com.netflix.spinnaker.orca.peering
import com.netflix.spinnaker.kork.exceptions.SystemException
import com.netflix.spinnaker.kork.sql.routing.withPool
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
+import io.github.resilience4j.retry.Retry
+import io.github.resilience4j.retry.RetryConfig
+import io.vavr.control.Try
+import java.time.Duration
import kotlin.math.min
import org.jooq.DSLContext
import org.jooq.Record
@@ -42,14 +46,16 @@ open class MySqlRawAccess(
      field("`partition`").eq(partitionName)
    }

-    return withPool(poolName) {
-      jooq
-        .select(field("id"), field("updated_at"))
-        .from(getExecutionTable(executionType))
-        .where(field("status").`in`(*completedStatuses.toTypedArray())
-          .and(field("updated_at").gt(updatedAfter))
-          .and(partitionConstraint))
-        .fetchInto(ExecutionDiffKey::class.java)
+    return withRetry {
+      withPool(poolName) {
+        jooq
+          .select(field("id"), field("updated_at"))
+          .from(getExecutionTable(executionType))
+          .where(field("status").`in`(*completedStatuses.toTypedArray())
+            .and(field("updated_at").gt(updatedAfter))
+            .and(partitionConstraint))
+          .fetchInto(ExecutionDiffKey::class.java)
+      }
    }
  }

@@ -204,4 +210,16 @@ open class MySqlRawAccess(

    return persisted
  }
+
+  private fun <T> withRetry(action: () -> T): T {
+    val retry = Retry.of(
+      "sqlPeeringAgent",
+      RetryConfig.custom<T>()
+        .maxAttempts(3)
+        .waitDuration(Duration.ofMillis(500))
+        .build()
+    )
+
+    return Try.ofSupplier(Retry.decorateSupplier(retry, action)).get()
+  }
}
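For reference, a minimal, self-contained sketch (not part of this commit) of how the withRetry helper above behaves: resilience4j re-invokes the decorated supplier up to maxAttempts times, waiting waitDuration between attempts, and if the last attempt still throws, Try.get() rethrows that exception to the caller, so failures still surface in the PeeringAgent and get logged there. The flaky supplier below is hypothetical.

import io.github.resilience4j.retry.Retry
import io.github.resilience4j.retry.RetryConfig
import io.vavr.control.Try
import java.time.Duration
import java.util.function.Supplier

fun main() {
  val retry = Retry.of(
    "sqlPeeringAgent",
    RetryConfig.custom<List<String>>()
      .maxAttempts(3)
      .waitDuration(Duration.ofMillis(500))
      .build()
  )

  var attempts = 0
  // Hypothetical flaky query: fails on the first two attempts, succeeds on the third.
  val decorated = Retry.decorateSupplier(retry, Supplier {
    attempts++
    if (attempts < 3) throw RuntimeException("simulated SQL timeout")
    listOf("exec-1", "exec-2")
  })

  // Same call shape as withRetry: the value is returned once an attempt succeeds;
  // if all 3 attempts had failed, get() would rethrow the last exception to the caller.
  val ids = Try.ofSupplier(decorated).get()
  println("fetched ${ids.size} ids after $attempts attempts")
}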
PeeringAgent.kt
@@ -21,7 +21,6 @@ import com.netflix.spinnaker.orca.notifications.AbstractPollingNotificationAgent
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock
import java.time.Duration
import java.time.Instant
-import kotlin.math.max
import org.slf4j.LoggerFactory

/**
@@ -76,7 +75,7 @@ class PeeringAgent(
  override fun tick() {
    if (dynamicConfigService.isEnabled("pollers.peering", true) &&
      dynamicConfigService.isEnabled("pollers.peering.$peeredId", true)) {
-      peeringMetrics.recordOverallLag() {
+      peeringMetrics.recordOverallLag {
        peerExecutions(ExecutionType.PIPELINE)
        peerExecutions(ExecutionType.ORCHESTRATION)
        peerDeletedExecutions()
@@ -85,24 +84,29 @@ class PeeringAgent(
    }
  }

  private fun peerExecutions(executionType: ExecutionType) {
-    val start = Instant.now()
+    try {
+      val start = Instant.now()

-    val mostRecentUpdatedTime = when (executionType) {
-      ExecutionType.ORCHESTRATION -> completedOrchestrationsMostRecentUpdatedTime
-      ExecutionType.PIPELINE -> completedPipelinesMostRecentUpdatedTime
-    }
-    val isFirstRun = mostRecentUpdatedTime == 0L
+      val mostRecentUpdatedTime = when (executionType) {
+        ExecutionType.ORCHESTRATION -> completedOrchestrationsMostRecentUpdatedTime
+        ExecutionType.PIPELINE -> completedPipelinesMostRecentUpdatedTime
+      }
+      val isFirstRun = mostRecentUpdatedTime == 0L

-    // On first copy of completed executions, there is no point in copying active executions
-    // because they will be woefully out of date (since the first bulk copy will likely take 20+ minutes)
-    if (isFirstRun) {
-      peerCompletedExecutions(executionType)
-    } else {
-      peerCompletedExecutions(executionType)
-      peerActiveExecutions(executionType)
-    }
+      // On first copy of completed executions, there is no point in copying active executions
+      // because they will be woefully out of date (since the first bulk copy will likely take 20+ minutes)
+      if (isFirstRun) {
+        peerCompletedExecutions(executionType)
+      } else {
+        peerCompletedExecutions(executionType)
+        peerActiveExecutions(executionType)
+      }

-    peeringMetrics.recordLag(executionType, Duration.between(start, Instant.now()))
+      peeringMetrics.recordLag(executionType, Duration.between(start, Instant.now()))
+    } catch (e: Exception) {
+      log.error("Failed to peer $executionType", e)
+      peeringMetrics.incrementNumErrors(executionType)
+    }
  }

/**
@@ -142,9 +146,9 @@ class PeeringAgent(
    val newLatestUpdateTime = doMigrate(executionType, updatedAfter) - clockDriftMs

    if (executionType == ExecutionType.ORCHESTRATION) {
-      completedOrchestrationsMostRecentUpdatedTime = max(0, newLatestUpdateTime)
+      completedOrchestrationsMostRecentUpdatedTime = newLatestUpdateTime.coerceAtLeast(0)
    } else {
-      completedPipelinesMostRecentUpdatedTime = max(0, newLatestUpdateTime)
+      completedPipelinesMostRecentUpdatedTime = newLatestUpdateTime.coerceAtLeast(0)
    }
  }

@@ -156,43 +160,32 @@
   * There is no harm (just some wasted RDS CPU) to "deleting" an execution that doesn't exist
   */
  private fun peerDeletedExecutions() {
-    val deletedExecutionIds = srcDB.getDeletedExecutions(deletedExecutionCursor)
-    val orchestrationIdsToDelete = deletedExecutionIds.filter { it.execution_type == ExecutionType.ORCHESTRATION.toString() }.map { it.execution_id }
-    val pipelineIdsToDelete = deletedExecutionIds.filter { it.execution_type == ExecutionType.PIPELINE.toString() }.map { it.execution_id }
+    try {
+      val deletedExecutionIds = srcDB.getDeletedExecutions(deletedExecutionCursor)
+      val orchestrationIdsToDelete = deletedExecutionIds.filter { it.execution_type == ExecutionType.ORCHESTRATION.toString() }.map { it.execution_id }
+      val pipelineIdsToDelete = deletedExecutionIds.filter { it.execution_type == ExecutionType.PIPELINE.toString() }.map { it.execution_id }

-    log.debug("Found ${deletedExecutionIds.size} (orchestrations: ${orchestrationIdsToDelete.size} pipelines: ${pipelineIdsToDelete.size} deleted candidates after cursor: $deletedExecutionCursor")
-    var hadFailures = false
-    var orchestrationsDeleted = 0
-    var pipelinesDeleted = 0
+      log.debug("Found ${deletedExecutionIds.size} (orchestrations: ${orchestrationIdsToDelete.size} pipelines: ${pipelineIdsToDelete.size} deleted candidates after cursor: $deletedExecutionCursor")

-    try {
-      orchestrationsDeleted = destDB.deleteExecutions(ExecutionType.ORCHESTRATION, orchestrationIdsToDelete)
-      peeringMetrics.incrementNumDeleted(ExecutionType.ORCHESTRATION, orchestrationsDeleted)
-    } catch (e: Exception) {
-      log.error("Failed to delete some orchestrations", e)
-      peeringMetrics.incrementNumErrors(ExecutionType.ORCHESTRATION)
-      hadFailures = true
-    }
+      val orchestrationDeletionResult = delete(ExecutionType.ORCHESTRATION, orchestrationIdsToDelete)
+      val pipelinesDeletionResult = delete(ExecutionType.PIPELINE, pipelineIdsToDelete)
+      val succeeded = !(orchestrationDeletionResult.hadFailures || pipelinesDeletionResult.hadFailures)

-    try {
-      pipelinesDeleted = destDB.deleteExecutions(ExecutionType.PIPELINE, pipelineIdsToDelete)
-      peeringMetrics.incrementNumDeleted(ExecutionType.PIPELINE, pipelinesDeleted)
+      if (succeeded) {
+        deletedExecutionCursor = (deletedExecutionIds.maxBy { it.id })
+          ?.id
+          ?: deletedExecutionCursor
+
+        // It is likely that some executions were deleted during "general" peering (e.g. in doMigrate), but most will be
+        // deleted here so it's OK for the actual delete counts to not match the "requested" count
+        log.debug("Deleted orchestrations: ${orchestrationDeletionResult.numDeleted} (of ${orchestrationIdsToDelete.size} requested), pipelines: ${pipelinesDeletionResult.numDeleted} (of ${pipelineIdsToDelete.size} requested), new cursor: $deletedExecutionCursor")
+      } else {
+        log.error("Failed to delete some executions, not updating the cursor location to retry next time")
+      }
    } catch (e: Exception) {
-      log.error("Failed to delete some pipelines", e)
+      log.error("Failed to delete some executions", e)
+      peeringMetrics.incrementNumErrors(ExecutionType.ORCHESTRATION)
      peeringMetrics.incrementNumErrors(ExecutionType.PIPELINE)
-      hadFailures = true
    }
-
-    if (!hadFailures) {
-      deletedExecutionCursor = (deletedExecutionIds.maxBy { it.id })
-        ?.id
-        ?: deletedExecutionCursor
-
-      // It is likely that some executions were deleted during "general" peering (e.g. in doMigrate), but most will be
-      // deleted here so it's OK for the actual delete counts to not match the "requested" count
-      log.debug("Deleted orchestrations: $orchestrationsDeleted (of ${orchestrationIdsToDelete.size} requested), pipelines: $pipelinesDeleted (of ${pipelineIdsToDelete.size} requested), new cursor: $deletedExecutionCursor")
-    } else {
-      log.error("Failed to delete some executions, not updating the cursor location to retry next time")
-    }
  }

@@ -253,6 +246,28 @@ class PeeringAgent(
    return migrationResult.latestUpdatedAt
  }

+  /**
+   * Delete specified executions of the given type
+   *
+   * @return number deleted and if there were any errors
+   */
+  private fun delete(executionType: ExecutionType, idsToDelete: List<String>): DeletionResult {
+    var numDeleted = 0
+    var hadFailures = false
+    try {
+      numDeleted = destDB.deleteExecutions(executionType, idsToDelete)
+      peeringMetrics.incrementNumDeleted(executionType, numDeleted)
+    } catch (e: Exception) {
+      log.error("Failed to delete some $executionType", e)
+      peeringMetrics.incrementNumErrors(executionType)
+      hadFailures = true
+    }
+
+    return DeletionResult(numDeleted, hadFailures)
+  }
+
+  private data class DeletionResult(val numDeleted: Int, val hadFailures: Boolean)
+
  override fun getPollingInterval() = pollingIntervalMs
  override fun getNotificationType(): String = this.javaClass.simpleName
}
