Skip to content

Commit

Permalink
feat(queue): Add command to cleanup zombie executions (#3946)
Browse files Browse the repository at this point in the history
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
robzienert and mergify[bot] committed Oct 8, 2020
1 parent cd8ad6c commit dbe175d
Show file tree
Hide file tree
Showing 5 changed files with 315 additions and 55 deletions.
@@ -0,0 +1,118 @@
/*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.spinnaker.orca.q

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.ORCHESTRATION
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.q.metrics.MonitorableQueue
import com.netflix.spinnaker.security.AuthenticatedRequest
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.util.Optional
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.stereotype.Component
import rx.Scheduler
import rx.schedulers.Schedulers

/**
* Logic related to operating zombie pipeline executions.
*
* A zombie pipeline execution is a pipeline execution that:
*
* 1. Has a status of [ExecutionStatus.RUNNING].
* 2. Does not have a message in the work queue.
*
* Until an overhaul on persistence is done, zombies occur intermittently even without major
* outages, and are irrecoverable.
*/
@Component
@ConditionalOnBean(MonitorableQueue::class)
class ZombieExecutionService(
  private val executionRepository: ExecutionRepository,
  private val queue: MonitorableQueue,
  private val clock: Clock,
  @Qualifier("scheduler") private val scheduler: Optional<Scheduler>
) {

  private val log by lazy { LoggerFactory.getLogger(javaClass) }

  /**
   * Find all zombie pipeline executions.
   *
   * Retrieves every [ExecutionStatus.RUNNING] pipeline and orchestration from the repository and
   * keeps those whose build time is older than [minimumInactivity] and which have no message in
   * the queue. The call blocks until the whole scan has completed.
   *
   * @param minimumInactivity The minimum amount of time an execution should be inactive before
   * being considered a candidate as a zombie. Inactivity is measured from the execution's build
   * time. The shorter this timeframe is, the more likely you will have false positives.
   * @return the executions identified as zombies.
   */
  fun findAllZombies(minimumInactivity: Duration): List<PipelineExecution> {
    val criteria = ExecutionRepository.ExecutionCriteria().setStatuses(ExecutionStatus.RUNNING)
    return executionRepository.retrieve(PIPELINE, criteria)
      .mergeWith(executionRepository.retrieve(ORCHESTRATION, criteria))
      .subscribeOn(scheduler.orElseGet(Schedulers::io))
      .filter { hasBeenAroundAWhile(it, minimumInactivity) }
      .filter(this::queueHasNoMessages)
      .toList()
      .toBlocking()
      .first()
  }

  /**
   * Find and kill all zombies.
   *
   * Since this operation can be highly disruptive to users in falsely identified zombies, the
   * default [minimumActivity] value of 60 minutes is the recommended low minimum setting. There
   * is no risk in letting a zombie "run", so be safe.
   *
   * @param minimumActivity passed to [findAllZombies] as the minimum inactivity window.
   */
  fun killZombies(minimumActivity: Duration = Duration.ofMinutes(60)) {
    findAllZombies(minimumActivity).forEach {
      killZombie(it)
    }
  }

  /**
   * Kill a single zombie pipeline execution.
   *
   * Marks every [ExecutionStatus.RUNNING] stage and the execution itself as
   * [ExecutionStatus.CANCELED], records the cancellation reason and user, and stores the
   * execution back into the repository.
   *
   * WARNING: This method is unprotected: It will set a pipeline to canceled without verifying
   * that it actually is a zombie. It is recommended to use [killZombies] instead.
   *
   * @param execution the execution to force-cancel.
   */
  fun killZombie(execution: PipelineExecution) {
    log.warn("Force cancelling zombie execution and all of its stages: ${execution.application}/${execution.id}")
    execution.stages
      .filter { it.status == ExecutionStatus.RUNNING }
      .forEach {
        it.status = ExecutionStatus.CANCELED
        it.endTime = clock.millis()
      }

    execution.status = ExecutionStatus.CANCELED
    execution.cancellationReason = "Identified as a zombie execution"
    execution.canceledBy = AuthenticatedRequest.getSpinnakerUser().orElse("admin")

    // The changes above only touch the in-memory object; without writing it back the execution
    // would still be RUNNING in the repository and be reported as a zombie again on the next scan.
    executionRepository.store(execution)
  }

  /**
   * Whether [execution] was built more than [cutoff] ago.
   *
   * An execution without a build time is never considered old enough: its age cannot be
   * determined, and a single such record must not abort the scan of all other executions.
   */
  private fun hasBeenAroundAWhile(execution: PipelineExecution, cutoff: Duration): Boolean {
    val buildTime = execution.buildTime ?: return false
    return Instant.ofEpochMilli(buildTime).isBefore(clock.instant().minus(cutoff))
  }

  /**
   * Whether the queue holds no [ExecutionLevel] message that refers to [execution].
   */
  private fun queueHasNoMessages(execution: PipelineExecution): Boolean =
    !queue.containsMessage { message ->
      message is ExecutionLevel && message.executionId == execution.id
    }
}
Expand Up @@ -15,24 +15,37 @@
*/
package com.netflix.spinnaker.orca.q.admin.web

import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionNotFoundException
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.StartWaitingExecutions
import com.netflix.spinnaker.orca.q.ZombieExecutionService
import com.netflix.spinnaker.orca.q.admin.HydrateQueueCommand
import com.netflix.spinnaker.orca.q.admin.HydrateQueueInput
import com.netflix.spinnaker.orca.q.admin.HydrateQueueOutput
import com.netflix.spinnaker.q.Message
import com.netflix.spinnaker.q.Queue
import java.lang.IllegalStateException
import java.time.Duration
import java.time.Instant
import java.util.Optional
import javassist.NotFoundException
import javax.ws.rs.QueryParam
import org.springframework.http.HttpStatus
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.PostMapping
import org.springframework.web.bind.annotation.RequestBody
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RequestParam
import org.springframework.web.bind.annotation.ResponseStatus
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping("/admin/queue")
class QueueAdminController(
private val hydrateCommand: HydrateQueueCommand,
private val queue: Queue
private val zombieExecutionService: Optional<ZombieExecutionService>,
private val queue: Queue,
private val executionRepository: ExecutionRepository
) {

@PostMapping(value = ["/hydrate"])
Expand All @@ -51,6 +64,17 @@ class QueueAdminController(
)
)

/**
 * Finds and force-cancels all zombie executions.
 *
 * The parameter is bound with Spring's `@RequestParam`; the JAX-RS `@QueryParam` annotation is
 * not processed by Spring MVC controllers, so it did not bind the query parameter.
 *
 * @param minimumActivity optional `minimumActivity` query parameter; when absent, 60 minutes is
 * used. NOTE(review): presumably supplied as an ISO-8601 duration such as `PT60M` - this depends
 * on the converters registered with Spring MVC, confirm.
 * @throws IllegalStateException when no [ZombieExecutionService] is available on this instance.
 */
@PostMapping(value = ["/zombies:kill"])
fun killZombies(
  @RequestParam(name = "minimumActivity", required = false) minimumActivity: Duration? = null
) {
  getZombieExecutionService().killZombies(minimumActivity ?: Duration.ofMinutes(60))
}

/**
 * Force-cancels the single execution identified by [executionId] and responds with
 * 204 No Content.
 *
 * The zombie service is resolved first, so an unavailable service fails the request before the
 * execution is looked up.
 *
 * @param executionId id of the pipeline or orchestration execution to cancel.
 */
@PostMapping(value = ["/zombies/{executionId}:kill"])
@ResponseStatus(HttpStatus.NO_CONTENT)
fun killZombie(@PathVariable executionId: String) {
  val service = getZombieExecutionService()
  val target = getPipelineOrOrchestration(executionId)
  service.killZombie(target)
}

/**
* Posts StartWaitingExecutions message for the given pipeline message into the queue.
* This is useful when doing DB migration. If an execution is running from an old DB
Expand Down Expand Up @@ -85,4 +109,20 @@ class QueueAdminController(
) {
queue.push(message)
}

/**
 * Unwraps the optional [ZombieExecutionService].
 *
 * @throws IllegalStateException when the service is absent.
 */
private fun getZombieExecutionService(): ZombieExecutionService {
  if (zombieExecutionService.isPresent) {
    return zombieExecutionService.get()
  }
  throw IllegalStateException(
    "Zombie management is unavailable. This is likely due to the queue not being enabled on this instance."
  )
}

/**
 * Looks up [executionId] as a pipeline first and falls back to an orchestration when no such
 * pipeline exists.
 *
 * The fallback is triggered by the persistence layer's [ExecutionNotFoundException]; catching
 * javassist's unrelated `NotFoundException` meant the orchestration lookup could never run.
 *
 * @throws ExecutionNotFoundException when the id matches neither a pipeline nor an orchestration.
 */
private fun getPipelineOrOrchestration(executionId: String): PipelineExecution {
  return try {
    executionRepository.retrieve(ExecutionType.PIPELINE, executionId)
  } catch (e: ExecutionNotFoundException) {
    executionRepository.retrieve(ExecutionType.ORCHESTRATION, executionId)
  }
}
}
Expand Up @@ -19,31 +19,19 @@ package com.netflix.spinnaker.orca.q.metrics
import com.netflix.spectator.api.BasicTag
import com.netflix.spectator.api.Registry
import com.netflix.spectator.api.Tag
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionStatus.RUNNING
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.ORCHESTRATION
import com.netflix.spinnaker.orca.api.pipeline.models.ExecutionType.PIPELINE
import com.netflix.spinnaker.orca.api.pipeline.models.PipelineExecution
import com.netflix.spinnaker.orca.notifications.AbstractPollingNotificationAgent
import com.netflix.spinnaker.orca.notifications.NotificationClusterLock
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.ExecutionLevel
import com.netflix.spinnaker.orca.q.ZombieExecutionService
import com.netflix.spinnaker.q.metrics.MonitorableQueue
import java.time.Clock
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit.MINUTES
import java.util.Optional
import net.logstash.logback.argument.StructuredArguments.kv
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean
import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression
import org.springframework.stereotype.Component
import rx.Scheduler
import rx.schedulers.Schedulers

/**
* Monitors a queue and generates Atlas metrics.
Expand All @@ -53,14 +41,11 @@ import rx.schedulers.Schedulers
"\${queue.zombie-check.enabled:false}"
)
@ConditionalOnBean(MonitorableQueue::class)
class ZombieExecutionCheckingAgent
@Autowired constructor(
private val queue: MonitorableQueue,
class ZombieExecutionCheckingAgent(
private val zombieExecutionService: ZombieExecutionService,
private val registry: Registry,
private val repository: ExecutionRepository,
private val clock: Clock,
private val conch: NotificationClusterLock,
@Qualifier("scheduler") private val zombieCheckScheduler: Optional<Scheduler>,
conch: NotificationClusterLock,
@Value("\${queue.zombie-check.interval-ms:3600000}") private val pollingIntervalMs: Long,
@Value("\${queue.zombie-check.enabled:false}") private val zombieCheckEnabled: Boolean,
@Value("\${queue.zombie-check.cutoff-minutes:10}") private val zombieCheckCutoffMinutes: Long,
Expand Down Expand Up @@ -90,40 +75,24 @@ class ZombieExecutionCheckingAgent
try {
MDC.put(AGENT_MDC_KEY, this.javaClass.simpleName)
val startedAt = clock.instant()
val criteria = ExecutionRepository.ExecutionCriteria().setStatuses(RUNNING)
repository.retrieve(PIPELINE, criteria)
.mergeWith(repository.retrieve(ORCHESTRATION, criteria))
.subscribeOn(zombieCheckScheduler.orElseGet(Schedulers::io))
.filter(this::hasBeenAroundAWhile)
.filter(this::queueHasNoMessages)
.doOnCompleted {
log.info("Completed zombie check in ${Duration.between(startedAt, clock.instant())}")
}
.subscribe {
log.error(
"Found zombie {} {} {} {}",
kv("executionType", it.type),
kv("application", it.application),
kv("executionName", it.name),
kv("executionId", it.id)
)
val tags = mutableListOf<Tag>(
BasicTag("application", it.application),
BasicTag("type", it.type.name)
)
registry.counter("queue.zombies", tags).increment()
}
val zombies = zombieExecutionService.findAllZombies(Duration.ofMinutes(zombieCheckCutoffMinutes))
log.info("Completed zombie check in ${Duration.between(startedAt, clock.instant())}")
zombies.forEach {
log.error(
"Found zombie {} {} {} {}",
kv("executionType", it.type),
kv("application", it.application),
kv("executionName", it.name),
kv("executionId", it.id)
)
val tags = mutableListOf<Tag>(
BasicTag("application", it.application),
BasicTag("type", it.type.name)
)
registry.counter("queue.zombies", tags).increment()
}
} finally {
MDC.remove(AGENT_MDC_KEY)
}
}

/**
 * Whether [execution] was built more than `zombieCheckCutoffMinutes` minutes ago.
 *
 * An execution without a build time is never considered old enough: its age cannot be
 * determined, and throwing here would abort the check for every other execution.
 */
private fun hasBeenAroundAWhile(execution: PipelineExecution): Boolean {
  val buildTime = execution.buildTime ?: return false
  return Instant.ofEpochMilli(buildTime)
    .isBefore(clock.instant().minus(zombieCheckCutoffMinutes, MINUTES))
}

/**
 * Whether the queue holds no [ExecutionLevel] message that refers to [execution].
 */
private fun queueHasNoMessages(execution: PipelineExecution): Boolean {
  val hasMessageForExecution = queue.containsMessage { candidate ->
    candidate is ExecutionLevel && candidate.executionId == execution.id
  }
  return !hasMessageForExecution
}
}

0 comments on commit dbe175d

Please sign in to comment.