Skip to content

Commit

Permalink
fix(queue): handle sub-classes of message types (#1308)
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed Apr 28, 2017
1 parent de38d1e commit 84d1e3f
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,12 @@ open class QueueProcessor
@Autowired constructor(
private val queue: Queue,
@Qualifier("messageHandlerPool") private val executor: Executor,
val registry: Registry,
handlers: Collection<MessageHandler<*>>
private val registry: Registry,
private val handlers: Collection<MessageHandler<*>>
) : DiscoveryActivated {

override val log: Logger = getLogger(javaClass)
override val enabled = AtomicBoolean(false)
private val handlers = handlers.associate { Pair(it.messageType, it) }

private val pollOpsRateId = registry.createId("orca.nu.worker.pollOpsRate")
private val pollErrorRateId = registry.createId("orca.nu.worker.pollErrorRate")
Expand All @@ -51,7 +50,7 @@ open class QueueProcessor

queue.poll { message, ack ->
log.info("Received message $message")
val handler = handlers[message.javaClass]
val handler = handlerFor(message)
if (handler != null) {
executor.execute {
handler.invoke(message)
Expand All @@ -66,6 +65,16 @@ open class QueueProcessor
}
}

private val handlerCache = mutableMapOf<Class<out Message>, MessageHandler<*>>()

private fun handlerFor(message: Message) =
handlerCache[message.javaClass]
.let { handler ->
handler ?: handlers
.find { it.messageType.isAssignableFrom(message.javaClass) }
?.also { handlerCache[message.javaClass] = it }
}

@PostConstruct fun confirmQueueType() =
log.info("Using ${queue.javaClass.simpleName} queue")
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,25 +37,22 @@ class QueueProcessorSpec : Spek({
describe("execution workers") {
val queue: Queue = mock()
val startExecutionHandler: MessageHandler<StartExecution> = mock()
val completeExecutionHandler: MessageHandler<CompleteExecution> = mock()
val configurationErrorHandler: MessageHandler<ConfigurationError> = mock()
val registry: Registry = mock {
on { createId(any<String>()) }.thenReturn(mock<Id>())
on { counter(any<Id>()) }.thenReturn(mock<Counter>())
on { createId(any<String>()) } doReturn mock<Id>()
on { counter(any<Id>()) } doReturn mock<Counter>()
}

var queueProcessor: QueueProcessor? = null

fun resetMocks() = reset(queue, startExecutionHandler, completeExecutionHandler)
fun resetMocks() = reset(queue, startExecutionHandler, configurationErrorHandler)

beforeGroup {
whenever(startExecutionHandler.messageType) doReturn StartExecution::class.java
whenever(completeExecutionHandler.messageType) doReturn CompleteExecution::class.java

queueProcessor = QueueProcessor(
queue,
directExecutor(),
registry,
listOf(startExecutionHandler, completeExecutionHandler)
listOf(startExecutionHandler, configurationErrorHandler)
)
}

Expand Down Expand Up @@ -87,6 +84,9 @@ class QueueProcessorSpec : Spek({
val message = StartExecution(Pipeline::class.java, "1", "foo")

beforeGroup {
whenever(startExecutionHandler.messageType) doReturn StartExecution::class.java
whenever(configurationErrorHandler.messageType) doReturn ConfigurationError::class.java

whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
Expand All @@ -105,14 +105,46 @@ class QueueProcessorSpec : Spek({
}

it("does not invoke other handlers") {
verifyZeroInteractions(completeExecutionHandler)
verify(configurationErrorHandler, never()).invoke(any())
}
}

context("it is a subclass of a supported message type") {
val message = InvalidExecutionId(Pipeline::class.java, "1", "foo")

beforeGroup {
whenever(startExecutionHandler.messageType) doReturn StartExecution::class.java
whenever(configurationErrorHandler.messageType) doReturn ConfigurationError::class.java

whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
callback.invoke(message, {})
}
}

afterGroup(::resetMocks)

action("the worker polls the queue") {
queueProcessor!!.pollOnce()
}

it("passes the message to the correct handler") {
verify(configurationErrorHandler).invoke(eq(message))
}

it("does not invoke other handlers") {
verify(startExecutionHandler, never()).invoke(any())
}
}

context("it is an unsupported message type") {
val message = StartStage(Pipeline::class.java, "1", "foo", "1")

beforeGroup {
whenever(startExecutionHandler.messageType) doReturn StartExecution::class.java
whenever(configurationErrorHandler.messageType) doReturn ConfigurationError::class.java

whenever(queue.poll(any())).then {
@Suppress("UNCHECKED_CAST")
val callback = it.arguments.first() as QueueCallback
Expand All @@ -127,10 +159,8 @@ class QueueProcessorSpec : Spek({
}

it("does not invoke any handlers") {
verifyZeroInteractions(
startExecutionHandler,
completeExecutionHandler
)
verify(startExecutionHandler, never()).invoke(any())
verify(configurationErrorHandler, never()).invoke(any())
}
}
}
Expand Down

0 comments on commit 84d1e3f

Please sign in to comment.