Skip to content

Commit

Permalink
[mod] [mq] Clarify that queue names should be strings
Browse files Browse the repository at this point in the history
  • Loading branch information
ptaoussanis committed Feb 29, 2024
1 parent 83c2880 commit 833d5b8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
10 changes: 7 additions & 3 deletions src/taoensso/carmine/message_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,9 @@
{:keys [worker queue-size nstats_ ssb-queueing-time-ms ssb-handling-time-ns]
:or {queue-size -1}}]

(let [[kind] (when (vector? poll-reply) poll-reply)]
(let [[kind] (when (vector? poll-reply) poll-reply)
qname (enc/as-qname qname)]

(case kind
"skip"
(let [[_kind reason] poll-reply]
Expand Down Expand Up @@ -614,7 +616,7 @@
java.io.Closeable (close [this] (stop this))
Object
(toString [this] ; "CarmineMessageQueueWorker[nthreads=1w+1h, running]"
(str "CarmineMessageQueueWorker[nthreads="
(str "CarmineMessageQueueWorker[qname=" qname ", nthreads="
(get worker-opts :nthreads-worker) "w+"
(get worker-opts :nthreads-handler) "h, "
(if @running?_ "running" "shut down") "]"))
Expand Down Expand Up @@ -775,7 +777,8 @@
"Returns a worker monitor fn that warns when queue exceeds the prescribed
size. A backoff timeout can be provided to rate-limit this warning."
[qname max-queue-size warn-backoff-ms]
(let [udt-last-warning_ (atom 0)]
(let [qname (enc/as-qname qname)
udt-last-warning_ (atom 0)]
(fn [{:keys [queue-size]}]
(when (> (long queue-size) (long max-queue-size))
(let [instant (enc/now-udt)
Expand Down Expand Up @@ -851,6 +854,7 @@
nthreads-worker (if (contains? worker-opts :nthreads-worker) nthreads-worker nthreads)
nthreads-handler (if (contains? worker-opts :nthreads-handler) nthreads-handler nthreads)

qname (enc/as-qname qname)
worker-opts
(conj (or worker-opts {})
{:handler handler
Expand Down
2 changes: 1 addition & 1 deletion src/taoensso/carmine/tundra.clj
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@
See `ensure-ks`, `dirty`, `worker` for TundraStore API."
[datastore & [{:keys [tqname freezer redis-ttl-ms]
:or {tqname :default freezer nippy-freezer}}]]
:or {tqname "default" freezer nippy-freezer}}]]
{:pre [;; (satisfies? IDataStore datastore)
;; (or (nil? freezer) (satisfies? IFreezer freezer))
(or (nil? redis-ttl-ms) (>= redis-ttl-ms (* 1000 60 60 10)))]}
Expand Down
6 changes: 3 additions & 3 deletions test/taoensso/carmine/tests/message_queue.clj
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
(def conn-opts {})
(defmacro wcar* [& body] `(car/wcar conn-opts ~@body))

(def tq :carmine-test-queue)
(def tq "carmine-test-queue")
(defn clear-tq! [] (mq/queues-clear!! conn-opts [tq]))

(defn test-fixture [f] (f) (clear-tq!))
Expand Down Expand Up @@ -89,7 +89,7 @@
(is (subvec? (wcar* (dequeue tq)) ["handle" "mid1" :msg1b 1 default-lock-ms #_udt]))
(is (= (wcar* (msg-status tq :mid1)) :locked))
(is (= (wcar* (dequeue tq)) ["sleep" "end-of-circle" "a" eoq-backoff-ms]))
(is (contains? (mq/queue-names conn-opts) (name tq)))]))
(is (contains? (mq/queue-names conn-opts) tq))]))

(deftest init-backoff
(testing "Enqueue with initial backoff"
Expand Down Expand Up @@ -161,7 +161,7 @@
(let [[pr ha hr] (test-handler (fn [_m] {:status :success}))]
[(is (subvec? pr ["handle" "mid1" :msg1 1 default-lock-ms #_udt]))
(is (enc/submap? ha
{:qname :carmine-test-queue, :mid "mid1", :message :msg1,
{:qname "carmine-test-queue", :mid "mid1", :message :msg1,
:attempt 1, :lock-ms default-lock-ms}))
(is (= hr [:handled :success]))])

Expand Down

0 comments on commit 833d5b8

Please sign in to comment.