Permalink
Browse files

if a queue item can't be decoded, confirm the remove (so you don't tr…

…y again later), log an error, and move on.
  • Loading branch information...
1 parent 3f96894 commit 85abcbcf84d23790ceba009afe34a473f905fbaf Robey Pointer committed Dec 10, 2010
@@ -190,7 +190,13 @@ class JobScheduler[J <: Job](val name: String,
}
def process() {
- queue.get().foreach { ticket =>
+ (try {
+ queue.get()
+ } catch {
+ case e: Exception =>
+ log.error(e, "Unable to parse job")
+ None
+ }).foreach { ticket =>
val job = ticket.job
try {
job()
@@ -76,7 +76,14 @@ class KestrelJobQueue[J <: Job](queueName: String, val queue: PersistentQueue, c
item = queue.removeReceive(System.currentTimeMillis + TIMEOUT, true)
}
item.map { qitem =>
- val decoded = codec.inflate(qitem.data)
+ val decoded = try {
+ codec.inflate(qitem.data)
+ } catch {
+ case e: Throwable =>
+ // outer layers should handle this exception, but make sure it's fully removed.
+ queue.confirmRemove(qitem.xid)
+ throw e
+ }
new Ticket[J] {
def job = decoded
def ack() {
@@ -216,6 +216,14 @@ class JobSchedulerSpec extends ConfiguredSpecification with JMocker with ClassMo
jobScheduler.process()
}
+
+ "decoder error" in {
+ expect {
+ one(queue).get() willThrow new UnparsableJsonException("Unparsable json", null)
+ }
+
+ jobScheduler.process()
+ }
}
}
}
@@ -123,6 +123,17 @@ object KestrelJobQueueSpec extends ConfiguredSpecification with JMocker with Cla
ticket must beSome[Ticket[Job]].which { _.job == job1 }
ticket.get.ack()
}
+
+ "item can't be decoded" in {
+ expect {
+ allowing(queue).isClosed willReturn false
+ one(queue).removeReceive(any[Long], any[Boolean]) willReturn Some(QItem(0, 0, "abc".getBytes, 900))
+ one(codec).inflate("abc".getBytes) willThrow new UnparsableJsonException("Unparsable json", null)
+ one(queue).confirmRemove(900)
+ }
+
+ kestrelJobQueue.get() must throwA[Exception]
+ }
}
"drainTo" in {

0 comments on commit 85abcbc

Please sign in to comment.