Skip to content

Commit

Permalink
fix(queue): stop dead-lettering messages after it's done once, d'oh!
Browse files Browse the repository at this point in the history
  • Loading branch information
robfletcher committed May 2, 2017
1 parent 5e22edf commit 5e0ea45
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,14 @@ class RedisQueue(
}

private fun ack(id: UUID) {
ack(id.toString())
}

private fun ack(id: String) {
pool.resource.use { redis ->
redis.zrem(unackedKey, id.toString())
redis.hdel(messagesKey, id.toString())
redis.hdel(attemptsKey, id.toString())
redis.zrem(unackedKey, id)
redis.hdel(messagesKey, id)
redis.hdel(attemptsKey, id)
}
}

Expand All @@ -101,16 +105,19 @@ class RedisQueue(
zrangeByScore(unackedKey, 0.0, score())
.let { ids ->
if (ids.size > 0) {
log.warn("Redelivering ${ids.size} messages")
ids.map { "$locksKey:$it" }.let { del(*it.toTypedArray()) }
}

ids.forEach { id ->
if (hgetInt(attemptsKey, id) >= Queue.maxRedeliveries) {
val attempts = hgetInt(attemptsKey, id)
if (attempts >= Queue.maxRedeliveries) {
log.warn("Message $id with payload ${hget(messagesKey, id)} exceeded max re-deliveries")
hget(messagesKey, id)
.let { json -> mapper.readValue<Message>(json) }
.let { handleDeadMessage(it) }
.also { ack(id) }
} else {
log.warn("Re-delivering message $id after $attempts attempts")
move(unackedKey, queueKey, ZERO, setOf(id))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ interface Queue {
* The maximum number of times an un-acknowledged message will be re-delivered
* before failing permanently.
*/
val maxRedeliveries: Int = 10
val maxRedeliveries: Int = 5
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,23 @@ abstract class QueueSpec<out Q : Queue>(
it("passes the failed message to the dead letter handler") {
verify(deadLetterCallback).invoke(queue!!, message)
}

context("once the message has been dead-lettered") {
action("the next time re-delivery checks happen") {
queue!!.apply {
triggerRedeliveryCheck()
poll(callback)
}
}

it("it does not get redelivered again") {
verifyZeroInteractions(callback)
}

it("no longer gets sent to the dead letter handler") {
verify(deadLetterCallback).invoke(queue!!, message)
}
}
}
}
}) {
Expand Down

0 comments on commit 5e0ea45

Please sign in to comment.