Skip to content

Commit

Permalink
fix(nu-orca): handle parallel stage name/type correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed May 2, 2017
1 parent 2574dd1 commit 79bbba2
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class BakeStage implements BranchingStageDefinitionBuilder, RestartableStage {
}

@Override
<T extends Execution<T>> String parallelStageName(Stage<T> stage, boolean hasParallelFlows) {
String parallelStageName(Stage<?> stage, boolean hasParallelFlows) {
return hasParallelFlows ? "Multi-region Bake" : stage.name
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ParallelDeployStage implements BranchingStageDefinitionBuilder {

@Override
String getChildStageType(Stage childStage) {
return isClone(childStage) ? CloneServerGroupStage.PIPELINE_CONFIG_TYPE : CreateServerGroupStage.PIPELINE_CONFIG_TYPE
return isClone(childStage) ? CloneServerGroupStage.PIPELINE_CONFIG_TYPE : PIPELINE_CONFIG_TYPE
}

@CompileDynamic
Expand Down Expand Up @@ -146,7 +146,7 @@ class ParallelDeployStage implements BranchingStageDefinitionBuilder {
}

@Override
<T extends Execution<T>> String parallelStageName(Stage<T> stage, boolean hasParallelFlows) {
String parallelStageName(Stage<?> stage, boolean hasParallelFlows) {
return isClone(stage) ? "Clone" : stage.name
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ default void postBranchGraph(Stage<?> stage, TaskNode.Builder builder) {
* Override this to rename the stage if it has parallel flows.
* This affects the base stage not the individual parallel synthetic stages.
*/
default <T extends Execution<T>> String parallelStageName(Stage<T> stage, boolean hasParallelFlows) {
default String parallelStageName(Stage<?> stage, boolean hasParallelFlows) {
return stage.getName();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,6 @@ class CheckPreconditionsStage implements BranchingStageDefinitionBuilder {
builder.withTask("checkPrecondition", preconditionTask.getClass() as Class<? extends Task>)
}

@Override
String parallelStageName(Stage stage, boolean hasParallelFlows) {
stage.name
}

@Override
def <T extends Execution<T>> Collection<Map<String, Object>> parallelContexts(Stage<T> stage) {
stage.resolveStrategyParams()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,13 +89,20 @@ fun StageDefinitionBuilder.buildSyntheticStages(
}

@Suppress("UNCHECKED_CAST")
private fun BranchingStageDefinitionBuilder.eachParallelContext(stage: Stage<*>, block: (Map<String, Any>) -> Stage<*>) =
private fun BranchingStageDefinitionBuilder.parallelContexts(stage: Stage<*>) =
when (stage.getExecution()) {
is Pipeline -> parallelContexts(stage as Stage<Pipeline>)
is Orchestration -> parallelContexts(stage as Stage<Orchestration>)
else -> throw IllegalStateException()
}
.map(block)

@Suppress("UNCHECKED_CAST")
private fun BranchingStageDefinitionBuilder.parallelStageName(stage: Stage<*>, hasParallelStages: Boolean) =
when (stage.getExecution()) {
is Pipeline -> parallelStageName(stage as Stage<Pipeline>, hasParallelStages)
is Orchestration -> parallelStageName(stage as Stage<Orchestration>, hasParallelStages)
else -> throw IllegalStateException()
}

private typealias SyntheticStages = Map<SyntheticStageOwner, List<Stage<*>>>

Expand Down Expand Up @@ -150,14 +157,18 @@ private fun SyntheticStages.buildAfterStages(stage: Stage<out Execution<*>>, cal

private fun StageDefinitionBuilder.buildParallelStages(stage: Stage<out Execution<*>>, callback: (Stage<*>) -> Unit) {
if (this is BranchingStageDefinitionBuilder && stage.getParentStageId() == null) {
stage.setInitializationStage(true)
eachParallelContext(stage) { context ->
val parallelContexts = parallelContexts(stage)
parallelContexts
.map { context ->
val execution = stage.getExecution()
when (execution) {
is Pipeline -> newStage(execution, stage.getType(), stage.getName(), context, stage as Stage<Pipeline>, STAGE_BEFORE)
is Orchestration -> newStage(execution, stage.getType(), stage.getName(), context, stage as Stage<Orchestration>, STAGE_BEFORE)
else -> throw IllegalStateException()
}
val stageType = context.getOrDefault("type", stage.getType()).toString()
val stageName = context.getOrDefault("name", stage.getName()).toString()
@Suppress("UNCHECKED_CAST")
when (execution) {
is Pipeline -> newStage(execution, stageType, stageName, context, stage as Stage<Pipeline>, STAGE_BEFORE)
is Orchestration -> newStage(execution, stageType, stageName, context, stage as Stage<Orchestration>, STAGE_BEFORE)
else -> throw IllegalStateException()
}
}
.forEachIndexed { i, it ->
// TODO: this is insane backwards nonsense, it doesn't need the child stage in any impl so we could determine this when building the stage in the first place
Expand All @@ -169,6 +180,8 @@ private fun StageDefinitionBuilder.buildParallelStages(stage: Stage<out Executio
callback.invoke(it)
}
}
stage.setName(parallelStageName(stage, parallelContexts.size > 1))
stage.setInitializationStage(true)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ val stageWithSyntheticAfter = object : StageDefinitionBuilder {
val stageWithParallelBranches = object : BranchingStageDefinitionBuilder {
override fun <T : Execution<T>> parallelContexts(stage: Stage<T>): Collection<Map<String, Any>> =
listOf(
mapOf("region" to "us-east-1"),
mapOf("region" to "us-west-2"),
mapOf("region" to "eu-west-1")
mapOf("region" to "us-east-1", "name" to "run in us-east-1"),
mapOf("region" to "us-west-2", "name" to "run in us-west-2"),
mapOf("region" to "eu-west-1", "name" to "run in eu-west-1")
)

override fun parallelStageName(stage: Stage<*>, hasParallelFlows: Boolean) =
if (hasParallelFlows) "is parallel" else "is not parallel"

override fun preBranchGraph(stage: Stage<*>, builder: Builder) {
builder.withTask("pre-branch", DummyTask::class.java)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,7 @@ class CompleteStageHandlerSpec : Spek({
application = "foo"
stage {
refId = "1"
name = "parallel"
type = stageWithParallelBranches.type
stageWithParallelBranches.buildSyntheticStages(this)
stageWithParallelBranches.buildTasks(this)
Expand Down Expand Up @@ -420,6 +421,7 @@ class CompleteStageHandlerSpec : Spek({
application = "foo"
stage {
refId = "1"
name = "parallel"
type = stageWithParallelBranches.type
stageWithParallelBranches.buildSyntheticStages(this)
stageWithParallelBranches.buildTasks(this)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,6 +338,7 @@ class StartStageHandlerSpec : Spek({
application = "foo"
stage {
refId = "1"
name = "parallel"
type = stageWithParallelBranches.type
}
}
Expand Down Expand Up @@ -367,6 +368,15 @@ class StartStageHandlerSpec : Spek({
// TODO: contexts, etc.
}

it("renames the primary branch") {
pipeline.stageByRef("1").name shouldEqual "is parallel"
}

it("renames each parallel branch") {
val stage = pipeline.stageByRef("1")
pipeline.stages.filter { it.parentStageId == stage.id }.map { it.name } shouldEqual listOf("run in us-east-1", "run in us-west-2", "run in eu-west-1")
}

it("runs the parallel stages") {
verify(queue, times(3)).push(check<StartStage> {
pipeline.stageById(it.stageId).parentStageId shouldEqual message.stageId
Expand All @@ -379,6 +389,7 @@ class StartStageHandlerSpec : Spek({
application = "foo"
stage {
refId = "1"
name = "parallel"
type = stageWithParallelBranches.type
stageWithParallelBranches.buildSyntheticStages(this)
stageWithParallelBranches.buildTasks(this)
Expand Down

0 comments on commit 79bbba2

Please sign in to comment.