From 2a8492b039f3f57b30f8a926eeb705d5a794120a Mon Sep 17 00:00:00 2001 From: Chris Thielen Date: Wed, 26 Jun 2019 13:13:08 -0700 Subject: [PATCH] fix(queue): After trying to start a queued execution that is CANCELED, check the pipeline queue for additional waiting executions (#3015) --- .../orca/q/handler/StartExecutionHandler.kt | 4 +++ .../q/handler/StartExecutionHandlerTest.kt | 30 ++++++++++++++++++- 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandler.kt index f8a01d2a8c..1b3e51e59b 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandler.kt @@ -28,6 +28,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.q.CancelExecution import com.netflix.spinnaker.orca.q.StartExecution import com.netflix.spinnaker.orca.q.StartStage +import com.netflix.spinnaker.orca.q.StartWaitingExecutions import com.netflix.spinnaker.orca.q.pending.PendingExecutionService import com.netflix.spinnaker.q.Queue import net.logstash.logback.argument.StructuredArguments.value @@ -99,6 +100,9 @@ class StartExecutionHandler( private fun terminate(execution: Execution) { if (execution.status == CANCELED || execution.isCanceled) { publisher.publishEvent(ExecutionComplete(this, execution.type, execution.id, execution.status)) + execution.pipelineConfigId?.let { + queue.push(StartWaitingExecutions(it, purgeQueue = !execution.isKeepWaitingPipelines)) + } } else { log.warn("Execution (type: ${execution.type}, id: {}, status: ${execution.status}, application: {})" + " cannot be started unless state is NOT_STARTED. Ignoring StartExecution message.", diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt index 2d7f71e275..42bb55d56a 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartExecutionHandlerTest.kt @@ -29,6 +29,7 @@ import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository import com.netflix.spinnaker.orca.q.CancelExecution import com.netflix.spinnaker.orca.q.StartExecution import com.netflix.spinnaker.orca.q.StartStage +import com.netflix.spinnaker.orca.q.StartWaitingExecutions import com.netflix.spinnaker.orca.q.pending.PendingExecutionService import com.netflix.spinnaker.orca.q.singleTaskStage import com.netflix.spinnaker.q.Queue @@ -140,7 +141,7 @@ object StartExecutionHandlerTest : SubjectSpek({ } } - given("a pipeline that was previously canceled and status is NOT_STARTED") { + given("a pipeline with no pipelineConfigId that was previously canceled and status is NOT_STARTED") { val pipeline = pipeline { stage { type = singleTaskStage.type @@ -173,6 +174,33 @@ object StartExecutionHandlerTest : SubjectSpek({ } } + given("a pipeline with a pipelineConfigId that was previously canceled and status is NOT_STARTED") { + val pipeline = pipeline { + pipelineConfigId = "aaaaa-12345-bbbbb-67890" + isKeepWaitingPipelines = false + stage { + type = singleTaskStage.type + } + isCanceled = true + } + + val message = StartExecution(pipeline) + + beforeGroup { + whenever(repository.retrieve(message.executionType, message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + on("receiving a message") { + subject.handle(message) + } + + it("starts waiting executions for the pipelineConfigId") { + verify(queue).push(StartWaitingExecutions("aaaaa-12345-bbbbb-67890", true)) + } + } + given("a pipeline with multiple initial stages") { val pipeline = pipeline { stage {