Skip to content

Commit

Permalink
fix(queue): ensure executor doing message re-delivery doesn't die
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed May 9, 2017
1 parent 5ad5981 commit 73a41c3
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -591,6 +591,8 @@ class JedisExecutionRepository implements ExecutionRepository {
def key = "$prefix:$id"
if (jedis.exists(key)) {
Map<String, String> map = jedis.hgetAll(key)
def stageIds = jedis.exists("$key:stageIndex") ? jedis.lrange("$key:stageIndex", 0, -1) : (map.stageIndex ?: "").tokenize(",")

def execution = type.newInstance()
execution.id = id
execution.application = map.application
Expand All @@ -615,7 +617,6 @@ class JedisExecutionRepository implements ExecutionRepository {
execution.executionEngine = DEFAULT_EXECUTION_ENGINE
}

def stageIds = jedis.exists("$key:stageIndex") ? jedis.lrange("$key:stageIndex", 0, -1) : (map.stageIndex ?: "").tokenize(",")
stageIds.each { stageId ->
def stage = new Stage<>()
stage.stageNavigator = stageNavigator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import redis.clients.jedis.JedisCommands
import redis.clients.jedis.Transaction
import redis.clients.util.Pool
import java.io.Closeable
import java.io.IOException
import java.time.Clock
import java.time.Duration
import java.time.Duration.ZERO
import java.time.temporal.TemporalAmount
import java.util.*
import java.util.UUID.randomUUID
import javax.annotation.PreDestroy

Expand Down Expand Up @@ -63,12 +63,12 @@ class RedisQueue(
pool.resource.use { redis ->
redis.apply {
pop(queueKey, unackedKey, ackTimeout)
?.also { hincrBy(attemptsKey, it, 1) }
?.let { id -> Pair(UUID.fromString(id), hget(messagesKey, id)) }
?.let { (id, json) -> Pair(id, mapper.readValue<Message>(json)) }
?.let { (id, payload) ->
callback.invoke(payload) {
ack(id)
?.also { id -> hincrBy(attemptsKey, id, 1) }
?.let { id ->
readMessage(id) { payload ->
callback.invoke(payload) {
ack(id)
}
}
}
}
Expand All @@ -90,10 +90,6 @@ class RedisQueue(
redeliveryWatcher.close()
}

private fun ack(id: UUID) {
ack(id.toString())
}

private fun ack(id: String) {
pool.resource.use { redis ->
redis.multi {
Expand All @@ -116,11 +112,11 @@ class RedisQueue(
ids.forEach { id ->
val attempts = hgetInt(attemptsKey, id)
if (attempts >= Queue.maxRedeliveries) {
log.warn("Message $id with payload ${hget(messagesKey, id)} exceeded max re-deliveries")
hget(messagesKey, id)
.let { json -> mapper.readValue<Message>(json) }
.let { handleDeadMessage(it) }
.also { ack(id) }
readMessage(id) { message ->
log.warn("Message $id with payload $message exceeded max re-deliveries")
handleDeadMessage(message)
ack(id)
}
} else {
log.warn("Re-delivering message $id after $attempts attempts")
move(unackedKey, queueKey, ZERO, setOf(id))
Expand All @@ -131,6 +127,36 @@ class RedisQueue(
}
}

/**
* Tries to read the message with the specified [id] passing it to [block].
* If it's not accessible for whatever reason any references are cleaned up.
*/
private fun Jedis.readMessage(id: String, block: (Message) -> Unit) {
val json = hget(messagesKey, id)
if (json == null) {
log.error("Payload for message $id is missing")
// clean up what is essentially an unrecoverable message
multi {
zrem(queueKey, id)
zrem(unackedKey, id)
hdel(attemptsKey, id)
}
} else {
try {
val message = mapper.readValue<Message>(json)
block.invoke(message)
} catch(e: IOException) {
log.error("Failed to read message $id", e)
multi {
zrem(queueKey, id)
zrem(unackedKey, id)
hdel(messagesKey, id)
hdel(attemptsKey, id)
}
}
}
}

private fun handleDeadMessage(it: Message) {
deadMessageHandler.invoke(this, it)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.netflix.spinnaker.orca.q

import org.slf4j.LoggerFactory
import java.io.Closeable
import java.util.concurrent.Executors.newSingleThreadScheduledExecutor
import java.util.concurrent.TimeUnit
Expand All @@ -29,15 +30,25 @@ import java.util.concurrent.TimeUnit.SECONDS
* dedicated thread.
*/
class ScheduledAction(
action: () -> Unit,
private val action: () -> Unit,
initialDelay: Long = 10,
delay: Long = 10,
unit: TimeUnit = SECONDS
) : Closeable {

private val executor = newSingleThreadScheduledExecutor()
private val watcher = executor
.scheduleWithFixedDelay(action, initialDelay, delay, unit)
.scheduleWithFixedDelay({
try {
action.invoke()
} catch(e: Exception) {
// this really indicates a code issue but if it's not caught here it
// will kill the scheduled action.
log.error("Uncaught exception in scheduled action", e)
}
}, initialDelay, delay, unit)

private val log = LoggerFactory.getLogger(javaClass)

override fun close() {
watcher.cancel(false)
Expand Down

0 comments on commit 73a41c3

Please sign in to comment.