From 271ce09fbf9c412bc85b54a5a8b13234ba3605e0 Mon Sep 17 00:00:00 2001 From: Peter Taoussanis Date: Thu, 29 Feb 2024 15:14:50 +0100 Subject: [PATCH] [mod] [mq] Clarify that queue names should be strings --- src/taoensso/carmine/message_queue.clj | 10 +++++++--- src/taoensso/carmine/tundra.clj | 2 +- test/taoensso/carmine/tests/message_queue.clj | 6 +++--- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/src/taoensso/carmine/message_queue.clj b/src/taoensso/carmine/message_queue.clj index 4d55db17..ee51917e 100644 --- a/src/taoensso/carmine/message_queue.clj +++ b/src/taoensso/carmine/message_queue.clj @@ -508,7 +508,9 @@ {:keys [worker queue-size nstats_ ssb-queueing-time-ms ssb-handling-time-ns] :or {queue-size -1}}] - (let [[kind] (when (vector? poll-reply) poll-reply)] + (let [[kind] (when (vector? poll-reply) poll-reply) + qname (enc/as-qname qname)] + (case kind "skip" (let [[_kind reason] poll-reply] @@ -614,7 +616,7 @@ java.io.Closeable (close [this] (stop this)) Object (toString [this] ; "CarmineMessageQueueWorker[nthreads=1w+1h, running]" - (str "CarmineMessageQueueWorker[nthreads=" + (str "CarmineMessageQueueWorker[qname=" qname ", nthreads=" (get worker-opts :nthreads-worker) "w+" (get worker-opts :nthreads-handler) "h, " (if @running?_ "running" "shut down") "]")) @@ -775,7 +777,8 @@ "Returns a worker monitor fn that warns when queue exceeds the prescribed size. A backoff timeout can be provided to rate-limit this warning." [qname max-queue-size warn-backoff-ms] - (let [udt-last-warning_ (atom 0)] + (let [qname (enc/as-qname qname) + udt-last-warning_ (atom 0)] (fn [{:keys [queue-size]}] (when (> (long queue-size) (long max-queue-size)) (let [instant (enc/now-udt) @@ -851,6 +854,7 @@ nthreads-worker (if (contains? worker-opts :nthreads-worker) nthreads-worker nthreads) nthreads-handler (if (contains? worker-opts :nthreads-handler) nthreads-handler nthreads) + qname (enc/as-qname qname) worker-opts (conj (or worker-opts {}) {:handler handler diff --git a/src/taoensso/carmine/tundra.clj b/src/taoensso/carmine/tundra.clj index 48850861..08278fca 100644 --- a/src/taoensso/carmine/tundra.clj +++ b/src/taoensso/carmine/tundra.clj @@ -280,7 +280,7 @@ See `ensure-ks`, `dirty`, `worker` for TundraStore API." [datastore & [{:keys [tqname freezer redis-ttl-ms] - :or {tqname :default freezer nippy-freezer}}]] + :or {tqname "default" freezer nippy-freezer}}]] {:pre [;; (satisfies? IDataStore datastore) ;; (or (nil? freezer) (satisfies? IFreezer freezer)) (or (nil? redis-ttl-ms) (>= redis-ttl-ms (* 1000 60 60 10)))]} diff --git a/test/taoensso/carmine/tests/message_queue.clj b/test/taoensso/carmine/tests/message_queue.clj index cfe589d7..ca62b30c 100644 --- a/test/taoensso/carmine/tests/message_queue.clj +++ b/test/taoensso/carmine/tests/message_queue.clj @@ -29,7 +29,7 @@ (def conn-opts {}) (defmacro wcar* [& body] `(car/wcar conn-opts ~@body)) -(def tq :carmine-test-queue) +(def tq "carmine-test-queue") (defn clear-tq! [] (mq/queues-clear!! conn-opts [tq])) (defn test-fixture [f] (f) (clear-tq!)) @@ -89,7 +89,7 @@ (is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1b 1 default-lock-ms #_udt])) (is (= (wcar* (msg-status tq :mid1)) :locked)) (is (= (wcar* (dequeue tq)) ["sleep" "end-of-circle" "a" eoq-backoff-ms])) - (is (contains? (mq/queue-names conn-opts) (name tq)))])) + (is (contains? (mq/queue-names conn-opts) tq))])) (deftest init-backoff (testing "Enqueue with initial backoff" @@ -161,7 +161,7 @@ (let [[pr ha hr] (test-handler (fn [_m] {:status :success}))] [(is (subvec? pr ["handle" "mid1" :msg1 1 default-lock-ms #_udt])) (is (enc/submap? ha - {:qname :carmine-test-queue, :mid "mid1", :message :msg1, + {:qname "carmine-test-queue", :mid "mid1", :message :msg1, :attempt 1, :lock-ms default-lock-ms})) (is (= hr [:handled :success]))])