From 134ebf55455965bfc169529e0670d405261adebd Mon Sep 17 00:00:00 2001 From: Rob Fletcher Date: Wed, 3 May 2017 15:48:01 -0700 Subject: [PATCH] fix(queue): handle starting stages with no tasks --- .../orca/q/handler/StartStageHandler.kt | 28 +++--- .../com/netflix/spinnaker/orca/q/Stages.kt | 12 +++ .../orca/q/handler/StartStageHandlerSpec.kt | 90 ++++++++++++++++++- 3 files changed, 116 insertions(+), 14 deletions(-) diff --git a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt index bf5c87a1d3..98cb757690 100644 --- a/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt +++ b/orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandler.kt @@ -16,8 +16,7 @@ package com.netflix.spinnaker.orca.q.handler -import com.netflix.spinnaker.orca.ExecutionStatus.RUNNING -import com.netflix.spinnaker.orca.ExecutionStatus.SKIPPED +import com.netflix.spinnaker.orca.ExecutionStatus.* import com.netflix.spinnaker.orca.events.StageStarted import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder import com.netflix.spinnaker.orca.pipeline.model.OptionalStageSupport @@ -72,19 +71,24 @@ open class StartStageHandler @Autowired constructor( } private fun Stage<*>.start() { - firstBeforeStages().let { beforeStages -> - if (beforeStages.isEmpty()) { - firstTask().let { task -> - if (task == null) { - TODO("do what? Nothing to do, just indicate end of stage?") - } else { - queue.push(StartTask(this, task.id)) + val beforeStages = firstBeforeStages() + if (beforeStages.isEmpty()) { + val task = firstTask() + if (task == null) { + val afterStages = firstAfterStages() + if (afterStages.isEmpty()) { + queue.push(CompleteStage(this, SUCCEEDED)) + } else { + afterStages.forEach { + queue.push(StartStage(it)) } } } else { - beforeStages.forEach { - queue.push(StartStage(it)) - } + queue.push(StartTask(this, task.id)) + } + } else { + beforeStages.forEach { + queue.push(StartStage(it)) } } } diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/Stages.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/Stages.kt index ea9a95deeb..b797edbcfa 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/Stages.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/Stages.kt @@ -32,6 +32,10 @@ val singleTaskStage = object : StageDefinitionBuilder { } } +val zeroTaskStage = object : StageDefinitionBuilder { + override fun getType() = "zeroTaskStage" +} + val multiTaskStage = object : StageDefinitionBuilder { override fun getType() = "multiTaskStage" override fun > taskGraph(stage: Stage, builder: Builder) { @@ -83,6 +87,14 @@ val stageWithSyntheticAfter = object : StageDefinitionBuilder { ) } +val stageWithSyntheticAfterAndNoTasks = object : StageDefinitionBuilder { + override fun getType() = "stageWithSyntheticAfterAndNoTasks" + + override fun > aroundStages(stage: Stage) = listOf( + newStage(stage.execution, singleTaskStage.type, "post", mutableMapOf(), stage, STAGE_AFTER) + ) +} + val stageWithParallelBranches = object : BranchingStageDefinitionBuilder { override fun > parallelContexts(stage: Stage): Collection> = listOf( diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerSpec.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerSpec.kt index ecb03ecb1f..35d07999f5 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerSpec.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartStageHandlerSpec.kt @@ -57,7 +57,9 @@ class StartStageHandlerSpec : Spek({ stageWithSyntheticBefore, stageWithSyntheticAfter, stageWithParallelBranches, - rollingPushStage + rollingPushStage, + zeroTaskStage, + stageWithSyntheticAfterAndNoTasks ), publisher, clock, @@ -119,6 +121,90 @@ class StartStageHandlerSpec : Spek({ } } + context("with no tasks") { + context("and no after stages") { + val pipeline = pipeline { + application = "foo" + stage { + refId = "1" + type = zeroTaskStage.type + } + } + val message = StartStage(pipeline.stageByRef("1")) + + beforeGroup { + whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + handler.handle(message) + } + + it("updates the stage status") { + verify(repository).storeStage(check { + it.getStatus() shouldEqual RUNNING + it.getStartTime() shouldEqual clock.millis() + }) + } + + it("immediately completes the stage") { + verify(queue).push(CompleteStage(message, SUCCEEDED)) + verifyNoMoreInteractions(queue) + } + + it("publishes an event") { + verify(publisher).publishEvent(check { + it.executionType shouldEqual pipeline.javaClass + it.executionId shouldEqual pipeline.id + it.stageId shouldEqual message.stageId + }) + } + } + + context("and at least one after stage") { + val pipeline = pipeline { + application = "foo" + stage { + refId = "1" + type = stageWithSyntheticAfterAndNoTasks.type + } + } + val message = StartStage(pipeline.stageByRef("1")) + + beforeGroup { + whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline + } + + afterGroup(::resetMocks) + + action("the handler receives a message") { + handler.handle(message) + } + + it("updates the stage status") { + verify(repository).storeStage(check { + it.getStatus() shouldEqual RUNNING + it.getStartTime() shouldEqual clock.millis() + }) + } + + it("immediately starts the first after stage") { + verify(queue).push(StartStage(pipeline.stageByRef("1>1"))) + verifyNoMoreInteractions(queue) + } + + it("publishes an event") { + verify(publisher).publishEvent(check { + it.executionType shouldEqual pipeline.javaClass + it.executionId shouldEqual pipeline.id + it.stageId shouldEqual message.stageId + }) + } + } + } + context("with several linear tasks") { val pipeline = pipeline { application = "foo" @@ -165,7 +251,7 @@ class StartStageHandlerSpec : Spek({ }) } - it("raises an event to indicate the first task is starting") { + it("starts the first task") { verify(queue).push(StartTask( message.executionType, message.executionId,