Skip to content

Commit

Permalink
Tests housekeeping
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Nov 11, 2014
1 parent c338f37 commit 75ed451
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 104 deletions.
6 changes: 3 additions & 3 deletions run-tests
Expand Up @@ -14,9 +14,9 @@ else
lein test-auto
else
export TIMBRE_LOG_LEVEL="trace"
lein with-profile +1.5,+test expectations "taoensso.carmine.tests.tundra"
lein with-profile +1.5,+test expectations "taoensso.carmine.tests.main"
lein with-profile +1.5,+test expectations "taoensso.carmine.tests.message-queue"
# lein expectations "taoensso.carmine.tests.tundra"
# lein expectations "taoensso.carmine.tests.main"
# lein expectations "taoensso.carmine.tests.message-queue"
lein test-all
fi
fi
18 changes: 14 additions & 4 deletions src/taoensso/carmine/message_queue.clj
Expand Up @@ -46,10 +46,20 @@
(def qkey "Prefixed queue key" (memoize (partial car/key :carmine :mq)))

(defn clear-queues [conn-opts & qnames]
(wcar conn-opts
(doseq [qname qnames]
(when-let [qks (seq (wcar conn-opts (car/keys (qkey qname :*))))]
(apply car/del qks)))))
(when (seq qnames)
(wcar conn-opts
(doseq [qname qnames]
(let [qk (partial qkey qname)]
(car/del
(qk :messages)
(qk :locks)
(qk :backoffs)
(qk :nattempts)
(qk :mid-circle)
(qk :done)
(qk :requeue)
(qk :eoq-backoff?)
(qk :ndry-runs)))))))

(defn queue-status [conn-opts qname]
(let [qk (partial qkey qname)]
Expand Down
198 changes: 101 additions & 97 deletions test/taoensso/carmine/tests/message_queue.clj
@@ -1,131 +1,135 @@
(ns taoensso.carmine.tests.message-queue
(:require [expectations :as test :refer :all]
[taoensso.carmine :as car :refer (wcar)]
(:require [expectations :as tests :refer :all]
[taoensso.carmine :as car :refer (wcar)]
[taoensso.carmine.message-queue :as mq]))

(comment (test/run-tests '[taoensso.carmine.tests.message-queue]))
(comment
(remove-ns 'taoensso.carmine.tests.message-queue)
(tests/run-tests '[taoensso.carmine.tests.message-queue]))

(defmacro wcar* [& body] `(car/wcar {} ~@body))
(def tq :carmine-test-queue)
(def ^:private tq :carmine-test-queue)
(def ^:private conn-opts {})

(defn- before-run {:expectations-options :before-run} [] (mq/clear-queues {} tq))
(defn- after-run {:expectations-options :after-run} [] (mq/clear-queues {} tq))
(defn- clear-tq [] (mq/clear-queues conn-opts tq))
(defn- tq-status [] (mq/queue-status conn-opts tq))
(defn- before-run {:expectations-options :before-run} [] (clear-tq))
(defn- after-run {:expectations-options :after-run} [] (clear-tq))

(defn- dequeue*
"Like `mq/dequeue` but has a constant (175ms) eoq-backoff-ms and always sleeps
the same amount before returning."
[qname & [opts]]
(defmacro wcar* [& body] `(car/wcar conn-opts ~@body))

(defn- dequeue* [qname & [opts]]
(let [r (mq/dequeue qname (merge {:eoq-backoff-ms 175} opts))]
(Thread/sleep 205) r))

(defmacro expect* [n e a]
`(expect e (do (println (str ~n ":" (tq-status))) ~a)))

(expect (do (println
(str "Running message queue tests\n"
"NB: These don't seem to work properly from the command line (Ref. http://goo.gl/QZtEzn)"))
true))

;;;; Basic enqueuing & dequeuing
(expect (constantly true) (mq/clear-queues {} tq))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect {:messages {"mid1" :msg1}, :mid-circle ["mid1" "end-of-circle"]}
(in (mq/queue-status {} tq)))
(expect :queued (wcar {} (mq/message-status tq :mid1)))
(expect {:carmine.mq/error :queued} (wcar {} (mq/enqueue tq :msg1 :mid1))) ; Dupe
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect ["mid1" :msg1 1] (wcar {} (dequeue* tq))) ; New msg
(expect :locked (wcar {} (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect nil (wcar {} (dequeue* tq))) ; Locked msg
(expect "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq))))
(expect "mid1" (wcar* (mq/enqueue tq :msg1 :mid1)))
(expect {:messages {"mid1" :msg1},
:mid-circle ["mid1" "end-of-circle"]} (in (tq-status)))
(expect :queued (wcar* (mq/message-status tq :mid1)))
(expect {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :mid1))) ; Dupe
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect ["mid1" :msg1 1] (wcar* (dequeue* tq))) ; New msg
(expect :locked (wcar* (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect nil (wcar* (dequeue* tq))) ; Locked msg

;;;; Handling: success
(expect (constantly true) (mq/clear-queues {} tq))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
;; (expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))
;; (expect "eoq-backoff" (wcar* (dequeue* tq)))
;; Handler will *not* run against eoq-backoff/nil reply:
(expect nil (mq/handle1 {} tq nil (wcar {} (dequeue* tq))))
(expect nil (mq/handle1 conn-opts tq nil (wcar* (dequeue* tq))))
(expect {:mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
(mq/handle1 {} tq #(do (deliver p %) {:status :success})
(wcar {} (dequeue* tq)))
@p))
(expect :done-awaiting-gc (wcar {} (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect nil (wcar {} (dequeue* tq))) ; Will gc
(expect nil (wcar {} (mq/message-status tq :mid1)))
(let [p (promise)]
(mq/handle1 conn-opts tq #(do (deliver p %) {:status :success})
(wcar* (dequeue* tq)))
@p))
(expect :done-awaiting-gc (wcar* (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect nil (wcar* (dequeue* tq))) ; Will gc
(expect nil (wcar* (mq/message-status tq :mid1)))

;;;; Handling: handler crash
(expect (constantly true) (mq/clear-queues {} tq))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect ["mid1" :msg1 1]
(wcar {} (dequeue* tq {:lock-ms 3000}))) ; Simulates bad handler
(expect :locked (wcar {} (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(wcar* (dequeue* tq {:lock-ms 3000}))) ; Simulates bad handler
(expect :locked (wcar* (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect ["mid1" :msg1 2] (do (Thread/sleep 3000) ; Wait for lock to expire
(wcar {} (dequeue* tq))))
(wcar* (dequeue* tq))))

;;;; Handling: retry with backoff
(expect (constantly true) (mq/clear-queues {} tq))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect {:mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
(mq/handle1 {} tq #(do (deliver p %) {:status :retry :backoff-ms 3000})
(wcar {} (dequeue* tq)))
@p))
(expect :queued-with-backoff (wcar {} (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect nil (wcar {} (dequeue* tq))) ; Backoff (< 3s)
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect ["mid1" :msg1 2] (do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar {} (dequeue* tq))))
(let [p (promise)]
(mq/handle1 conn-opts tq #(do (deliver p %) {:status :retry :backoff-ms 3000})
(wcar* (dequeue* tq)))
@p))
(expect :queued-with-backoff (wcar* (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect nil (wcar* (dequeue* tq))) ; Backoff (< 3s)
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect ["mid1" :msg1 2] (do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar* (dequeue* tq))))

;;;; Handling: success with backoff (dedupe)
(expect (constantly true) (mq/clear-queues {} tq))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect {:mid "mid1" :message :msg1, :attempt 1}
(let [p (promise)]
(mq/handle1 {} tq #(do (deliver p %) {:status :success :backoff-ms 3000})
(wcar {} (dequeue* tq)))
@p))
(expect :done-with-backoff (wcar {} (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect nil (wcar {} (dequeue* tq))) ; Will gc
(expect :done-with-backoff (wcar {} (mq/message-status tq :mid1))) ; Backoff (< 3s)
(let [p (promise)]
(mq/handle1 conn-opts tq #(do (deliver p %) {:status :success :backoff-ms 3000})
(wcar* (dequeue* tq)))
@p))
(expect :done-with-backoff (wcar* (mq/message-status tq :mid1)))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect nil (wcar* (dequeue* tq))) ; Will gc
(expect :done-with-backoff (wcar* (mq/message-status tq :mid1))) ; Backoff (< 3s)
(expect {:carmine.mq/error :done-with-backoff}
(wcar {} (mq/enqueue tq :msg1 :mid1))) ; Dupe
(wcar* (mq/enqueue tq :msg1 :mid1))) ; Dupe
(expect "mid1" (do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar {} (mq/enqueue tq :msg1 :mid1))))
(wcar* (mq/enqueue tq :msg1 :mid1))))

;;;; Handling: enqueue while :locked
(expect (constantly true) (mq/clear-queues {} tq))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect :locked (do (-> (mq/handle1 {} tq (fn [_] (Thread/sleep 3000) ; Hold lock
{:status :success})
(wcar {} (dequeue* tq)))
(future))
(Thread/sleep 20)
(wcar {} (mq/message-status tq :mid1))))
(expect {:carmine.mq/error :locked} (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1 :allow-requeue)))
(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect :locked (do (future
(mq/handle1 conn-opts tq (fn [_] (Thread/sleep 3000) ; Hold lock
{:status :success})
(wcar* (dequeue* tq))))
(Thread/sleep 50)
(wcar* (mq/message-status tq :mid1))))
(expect {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :mid1)))
(expect "mid1" (wcar* (mq/enqueue tq :msg1 :mid1 :allow-requeue)))
(expect {:carmine.mq/error :locked-with-requeue}
(wcar {} (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue)))
(wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue)))
(expect :queued ; cmp :done-awaiting-gc
(do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!)
(wcar {} (mq/message-status tq :mid1))))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect ["mid1" :msg1 1] (wcar {} (dequeue* tq)))
(do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!)
(wcar* (mq/message-status tq :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect ["mid1" :msg1 1] (wcar* (dequeue* tq)))

;;;; Handling: enqueue while :done-with-backoff
(expect (constantly true) (mq/clear-queues {} tq))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect :done-with-backoff
(do (mq/handle1 {} tq (fn [_] {:status :success :backoff-ms 3000})
(wcar {} (dequeue* tq)))
(Thread/sleep 20)
(wcar {} (mq/message-status tq :mid1))))
(expect {:carmine.mq/error :done-with-backoff} (wcar {} (mq/enqueue tq :msg1 :mid1)))
(expect "mid1" (wcar {} (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue)))
(do (mq/handle1 conn-opts tq (fn [_] {:status :success :backoff-ms 3000})
(wcar* (dequeue* tq)))
(Thread/sleep 20)
(wcar* (mq/message-status tq :mid1))))
(expect {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :mid1)))
(expect "mid1" (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue)))
(expect :queued ; cmp :done-awaiting-gc
(do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar {} (mq/message-status tq :mid1))))
(expect "eoq-backoff" (wcar {} (dequeue* tq)))
(expect ["mid1" :msg1 1] (wcar {} (dequeue* tq)))
(do (Thread/sleep 3000) ; Wait for backoff to expire
(wcar* (mq/message-status tq :mid1))))
(expect "eoq-backoff" (wcar* (dequeue* tq)))
(expect ["mid1" :msg1 1] (wcar* (dequeue* tq)))

0 comments on commit 75ed451

Please sign in to comment.