From 75ed4511953a0de7f696d0b56a92bb5f303f7904 Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Sun, 9 Nov 2014 19:05:15 +0700 Subject: [PATCH] Tests housekeeping --- run-tests | 6 +- src/taoensso/carmine/message_queue.clj | 18 +- test/taoensso/carmine/tests/message_queue.clj | 198 +++++++++--------- 3 files changed, 118 insertions(+), 104 deletions(-) diff --git a/run-tests b/run-tests index c8fa3384..0b73a49f 100755 --- a/run-tests +++ b/run-tests @@ -14,9 +14,9 @@ else lein test-auto else export TIMBRE_LOG_LEVEL="trace" - lein with-profile +1.5,+test expectations "taoensso.carmine.tests.tundra" - lein with-profile +1.5,+test expectations "taoensso.carmine.tests.main" - lein with-profile +1.5,+test expectations "taoensso.carmine.tests.message-queue" + # lein expectations "taoensso.carmine.tests.tundra" + # lein expectations "taoensso.carmine.tests.main" + # lein expectations "taoensso.carmine.tests.message-queue" lein test-all fi fi diff --git a/src/taoensso/carmine/message_queue.clj b/src/taoensso/carmine/message_queue.clj index 66997c01..1f85de62 100644 --- a/src/taoensso/carmine/message_queue.clj +++ b/src/taoensso/carmine/message_queue.clj @@ -46,10 +46,20 @@ (def qkey "Prefixed queue key" (memoize (partial car/key :carmine :mq))) (defn clear-queues [conn-opts & qnames] - (wcar conn-opts - (doseq [qname qnames] - (when-let [qks (seq (wcar conn-opts (car/keys (qkey qname :*))))] - (apply car/del qks))))) + (when (seq qnames) + (wcar conn-opts + (doseq [qname qnames] + (let [qk (partial qkey qname)] + (car/del + (qk :messages) + (qk :locks) + (qk :backoffs) + (qk :nattempts) + (qk :mid-circle) + (qk :done) + (qk :requeue) + (qk :eoq-backoff?) + (qk :ndry-runs))))))) (defn queue-status [conn-opts qname] (let [qk (partial qkey qname)] diff --git a/test/taoensso/carmine/tests/message_queue.clj b/test/taoensso/carmine/tests/message_queue.clj index c1fc63a3..e7f0b8d9 100644 --- a/test/taoensso/carmine/tests/message_queue.clj +++ b/test/taoensso/carmine/tests/message_queue.clj @@ -1,131 +1,135 @@ (ns taoensso.carmine.tests.message-queue - (:require [expectations :as test :refer :all] - [taoensso.carmine :as car :refer (wcar)] + (:require [expectations :as tests :refer :all] + [taoensso.carmine :as car :refer (wcar)] [taoensso.carmine.message-queue :as mq])) -(comment (test/run-tests '[taoensso.carmine.tests.message-queue])) +(comment + (remove-ns 'taoensso.carmine.tests.message-queue) + (tests/run-tests '[taoensso.carmine.tests.message-queue])) -(defmacro wcar* [& body] `(car/wcar {} ~@body)) -(def tq :carmine-test-queue) +(def ^:private tq :carmine-test-queue) +(def ^:private conn-opts {}) -(defn- before-run {:expectations-options :before-run} [] (mq/clear-queues {} tq)) -(defn- after-run {:expectations-options :after-run} [] (mq/clear-queues {} tq)) +(defn- clear-tq [] (mq/clear-queues conn-opts tq)) +(defn- tq-status [] (mq/queue-status conn-opts tq)) +(defn- before-run {:expectations-options :before-run} [] (clear-tq)) +(defn- after-run {:expectations-options :after-run} [] (clear-tq)) -(defn- dequeue* - "Like `mq/dequeue` but has a constant (175ms) eoq-backoff-ms and always sleeps - the same amount before returning." - [qname & [opts]] +(defmacro wcar* [& body] `(car/wcar conn-opts ~@body)) + +(defn- dequeue* [qname & [opts]] (let [r (mq/dequeue qname (merge {:eoq-backoff-ms 175} opts))] (Thread/sleep 205) r)) +(defmacro expect* [n e a] + `(expect e (do (println (str ~n ":" (tq-status))) ~a))) + +(expect (do (println + (str "Running message queue tests\n" + "NB: These don't seem to work properly from the command line (Ref. http://goo.gl/QZtEzn)")) + true)) + ;;;; Basic enqueuing & dequeuing -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect {:messages {"mid1" :msg1}, :mid-circle ["mid1" "end-of-circle"]} - (in (mq/queue-status {} tq))) -(expect :queued (wcar {} (mq/message-status tq :mid1))) -(expect {:carmine.mq/error :queued} (wcar {} (mq/enqueue tq :msg1 :mid1))) ; Dupe -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect ["mid1" :msg1 1] (wcar {} (dequeue* tq))) ; New msg -(expect :locked (wcar {} (mq/message-status tq :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect nil (wcar {} (dequeue* tq))) ; Locked msg +(expect "eoq-backoff" (do (clear-tq) (wcar* (dequeue* tq)))) +(expect "mid1" (wcar* (mq/enqueue tq :msg1 :mid1))) +(expect {:messages {"mid1" :msg1}, + :mid-circle ["mid1" "end-of-circle"]} (in (tq-status))) +(expect :queued (wcar* (mq/message-status tq :mid1))) +(expect {:carmine.mq/error :queued} (wcar* (mq/enqueue tq :msg1 :mid1))) ; Dupe +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect ["mid1" :msg1 1] (wcar* (dequeue* tq))) ; New msg +(expect :locked (wcar* (mq/message-status tq :mid1))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect nil (wcar* (dequeue* tq))) ; Locked msg ;;;; Handling: success -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -;; (expect "eoq-backoff" (wcar {} (dequeue* tq))) +(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))) +;; (expect "eoq-backoff" (wcar* (dequeue* tq))) ;; Handler will *not* run against eoq-backoff/nil reply: -(expect nil (mq/handle1 {} tq nil (wcar {} (dequeue* tq)))) +(expect nil (mq/handle1 conn-opts tq nil (wcar* (dequeue* tq)))) (expect {:mid "mid1" :message :msg1, :attempt 1} - (let [p (promise)] - (mq/handle1 {} tq #(do (deliver p %) {:status :success}) - (wcar {} (dequeue* tq))) - @p)) -(expect :done-awaiting-gc (wcar {} (mq/message-status tq :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect nil (wcar {} (dequeue* tq))) ; Will gc -(expect nil (wcar {} (mq/message-status tq :mid1))) + (let [p (promise)] + (mq/handle1 conn-opts tq #(do (deliver p %) {:status :success}) + (wcar* (dequeue* tq))) + @p)) +(expect :done-awaiting-gc (wcar* (mq/message-status tq :mid1))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect nil (wcar* (dequeue* tq))) ; Will gc +(expect nil (wcar* (mq/message-status tq :mid1))) ;;;; Handling: handler crash -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) +(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) (expect ["mid1" :msg1 1] - (wcar {} (dequeue* tq {:lock-ms 3000}))) ; Simulates bad handler -(expect :locked (wcar {} (mq/message-status tq :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) + (wcar* (dequeue* tq {:lock-ms 3000}))) ; Simulates bad handler +(expect :locked (wcar* (mq/message-status tq :mid1))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) (expect ["mid1" :msg1 2] (do (Thread/sleep 3000) ; Wait for lock to expire - (wcar {} (dequeue* tq)))) + (wcar* (dequeue* tq)))) ;;;; Handling: retry with backoff -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) +(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) (expect {:mid "mid1" :message :msg1, :attempt 1} - (let [p (promise)] - (mq/handle1 {} tq #(do (deliver p %) {:status :retry :backoff-ms 3000}) - (wcar {} (dequeue* tq))) - @p)) -(expect :queued-with-backoff (wcar {} (mq/message-status tq :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect nil (wcar {} (dequeue* tq))) ; Backoff (< 3s) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect ["mid1" :msg1 2] (do (Thread/sleep 3000) ; Wait for backoff to expire - (wcar {} (dequeue* tq)))) + (let [p (promise)] + (mq/handle1 conn-opts tq #(do (deliver p %) {:status :retry :backoff-ms 3000}) + (wcar* (dequeue* tq))) + @p)) +(expect :queued-with-backoff (wcar* (mq/message-status tq :mid1))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect nil (wcar* (dequeue* tq))) ; Backoff (< 3s) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect ["mid1" :msg1 2] (do (Thread/sleep 3000) ; Wait for backoff to expire + (wcar* (dequeue* tq)))) ;;;; Handling: success with backoff (dedupe) -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) +(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) (expect {:mid "mid1" :message :msg1, :attempt 1} - (let [p (promise)] - (mq/handle1 {} tq #(do (deliver p %) {:status :success :backoff-ms 3000}) - (wcar {} (dequeue* tq))) - @p)) -(expect :done-with-backoff (wcar {} (mq/message-status tq :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect nil (wcar {} (dequeue* tq))) ; Will gc -(expect :done-with-backoff (wcar {} (mq/message-status tq :mid1))) ; Backoff (< 3s) + (let [p (promise)] + (mq/handle1 conn-opts tq #(do (deliver p %) {:status :success :backoff-ms 3000}) + (wcar* (dequeue* tq))) + @p)) +(expect :done-with-backoff (wcar* (mq/message-status tq :mid1))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect nil (wcar* (dequeue* tq))) ; Will gc +(expect :done-with-backoff (wcar* (mq/message-status tq :mid1))) ; Backoff (< 3s) (expect {:carmine.mq/error :done-with-backoff} - (wcar {} (mq/enqueue tq :msg1 :mid1))) ; Dupe + (wcar* (mq/enqueue tq :msg1 :mid1))) ; Dupe (expect "mid1" (do (Thread/sleep 3000) ; Wait for backoff to expire - (wcar {} (mq/enqueue tq :msg1 :mid1)))) + (wcar* (mq/enqueue tq :msg1 :mid1)))) ;;;; Handling: enqueue while :locked -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect :locked (do (-> (mq/handle1 {} tq (fn [_] (Thread/sleep 3000) ; Hold lock - {:status :success}) - (wcar {} (dequeue* tq))) - (future)) - (Thread/sleep 20) - (wcar {} (mq/message-status tq :mid1)))) -(expect {:carmine.mq/error :locked} (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1 :allow-requeue))) +(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect :locked (do (future + (mq/handle1 conn-opts tq (fn [_] (Thread/sleep 3000) ; Hold lock + {:status :success}) + (wcar* (dequeue* tq)))) + (Thread/sleep 50) + (wcar* (mq/message-status tq :mid1)))) +(expect {:carmine.mq/error :locked} (wcar* (mq/enqueue tq :msg1 :mid1))) +(expect "mid1" (wcar* (mq/enqueue tq :msg1 :mid1 :allow-requeue))) (expect {:carmine.mq/error :locked-with-requeue} - (wcar {} (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))) + (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))) (expect :queued ; cmp :done-awaiting-gc - (do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!) - (wcar {} (mq/message-status tq :mid1)))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect ["mid1" :msg1 1] (wcar {} (dequeue* tq))) + (do (Thread/sleep 3500) ; Wait for handler to complete (extra time for future!) + (wcar* (mq/message-status tq :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect ["mid1" :msg1 1] (wcar* (dequeue* tq))) ;;;; Handling: enqueue while :done-with-backoff -(expect (constantly true) (mq/clear-queues {} tq)) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) +(expect "mid1" (do (clear-tq) (wcar* (mq/enqueue tq :msg1 :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) (expect :done-with-backoff - (do (mq/handle1 {} tq (fn [_] {:status :success :backoff-ms 3000}) - (wcar {} (dequeue* tq))) - (Thread/sleep 20) - (wcar {} (mq/message-status tq :mid1)))) -(expect {:carmine.mq/error :done-with-backoff} (wcar {} (mq/enqueue tq :msg1 :mid1))) -(expect "mid1" (wcar {} (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))) + (do (mq/handle1 conn-opts tq (fn [_] {:status :success :backoff-ms 3000}) + (wcar* (dequeue* tq))) + (Thread/sleep 20) + (wcar* (mq/message-status tq :mid1)))) +(expect {:carmine.mq/error :done-with-backoff} (wcar* (mq/enqueue tq :msg1 :mid1))) +(expect "mid1" (wcar* (mq/enqueue tq :msg1-requeued :mid1 :allow-requeue))) (expect :queued ; cmp :done-awaiting-gc - (do (Thread/sleep 3000) ; Wait for backoff to expire - (wcar {} (mq/message-status tq :mid1)))) -(expect "eoq-backoff" (wcar {} (dequeue* tq))) -(expect ["mid1" :msg1 1] (wcar {} (dequeue* tq))) + (do (Thread/sleep 3000) ; Wait for backoff to expire + (wcar* (mq/message-status tq :mid1)))) +(expect "eoq-backoff" (wcar* (dequeue* tq))) +(expect ["mid1" :msg1 1] (wcar* (dequeue* tq)))