Skip to content

Commit

Permalink
fix(queue): reschedule child pipeline when parent canceled (#2139)
Browse files Browse the repository at this point in the history
  • Loading branch information
asher committed Apr 11, 2018
1 parent e5bd1fb commit 79f5ab3
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,30 @@ fun ConfigurableApplicationContext.runToCompletion(execution: Execution, launche
repository.waitForAllStagesToComplete(execution)
}

/**
* Given parent and child pipelines:
* 1) Start child pipeline
* 2) Invoke parent pipeline and continue until completion
*
* Useful for testing failure interactions between pipelines. Child pipeline can be inspected prior to
* completion, and subsequently completed via [runToCompletion].
*/
fun ConfigurableApplicationContext.runParentToCompletion(parent: Execution,
child: Execution,
launcher: (Execution) -> Unit,
repository: ExecutionRepository) {
val latch = ExecutionLatch(Predicate {
it.executionId == parent.id
})

addApplicationListener(latch)
launcher.invoke(child)
launcher.invoke(parent)
assert(latch.await()) { "Pipeline did not complete" }

repository.waitForAllStagesToComplete(parent)
}

fun ConfigurableApplicationContext.restartAndRunToCompletion(stage: Stage, launcher: (Execution, String) -> Unit, repository: ExecutionRepository) {
val execution = stage.execution
val latch = ExecutionLatch(Predicate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.netflix.spinnaker.config.QueueConfiguration
import com.netflix.spinnaker.kork.eureka.RemoteStatusChangedEvent
import com.netflix.spinnaker.kork.jedis.RedisClientDelegate
import com.netflix.spinnaker.kork.jedis.RedisClientSelector
import com.netflix.spinnaker.orca.CancellableStage
import com.netflix.spinnaker.orca.ExecutionStatus.*
import com.netflix.spinnaker.orca.TaskResult
import com.netflix.spinnaker.orca.config.OrcaConfiguration
Expand Down Expand Up @@ -424,6 +425,47 @@ class QueueIntegrationTest {
}
}

@Test
fun `child pipeline is prompty cancelled with the parent regardless of task backoff time`() {
val childPipeline = pipeline {
application = "spinnaker"
stage {
refId = "wait"
type = "wait"
context = mapOf("waitTime" to 60)
}
}
val parentPipeline = pipeline {
application = "spinnaker"
stage {
refId = "1"
type = "pipeline"
context = mapOf("executionId" to childPipeline.id)
}
}

repository.store(childPipeline)
repository.store(parentPipeline)

whenever(dummyTask.execute(argThat { refId == "1" })) doReturn TaskResult(CANCELED)
context.runParentToCompletion(parentPipeline, childPipeline, runner::start, repository)

repository.retrieve(PIPELINE, parentPipeline.id).apply {
assertThat(status == CANCELED)
}
repository.retrieve(PIPELINE, childPipeline.id).apply {
assertThat(stageByRef("wait").status == RUNNING)
}

context.runToCompletion(childPipeline, runner::start, repository)

repository.retrieve(PIPELINE, childPipeline.id).apply {
assertThat(isCanceled).isTrue()
assertThat(stageByRef("wait").wasShorterThan(10000L)).isTrue()
assertThat(stageByRef("wait").status == CANCELED)
}
}

@Test
fun `terminal pipeline immediately cancels stages in other branches where tasks have long backoff times`() {
val pipeline = pipeline {
Expand Down Expand Up @@ -838,6 +880,21 @@ class TestConfig {
}
}

@Bean
fun pipelineStage(@Autowired repository: ExecutionRepository): StageDefinitionBuilder =
object : CancellableStage, StageDefinitionBuilder {
override fun taskGraph(stage: Stage, builder: Builder) {
builder.withTask<DummyTask>("dummy")
}

override fun getType() = "pipeline"

override fun cancel(stage: Stage?): CancellableStage.Result {
repository.cancel(stage!!.context["executionId"] as String)
return CancellableStage.Result(stage, mapOf("foo" to "bar"))
}
}

@Bean
fun currentInstanceId() = "localhost"

Expand All @@ -863,7 +920,8 @@ class TestConfig {
publisher = publisher
)

@Bean fun redisClientSelector(redisClientDelegates: List<RedisClientDelegate>) =
@Bean
fun redisClientSelector(redisClientDelegates: List<RedisClientDelegate>) =
RedisClientSelector(redisClientDelegates)
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@ import com.netflix.spinnaker.orca.CancellableStage
import com.netflix.spinnaker.orca.ExecutionStatus.RUNNING
import com.netflix.spinnaker.orca.Task
import com.netflix.spinnaker.orca.pipeline.StageDefinitionBuilderFactory
import com.netflix.spinnaker.orca.pipeline.model.Execution.ExecutionType.*
import com.netflix.spinnaker.orca.pipeline.persistence.ExecutionRepository
import com.netflix.spinnaker.orca.q.CancelStage
import com.netflix.spinnaker.orca.q.RescheduleExecution
import com.netflix.spinnaker.orca.q.RunTask
import com.netflix.spinnaker.q.Queue
import org.springframework.beans.factory.annotation.Qualifier
Expand Down Expand Up @@ -81,6 +83,15 @@ class CancelStageHandler(
// time out.
executor.execute {
builder.cancel(stage)
// Special case for PipelineStage to ensure prompt cancellation of
// child pipelines and deployment strategies regardless of task backoff
if (stage.type.equals("pipeline", true) && stage.context.containsKey("executionId")) {
val childId = stage.context["executionId"] as? String
if (childId != null) {
val child = repository.retrieve(PIPELINE, childId)
queue.push(RescheduleExecution(child))
}
}
}
}
}
Expand Down

0 comments on commit 79f5ab3

Please sign in to comment.