diff --git a/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepository.groovy b/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepository.groovy index 3af03ede0d..14925632e0 100644 --- a/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepository.groovy +++ b/orca-core/src/main/groovy/com/netflix/spinnaker/orca/pipeline/persistence/jedis/JedisExecutionRepository.groovy @@ -622,8 +622,10 @@ class JedisExecutionRepository implements ExecutionRepository { stage.id = stageId // TODO: temp debug - if (map["stage.${stageId}.status".toString()] == null) { - log.warn("Stage $stageId data is missing ${map.findAll { k, v -> k.startsWith("stage.$stageId") }}") + if (map.keySet().findAll { + it.startsWith("stage.${stageId}") + }.isEmpty()) { + log.warn("Stage data is missing for $stageId of execution $id") } stage.refId = map["stage.${stageId}.refId".toString()] @@ -632,7 +634,7 @@ class JedisExecutionRepository implements ExecutionRepository { stage.startTime = map["stage.${stageId}.startTime".toString()]?.toLong() stage.endTime = map["stage.${stageId}.endTime".toString()]?.toLong() stage.status = map["stage.${stageId}.status".toString()] ? ExecutionStatus.valueOf(map["stage.${stageId}.status".toString()]) : null - stage.initializationStage = map["stage.${stageId}.initializationStage".toString()].toBoolean() + stage.initializationStage = map["stage.${stageId}.initializationStage".toString()]?.toBoolean() ?: false stage.syntheticStageOwner = map["stage.${stageId}.syntheticStageOwner".toString()] ? SyntheticStageOwner.valueOf(map["stage.${stageId}.syntheticStageOwner".toString()]) : null stage.parentStageId = map["stage.${stageId}.parentStageId".toString()] stage.requisiteStageRefIds = map["stage.${stageId}.requisiteStageRefIds".toString()]?.tokenize(",") ?: emptySet() diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueProcessorSpec.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueProcessorSpec.kt index b695a3393f..e8f9702817 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueProcessorSpec.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/QueueProcessorSpec.kt @@ -16,7 +16,6 @@ package com.netflix.spinnaker.orca.q -import com.google.common.util.concurrent.MoreExecutors.directExecutor import com.natpryce.hamkrest.assertion.assertThat import com.natpryce.hamkrest.throws import com.netflix.appinfo.InstanceInfo.InstanceStatus.OUT_OF_SERVICE @@ -32,12 +31,16 @@ import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.context import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executor +import java.util.concurrent.Executors class QueueProcessorSpec : Spek({ describe("execution workers") { val queue: Queue = mock() val startExecutionHandler: MessageHandler = mock() val configurationErrorHandler: MessageHandler = mock() + val ackFunction: () -> Unit = mock() val registry: Registry = mock { on { createId(any()) } doReturn mock() on { counter(any()) } doReturn mock() @@ -45,12 +48,12 @@ class QueueProcessorSpec : Spek({ var queueProcessor: QueueProcessor? = null - fun resetMocks() = reset(queue, startExecutionHandler, configurationErrorHandler) + fun resetMocks() = reset(queue, startExecutionHandler, configurationErrorHandler, ackFunction) beforeGroup { queueProcessor = QueueProcessor( queue, - directExecutor(), + BlockingThreadExecutor(), registry, listOf(startExecutionHandler, configurationErrorHandler) ) @@ -90,7 +93,7 @@ class QueueProcessorSpec : Spek({ whenever(queue.poll(any())).then { @Suppress("UNCHECKED_CAST") val callback = it.arguments.first() as QueueCallback - callback.invoke(message, {}) + callback.invoke(message, ackFunction) } } @@ -107,6 +110,10 @@ class QueueProcessorSpec : Spek({ it("does not invoke other handlers") { verify(configurationErrorHandler, never()).invoke(any()) } + + it("acknowledges the message") { + verify(ackFunction).invoke() + } } context("it is a subclass of a supported message type") { @@ -119,7 +126,7 @@ class QueueProcessorSpec : Spek({ whenever(queue.poll(any())).then { @Suppress("UNCHECKED_CAST") val callback = it.arguments.first() as QueueCallback - callback.invoke(message, {}) + callback.invoke(message, ackFunction) } } @@ -136,6 +143,10 @@ class QueueProcessorSpec : Spek({ it("does not invoke other handlers") { verify(startExecutionHandler, never()).invoke(any()) } + + it("acknowledges the message") { + verify(ackFunction).invoke() + } } context("it is an unsupported message type") { @@ -148,7 +159,7 @@ class QueueProcessorSpec : Spek({ whenever(queue.poll(any())).then { @Suppress("UNCHECKED_CAST") val callback = it.arguments.first() as QueueCallback - callback.invoke(message, {}) + callback.invoke(message, ackFunction) } } @@ -162,8 +173,56 @@ class QueueProcessorSpec : Spek({ verify(startExecutionHandler, never()).invoke(any()) verify(configurationErrorHandler, never()).invoke(any()) } + + it("does not acknowledge the message") { + verify(ackFunction, never()).invoke() + } + } + + context("the handler throws an exception") { + val message = StartExecution(Pipeline::class.java, "1", "foo") + + beforeGroup { + whenever(startExecutionHandler.messageType) doReturn StartExecution::class.java + whenever(configurationErrorHandler.messageType) doReturn ConfigurationError::class.java + + whenever(queue.poll(any())).then { + @Suppress("UNCHECKED_CAST") + val callback = it.arguments.first() as QueueCallback + callback.invoke(message, ackFunction) + } + + whenever(startExecutionHandler.invoke(any())) doThrow NullPointerException() + } + + afterGroup(::resetMocks) + + action("the worker polls the queue") { + queueProcessor!!.pollOnce() + } + + it("does not acknowledge the message") { + verify(ackFunction, never()).invoke() + } } } } } }) + +class BlockingThreadExecutor : Executor { + + private val delegate = Executors.newSingleThreadExecutor() + + override fun execute(command: Runnable) { + val latch = CountDownLatch(1) + delegate.execute { + try { + command.run() + } finally { + latch.countDown() + } + } + latch.await() + } +} diff --git a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerSpec.kt b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerSpec.kt index c4d442df90..06408151c6 100644 --- a/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerSpec.kt +++ b/orca-queue/src/test/kotlin/com/netflix/spinnaker/orca/q/handler/StartTaskHandlerSpec.kt @@ -26,6 +26,7 @@ import com.nhaarman.mockito_kotlin.* import org.jetbrains.spek.api.Spek import org.jetbrains.spek.api.dsl.describe import org.jetbrains.spek.api.dsl.it +import org.junit.jupiter.api.Assertions.assertThrows import org.springframework.context.ApplicationEventPublisher class StartTaskHandlerSpec : Spek({ @@ -90,4 +91,26 @@ class StartTaskHandlerSpec : Spek({ } } } + + describe("when the execution repository has a problem") { + val pipeline = pipeline { + stage { + type = singleTaskStage.type + singleTaskStage.buildTasks(this) + } + } + val message = StartTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1") + + beforeGroup { + whenever(repository.retrievePipeline(message.executionId)) doThrow NullPointerException() + } + + afterGroup(::resetMocks) + + it("propagates any exception") { + assertThrows(NullPointerException::class.java) { + handler.handle(message) + } + } + } })