Skip to content

Commit

Permalink
fix(core): missed a field with null-safety fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed May 8, 2017
1 parent f359591 commit 5ad5981
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -622,8 +622,10 @@ class JedisExecutionRepository implements ExecutionRepository {
stage.id = stageId

// TODO: temp debug
if (map["stage.${stageId}.status".toString()] == null) {
log.warn("Stage $stageId data is missing ${map.findAll { k, v -> k.startsWith("stage.$stageId") }}")
if (map.keySet().findAll {
it.startsWith("stage.${stageId}")
}.isEmpty()) {
log.warn("Stage data is missing for $stageId of execution $id")
}

stage.refId = map["stage.${stageId}.refId".toString()]
Expand All @@ -632,7 +634,7 @@ class JedisExecutionRepository implements ExecutionRepository {
stage.startTime = map["stage.${stageId}.startTime".toString()]?.toLong()
stage.endTime = map["stage.${stageId}.endTime".toString()]?.toLong()
stage.status = map["stage.${stageId}.status".toString()] ? ExecutionStatus.valueOf(map["stage.${stageId}.status".toString()]) : null
stage.initializationStage = map["stage.${stageId}.initializationStage".toString()].toBoolean()
stage.initializationStage = map["stage.${stageId}.initializationStage".toString()]?.toBoolean() ?: false
stage.syntheticStageOwner = map["stage.${stageId}.syntheticStageOwner".toString()] ? SyntheticStageOwner.valueOf(map["stage.${stageId}.syntheticStageOwner".toString()]) : null
stage.parentStageId = map["stage.${stageId}.parentStageId".toString()]
stage.requisiteStageRefIds = map["stage.${stageId}.requisiteStageRefIds".toString()]?.tokenize(",") ?: emptySet()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package com.netflix.spinnaker.orca.q

import com.google.common.util.concurrent.MoreExecutors.directExecutor
import com.natpryce.hamkrest.assertion.assertThat
import com.natpryce.hamkrest.throws
import com.netflix.appinfo.InstanceInfo.InstanceStatus.OUT_OF_SERVICE
Expand All @@ -32,25 +31,29 @@ import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.context
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executor
import java.util.concurrent.Executors

class QueueProcessorSpec : Spek({
describe("execution workers") {
val queue: Queue = mock()
val startExecutionHandler: MessageHandler<StartExecution> = mock()
val configurationErrorHandler: MessageHandler<ConfigurationError> = mock()
val ackFunction: () -> Unit = mock()
val registry: Registry = mock {
on { createId(any<String>()) } doReturn mock<Id>()
on { counter(any<Id>()) } doReturn mock<Counter>()
}

var queueProcessor: QueueProcessor? = null

fun resetMocks() = reset(queue, startExecutionHandler, configurationErrorHandler)
fun resetMocks() = reset(queue, startExecutionHandler, configurationErrorHandler, ackFunction)

beforeGroup {
queueProcessor = QueueProcessor(
queue,
directExecutor(),
BlockingThreadExecutor(),
registry,
listOf(startExecutionHandler, configurationErrorHandler)
)
Expand Down Expand Up @@ -90,7 +93,7 @@ class QueueProcessorSpec : Spek({
whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
callback.invoke(message, {})
callback.invoke(message, ackFunction)
}
}

Expand All @@ -107,6 +110,10 @@ class QueueProcessorSpec : Spek({
it("does not invoke other handlers") {
verify(configurationErrorHandler, never()).invoke(any())
}

it("acknowledges the message") {
verify(ackFunction).invoke()
}
}

context("it is a subclass of a supported message type") {
Expand All @@ -119,7 +126,7 @@ class QueueProcessorSpec : Spek({
whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
callback.invoke(message, {})
callback.invoke(message, ackFunction)
}
}

Expand All @@ -136,6 +143,10 @@ class QueueProcessorSpec : Spek({
it("does not invoke other handlers") {
verify(startExecutionHandler, never()).invoke(any())
}

it("acknowledges the message") {
verify(ackFunction).invoke()
}
}

context("it is an unsupported message type") {
Expand All @@ -148,7 +159,7 @@ class QueueProcessorSpec : Spek({
whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
callback.invoke(message, {})
callback.invoke(message, ackFunction)
}
}

Expand All @@ -162,8 +173,56 @@ class QueueProcessorSpec : Spek({
verify(startExecutionHandler, never()).invoke(any())
verify(configurationErrorHandler, never()).invoke(any())
}

it("does not acknowledge the message") {
verify(ackFunction, never()).invoke()
}
}

context("the handler throws an exception") {
val message = StartExecution(Pipeline::class.java, "1", "foo")

beforeGroup {
whenever(startExecutionHandler.messageType) doReturn StartExecution::class.java
whenever(configurationErrorHandler.messageType) doReturn ConfigurationError::class.java

whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
callback.invoke(message, ackFunction)
}

whenever(startExecutionHandler.invoke(any())) doThrow NullPointerException()
}

afterGroup(::resetMocks)

action("the worker polls the queue") {
queueProcessor!!.pollOnce()
}

it("does not acknowledge the message") {
verify(ackFunction, never()).invoke()
}
}
}
}
}
})

class BlockingThreadExecutor : Executor {

private val delegate = Executors.newSingleThreadExecutor()

override fun execute(command: Runnable) {
val latch = CountDownLatch(1)
delegate.execute {
try {
command.run()
} finally {
latch.countDown()
}
}
latch.await()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.nhaarman.mockito_kotlin.*
import org.jetbrains.spek.api.Spek
import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.junit.jupiter.api.Assertions.assertThrows
import org.springframework.context.ApplicationEventPublisher

class StartTaskHandlerSpec : Spek({
Expand Down Expand Up @@ -90,4 +91,26 @@ class StartTaskHandlerSpec : Spek({
}
}
}

describe("when the execution repository has a problem") {
val pipeline = pipeline {
stage {
type = singleTaskStage.type
singleTaskStage.buildTasks(this)
}
}
val message = StartTask(Pipeline::class.java, pipeline.id, "foo", pipeline.stages.first().id, "1")

beforeGroup {
whenever(repository.retrievePipeline(message.executionId)) doThrow NullPointerException()
}

afterGroup(::resetMocks)

it("propagates any exception") {
assertThrows(NullPointerException::class.java) {
handler.handle(message)
}
}
}
})

0 comments on commit 5ad5981

Please sign in to comment.