Skip to content

Commit

Permalink
fix(core): don't cancel waiting pipelines as well as starting them
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Apr 4, 2018
1 parent 62c681b commit 869e6ee
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,21 +49,20 @@ class StartWaitingExecutionsHandler(
pipelines
.maxBy { it.buildTime ?: 0L }
?.let { newest ->
log.info("Starting queued {} {} {}", newest.application, newest.name, newest.id)
log.info("Starting queued pipeline {} {} {}", newest.application, newest.name, newest.id)
queue.push(StartExecution(newest))
(pipelines - newest)
.also { queued ->
log.info("Dropping queued {} {} {}", newest.application, newest.name, queued.map { it.id })
}
pipelines
.filter { it.id != newest.id }
.forEach {
log.info("Dropping queued pipeline {} {} {}", it.application, it.name, it.id)
queue.push(CancelExecution(it))
}
}
} else {
pipelines
.minBy { it.buildTime ?: 0L }
?.let { oldest ->
log.info("Starting queued {} {} {}", oldest.application, oldest.name, oldest.id)
log.info("Starting queued pipeline {} {} {}", oldest.application, oldest.name, oldest.id)
queue.push(StartExecution(oldest))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.netflix.spinnaker.orca.q.CancelExecution
import com.netflix.spinnaker.orca.q.StartExecution
import com.netflix.spinnaker.orca.q.StartWaitingExecutions
import com.netflix.spinnaker.q.Queue
import com.netflix.spinnaker.spek.and
import com.nhaarman.mockito_kotlin.*
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.given
Expand Down Expand Up @@ -82,23 +81,50 @@ object StartWaitingExecutionsHandlerTest : SubjectSpek<StartWaitingExecutionsHan
pipelineConfigId = configId
}

beforeGroup {
whenever(
repository.retrievePipelinesForPipelineConfigId(
configId,
ExecutionCriteria().setStatuses(NOT_STARTED).setLimit(Int.MAX_VALUE)
)
) doReturn just(waitingPipeline)
}
given("the queue should not be purged") {
beforeGroup {
whenever(
repository.retrievePipelinesForPipelineConfigId(
configId,
ExecutionCriteria().setStatuses(NOT_STARTED).setLimit(Int.MAX_VALUE)
)
) doReturn just(waitingPipeline)
}

afterGroup(::resetMocks)
afterGroup(::resetMocks)

on("receiving the message") {
subject.handle(StartWaitingExecutions(configId))
on("receiving the message") {
subject.handle(StartWaitingExecutions(configId))
}

it("starts the waiting pipeline") {
verify(queue).push(StartExecution(waitingPipeline))
}
}

it("starts the waiting pipeline") {
verify(queue).push(StartExecution(waitingPipeline))
given("the queue should be purged") {
beforeGroup {
whenever(
repository.retrievePipelinesForPipelineConfigId(
configId,
ExecutionCriteria().setStatuses(NOT_STARTED).setLimit(Int.MAX_VALUE)
)
) doReturn just(waitingPipeline)
}

afterGroup(::resetMocks)

on("receiving the message") {
subject.handle(StartWaitingExecutions(configId, purgeQueue = true))
}

it("starts the waiting pipeline") {
verify(queue).push(StartExecution(waitingPipeline))
}

it("does not cancel anything") {
verify(queue, never()).push(isA<CancelExecution>())
}
}
}

Expand All @@ -114,7 +140,7 @@ object StartWaitingExecutionsHandlerTest : SubjectSpek<StartWaitingExecutionsHan
val oldest = waitingPipelines.minBy { it.buildTime!! }!!
val newest = waitingPipelines.maxBy { it.buildTime!! }!!

and("the queue should not be purged") {
given("the queue should not be purged") {
beforeGroup {
whenever(
repository.retrievePipelinesForPipelineConfigId(
Expand All @@ -139,7 +165,7 @@ object StartWaitingExecutionsHandlerTest : SubjectSpek<StartWaitingExecutionsHan
}
}

and("the queue should be purged") {
given("the queue should be purged") {
beforeGroup {
whenever(
repository.retrievePipelinesForPipelineConfigId(
Expand All @@ -160,9 +186,7 @@ object StartWaitingExecutionsHandlerTest : SubjectSpek<StartWaitingExecutionsHan
}

it("cancels all the other waiting pipelines") {
(waitingPipelines - newest).forEach {
verify(queue).push(CancelExecution(it))
}
verify(queue, times(waitingPipelines.size - 1)).push(isA<CancelExecution>())
}

it("does not cancel the one it's trying to start") {
Expand Down

0 comments on commit 869e6ee

Please sign in to comment.