Skip to content

Commit

Permalink
fix(after stages): Allow for just-in-time planning of after stages
Browse files Browse the repository at this point in the history
Handle stages that have no before stages or tasks.

# Conflicts:
#	orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandler.kt
#	orca-queue/src/main/kotlin/com/netflix/spinnaker/orca/q/handler/stages.kt
#	orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/CompleteStageHandlerTest.kt
  • Loading branch information
robfletcher committed Mar 26, 2018
1 parent 8b235bb commit d2beaca
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,28 @@ class CompleteStageHandler(
override fun handle(message: CompleteStage) {
message.withStage { stage ->
if (stage.status in setOf(RUNNING, NOT_STARTED)) {
val status = stage.determineStatus()
var status = stage.determineStatus()
try {
if (status.isComplete && !status.isHalt) {
if (status == NOT_STARTED) {
// Stage had no before stages or tasks so we'll see if it had any
// un-planned after stages
var afterStages = stage.firstAfterStages()
if (afterStages.isEmpty()) {
stage.planAfterStages()
afterStages = stage.firstAfterStages()
}
if (afterStages.isNotEmpty()) {
afterStages.forEach {
queue.push(StartStage(message, it.id))
}

return@withStage
} else {
// stage had no synthetic stages or tasks, which is odd but whatever
log.warn("Stage ${stage.id} (${stage.type}) of ${stage.execution.id} had no tasks or synthetic stages!")
status = SKIPPED
}
} else if (status.isComplete && !status.isHalt) {
// check to see if this stage has any unplanned synthetic after stages
var afterStages = stage.firstAfterStages()
if (afterStages.isEmpty()) {
Expand All @@ -69,9 +88,7 @@ class CompleteStageHandler(
return@withStage
}
}
}

if (status.isFailure) {
} else if (status.isFailure) {
if (stage.planOnFailureStages()) {
stage.firstAfterStages().forEach {
queue.push(StartStage(it))
Expand Down Expand Up @@ -212,6 +229,7 @@ private fun Stage.determineStatus(): ExecutionStatus {
val taskStatuses = tasks.map(Task::getStatus)
val allStatuses = syntheticStatuses + taskStatuses
return when {
allStatuses.isEmpty() -> NOT_STARTED
allStatuses.contains(TERMINAL) -> TERMINAL
allStatuses.contains(STOPPED) -> STOPPED
allStatuses.contains(CANCELED) -> CANCELED
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
val registry = NoopRegistry()
val contextParameterProcessor: ContextParameterProcessor = mock()

val emptyStage = object : StageDefinitionBuilder {}

val stageWithTaskAndAfterStages = object : StageDefinitionBuilder {
override fun getType() = "stageWithTaskAndAfterStages"

Expand Down Expand Up @@ -111,7 +113,9 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
stageWithTaskAndAfterStages,
stageThatBlowsUpPlanningAfterStages,
stageWithSyntheticOnFailure,
stageWithNothingButAfterStages
stageWithNothingButAfterStages,
stageWithSyntheticOnFailure,
emptyStage
)
)
}
Expand Down Expand Up @@ -370,7 +374,6 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({

beforeGroup {
whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline
assertThat(pipeline.stages.map { it.type }).isEqualTo(listOf(stageWithTaskAndAfterStages.type))
}

afterGroup(::resetMocks)
Expand All @@ -387,6 +390,10 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
verify(queue).push(StartStage(message, pipeline.stages[1].id))
}

it("does not update the status of the stage itself") {
verify(repository, never()).storeStage(pipeline.stageById(message.stageId))
}

it("does not signal completion of the execution") {
verify(queue, never()).push(isA<CompleteExecution>())
}
Expand Down Expand Up @@ -432,6 +439,37 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
}
}

given("a stage had no synthetics or tasks") {
val pipeline = pipeline {
application = "foo"
stage {
refId = "1"
name = "empty"
status = RUNNING
type = emptyStage.type
}
}

val message = CompleteStage(pipeline.stageByRef("1"))

beforeGroup {
whenever(repository.retrieve(PIPELINE, pipeline.id)) doReturn pipeline
}

afterGroup(::resetMocks)

action("receiving the message") {
subject.handle(message)
}

it("just marks the stage as SKIPPED") {
verify(repository).storeStage(check {
assertThat(it.id).isEqualTo(message.stageId)
assertThat(it.status).isEqualTo(SKIPPED)
})
}
}

setOf(TERMINAL, CANCELED).forEach { taskStatus ->
describe("when a stage's task fails with $taskStatus status") {
val pipeline = pipeline {
Expand Down Expand Up @@ -876,7 +914,12 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
stageWithParallelBranches.buildTasks(this)
}
}
val message = CompleteStage(pipeline.stageByRef("1<1"))

val message = pipeline.stageByRef("1<1").let { completedSynthetic ->
singleTaskStage.buildTasks(completedSynthetic)
completedSynthetic.tasks.forEach { it.status = SUCCEEDED }
CompleteStage(completedSynthetic)
}

beforeGroup {
pipeline.stageById(message.stageId).status = RUNNING
Expand Down Expand Up @@ -904,6 +947,11 @@ object CompleteStageHandlerTest : SubjectSpek<CompleteStageHandler>({
stageWithParallelBranches.buildTasks(this)
}
}

pipeline.stages.filter { it.parentStageId != null }.forEach {
singleTaskStage.buildTasks(it)
it.tasks.forEach { it.status = SUCCEEDED }
}
val message = CompleteStage(pipeline.stageByRef("1<1"))

beforeGroup {
Expand Down

0 comments on commit d2beaca

Please sign in to comment.