Skip to content

Commit

Permalink
fix(queue): handle starting stages with no tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed May 3, 2017
1 parent 7a42b9c commit 134ebf5
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@

package com.netflix.spinnaker.orca.q.handler

import com.netflix.spinnaker.orca.ExecutionStatus.RUNNING
import com.netflix.spinnaker.orca.ExecutionStatus.SKIPPED
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.events.StageStarted
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilder
import com.netflix.spinnaker.orca.pipeline.model.OptionalStageSupport
Expand Down Expand Up @@ -72,19 +71,24 @@ open class StartStageHandler @Autowired constructor(
}

private fun Stage<*>.start() {
firstBeforeStages().let { beforeStages ->
if (beforeStages.isEmpty()) {
firstTask().let { task ->
if (task == null) {
TODO("do what? Nothing to do, just indicate end of stage?")
} else {
queue.push(StartTask(this, task.id))
val beforeStages = firstBeforeStages()
if (beforeStages.isEmpty()) {
val task = firstTask()
if (task == null) {
val afterStages = firstAfterStages()
if (afterStages.isEmpty()) {
queue.push(CompleteStage(this, SUCCEEDED))
} else {
afterStages.forEach {
queue.push(StartStage(it))
}
}
} else {
beforeStages.forEach {
queue.push(StartStage(it))
}
queue.push(StartTask(this, task.id))
}
} else {
beforeStages.forEach {
queue.push(StartStage(it))
}
}
}
Expand Down
12 changes: 12 additions & 0 deletions orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/Stages.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ val singleTaskStage = object : StageDefinitionBuilder {
}
}

val zeroTaskStage = object : StageDefinitionBuilder {
override fun getType() = "zeroTaskStage"
}

val multiTaskStage = object : StageDefinitionBuilder {
override fun getType() = "multiTaskStage"
override fun <T : Execution<T>> taskGraph(stage: Stage<T>, builder: Builder) {
Expand Down Expand Up @@ -83,6 +87,14 @@ val stageWithSyntheticAfter = object : StageDefinitionBuilder {
)
}

val stageWithSyntheticAfterAndNoTasks = object : StageDefinitionBuilder {
override fun getType() = "stageWithSyntheticAfterAndNoTasks"

override fun <T : Execution<T>> aroundStages(stage: Stage<T>) = listOf(
newStage(stage.execution, singleTaskStage.type, "post", mutableMapOf(), stage, STAGE_AFTER)
)
}

val stageWithParallelBranches = object : BranchingStageDefinitionBuilder {
override fun <T : Execution<T>> parallelContexts(stage: Stage<T>): Collection<Map<String, Any>> =
listOf(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ class StartStageHandlerSpec : Spek({
stageWithSyntheticBefore,
stageWithSyntheticAfter,
stageWithParallelBranches,
rollingPushStage
rollingPushStage,
zeroTaskStage,
stageWithSyntheticAfterAndNoTasks
),
publisher,
clock,
Expand Down Expand Up @@ -119,6 +121,90 @@ class StartStageHandlerSpec : Spek({
}
}

context("with no tasks") {
context("and no after stages") {
val pipeline = pipeline {
application = "foo"
stage {
refId = "1"
type = zeroTaskStage.type
}
}
val message = StartStage(pipeline.stageByRef("1"))

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
handler.handle(message)
}

it("updates the stage status") {
verify(repository).storeStage(check {
it.getStatus() shouldEqual RUNNING
it.getStartTime() shouldEqual clock.millis()
})
}

it("immediately completes the stage") {
verify(queue).push(CompleteStage(message, SUCCEEDED))
verifyNoMoreInteractions(queue)
}

it("publishes an event") {
verify(publisher).publishEvent(check<StageStarted> {
it.executionType shouldEqual pipeline.javaClass
it.executionId shouldEqual pipeline.id
it.stageId shouldEqual message.stageId
})
}
}

context("and at least one after stage") {
val pipeline = pipeline {
application = "foo"
stage {
refId = "1"
type = stageWithSyntheticAfterAndNoTasks.type
}
}
val message = StartStage(pipeline.stageByRef("1"))

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doReturn pipeline
}

afterGroup(::resetMocks)

action("the handler receives a message") {
handler.handle(message)
}

it("updates the stage status") {
verify(repository).storeStage(check {
it.getStatus() shouldEqual RUNNING
it.getStartTime() shouldEqual clock.millis()
})
}

it("immediately starts the first after stage") {
verify(queue).push(StartStage(pipeline.stageByRef("1>1")))
verifyNoMoreInteractions(queue)
}

it("publishes an event") {
verify(publisher).publishEvent(check<StageStarted> {
it.executionType shouldEqual pipeline.javaClass
it.executionId shouldEqual pipeline.id
it.stageId shouldEqual message.stageId
})
}
}
}

context("with several linear tasks") {
val pipeline = pipeline {
application = "foo"
Expand Down Expand Up @@ -165,7 +251,7 @@ class StartStageHandlerSpec : Spek({
})
}

it("raises an event to indicate the first task is starting") {
it("starts the first task") {
verify(queue).push(StartTask(
message.executionType,
message.executionId,
Expand Down

0 comments on commit 134ebf5

Please sign in to comment.