Skip to content

Commit b08ba56

Browse files
author
Artem Chistyakov
committed
Add public status API.
1 parent 8d23f06 commit b08ba56

File tree

5 files changed

+169
-14
lines changed

5 files changed

+169
-14
lines changed

src/mysql_queue/core.clj

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
(ns mysql-queue.core
22
"A MySQL-backed durable queue implementation with scheduled jobs support."
33
(:require [mysql-queue.queries :as queries]
4-
[mysql-queue.utils :refer [while-let fn-options with-error-handler profile-block meter ns->ms]]
4+
[mysql-queue.utils :refer [while-let fn-options with-error-handler profile-block meter ns->ms numeric-stats]]
55
[clojure.string :as string]
66
[clojure.set :as clj-set]
77
[clojure.edn :as edn]
@@ -21,7 +21,7 @@
2121
(stop [worker timeout-secs]))
2222

2323
(defrecord Worker
24-
[db-conn input-chan status sieve status-threads consumer-threads scheduler-thread recovery-thread options]
24+
[db-conn fn-bindings input-chan status sieve status-threads consumer-threads scheduler-thread recovery-thread options]
2525
Stoppable
2626
(stop [this timeout-secs]
2727
(when (:running @status)
@@ -58,7 +58,8 @@
5858
(clojure.core/name name)
5959
(clojure.core/name status)
6060
(pr-str parameters)
61-
attempt])
61+
attempt
62+
(Date.)])
6263
Persistent
6364
(persist [this conn]
6465
(if id
@@ -161,7 +162,7 @@
161162
(cleanup job db-conn)
162163
(try
163164
(log-fn :info job "Executing job " job)
164-
(let [[status params] (-> (meter m :job-fn (job-fn status parameters))
165+
(let [[status params] (-> (meter m :user (job-fn status parameters))
165166
job-result-or-nil (or [:done nil]))]
166167
(-> job (beget status params) (persist db-conn)))
167168
(catch Exception e
@@ -183,11 +184,16 @@
183184
(log-fn :info job "Executing job " job)
184185
(-> job beget (persist db-conn)))))
185186

187+
(defn- bound-job-names
188+
"Takes a map of job bindings and returns a sequence of names as strings."
189+
[fn-bindings]
190+
(map name (keys fn-bindings)))
191+
186192
(defn- get-scheduled-jobs
187193
"Searches for ready scheduled jobs and attempts to insert root jobs for each of those.
188194
Returns the number of jobs added, or false if channel was closed."
189195
[db-conn n fn-bindings sieve]
190-
(->> (queries/select-n-ready-scheduled-jobs db-conn (map name (keys fn-bindings)) sieve n)
196+
(->> (queries/select-n-ready-scheduled-jobs db-conn (bound-job-names fn-bindings) sieve n)
191197
(map #(scheduled-job % fn-bindings))))
192198

193199
(defn- get-stuck-jobs
@@ -197,7 +203,7 @@
197203
[db-conn n fn-bindings threshold sieve]
198204
(->> (queries/select-n-stuck-jobs db-conn
199205
(map name ultimate-job-states)
200-
(map name (keys fn-bindings))
206+
(bound-job-names fn-bindings)
201207
sieve
202208
threshold
203209
n)
@@ -246,15 +252,16 @@
246252
(try
247253
(log-fn :debug job "Consumer received job " job)
248254
(loop [current-job job
255+
last-job job
249256
metrics {}]
250257
(if current-job
251258
(do
252259
(>! status-chan {:id id :state :running-job :job current-job})
253260
(let [[next-job dmetrics] (<! (thread (execute current-job db-conn log-fn err-fn)))]
254-
(recur next-job (merge-with + metrics dmetrics))))
261+
(recur next-job current-job (merge-with + metrics dmetrics))))
255262
(do
256-
(>! status-chan {:id id :state :finished-job :job current-job :metrics metrics})
257-
(log-fn :info current-job "Completed job " job " in " (ns->ms (:full metrics)) "ms")
263+
(>! status-chan {:id id :state :finished-job :job last-job :metrics metrics})
264+
(log-fn :info last-job "Completed job " last-job " in " (ns->ms (:full metrics)) "ms")
258265
nil)))
259266
(catch Exception e
260267
(>! status-chan {:id id :state :error :job job})
@@ -350,10 +357,11 @@
350357
(close! ch)))
351358
(go-loop [occupations {}]
352359
(if-let [v (<! ch)]
353-
(let [[v ch] (alts! (map #(vector %1 v) out-chs))]
360+
(let [[ret ch] (alts! (map #(vector %1 v) out-chs))]
354361
(when-let [occupation (occupations ch)]
355362
(swap! sieve disj occupation))
356-
(recur (assoc occupations ch v)))
363+
(when ret
364+
(recur (assoc occupations ch v))))
357365
(doseq [out-ch out-chs]
358366
(close! out-ch))))
359367
[in-ch out-chs sieve]))
@@ -403,6 +411,37 @@
403411
(persist db-conn)
404412
:id))
405413

414+
(defn status
415+
"Returns a map describing the current status of the worker."
416+
[{:keys [db-conn fn-bindings]
417+
{recovery-threshold :recovery-threshold-mins} :options
418+
:as worker}]
419+
(let [serialize-job (fn [job]
420+
(and job
421+
(select-keys job [:id :name :status :metrics :processed-at])))
422+
sieve @(:sieve worker)
423+
{:keys [consumers] :as raw-status} @(:status worker)
424+
recent-jobs (mapcat :recent-jobs (:consumers raw-status))
425+
scheduled-status (queries/select-scheduled-jobs-status db-conn (bound-job-names fn-bindings))
426+
in-progress-status (queries/select-jobs-status
427+
db-conn
428+
(bound-job-names fn-bindings)
429+
ultimate-job-states
430+
recovery-threshold)
431+
consumers (map #(-> %
432+
(dissoc :recent-jobs)
433+
(update :last-job serialize-job))
434+
consumers)]
435+
(-> raw-status
436+
(assoc :db-queue {:scheduled-jobs scheduled-status :jobs in-progress-status})
437+
(assoc :prefetched-jobs (map serialize-job sieve))
438+
(assoc :recent-jobs-stats
439+
{:job-types (frequencies (map :name recent-jobs))
440+
:performance {:user (numeric-stats (keep #(get-in % [:metrics :user]) recent-jobs))
441+
:full (numeric-stats (keep #(get-in % [:metrics :full]) recent-jobs))}})
442+
(assoc :recent-jobs (map serialize-job recent-jobs))
443+
(assoc :consumers consumers))))
444+
406445
(defn worker
407446
"Creates a new worker. Takes a database connection db-conn,
408447
a map of fn-bindings binding job names to job functions, and a number
@@ -412,6 +451,8 @@
412451
when the publisher will block. Default 10.
413452
* prefetch - the number of jobs a publisher fetches from the database at once.
414453
Default 10.
454+
* num-stats-jobs - the number of jobs to keep in memory for statistical purpose.
455+
Per consumer thread. Default 50.
415456
* num-consumer-threads - the number of concurrent threads that run jobs at the
416457
same time.
417458
* min-scheduler-sleep-interval - the minimum time in seconds the scheduler will sleep
@@ -505,6 +546,7 @@
505546
log-fn)]
506547
(log-fn :info nil "Starting a new worker...")
507548
(->Worker db-conn
549+
fn-bindings
508550
in-ch
509551
status
510552
sieve

src/mysql_queue/queries.clj

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,15 @@
5151
:created_at (java.util.Date.)})))
5252

5353
(defn insert-job<!
54-
[db scheduled-job-id parent-id job-name job-status parameters attempt]
54+
[db scheduled-job-id parent-id job-name job-status parameters attempt created-at]
5555
(first (sql/insert! db :jobs
5656
{:scheduled_job_id scheduled-job-id
5757
:parent_id parent-id
5858
:name job-name
5959
:status job-status
6060
:parameters parameters
6161
:attempt attempt
62-
:created_at (java.util.Date.)})))
62+
:created_at created-at})))
6363

6464
(defn select-n-ready-scheduled-jobs
6565
[db jobs-names sieved-ids n]
@@ -106,3 +106,30 @@
106106
[db id]
107107
(sql/delete! db :scheduled_jobs ["id=?" id]))
108108

109+
(defn select-jobs-status
110+
[db ultimate-statuses job-names stuck-threshold-mins]
111+
(sql/query db
112+
(concat [(str "SELECT
113+
COALESCE(SUM(created_at + INTERVAL ? MINUTE <= ?), 0) AS stuck,
114+
COUNT(*) AS total
115+
FROM jobs
116+
WHERE
117+
status NOT IN (" (in-query-stubs ultimate-statuses) ") AND
118+
name IN (" (in-query-stubs job-names) ")")]
119+
[stuck-threshold-mins (java.util.Date.)]
120+
ultimate-statuses
121+
job-names)
122+
{:result-set-fn first}))
123+
124+
(defn select-scheduled-jobs-status
125+
[db job-names]
126+
(sql/query db
127+
(concat [(str "SELECT
128+
COALESCE(SUM(scheduled_for <= ?), 0) AS overdue,
129+
COUNT(*) total
130+
FROM scheduled_jobs
131+
WHERE name IN (" (in-query-stubs job-names) ")")]
132+
[(java.util.Date.)]
133+
job-names)
134+
{:result-set-fn first}))
135+

src/mysql_queue/utils.clj

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@
3232
(catch Exception e#
3333
(~f e#)))))
3434

35+
(defn numeric-stats
36+
"Returns a map containing the minimum (:min), the maximum (:max),
37+
the median (:median), the mean (:mean), and the 90 percentile (:90p)."
38+
[s]
39+
(when (seq s)
40+
(let [sorted (sort s)
41+
length (count s)]
42+
{:min (first sorted)
43+
:max (last sorted)
44+
:mean (float (/ (reduce + s) length))
45+
:median (nth sorted (dec (/ length 2)))
46+
:90p (nth sorted (dec (/ (* 9 length) 10)))})))
47+
3548
(defn ns->ms
3649
"Converts a value in nanoseconds to milliseconds."
3750
[t]

test/mysql_queue/core_test.clj

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@
187187
[:done args])}
188188
_ (dotimes [n num-jobs]
189189
(let [scheduled-id (schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.))]
190-
(queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {:id (inc n)}) 1)))]
190+
(queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {:id (inc n)}) 1 (java.util.Date.))))]
191191
(with-worker [wrk (worker db-conn
192192
jobs
193193
:num-consumer-threads 1
@@ -232,3 +232,68 @@
232232
(is (= num-jobs (count @check-ins))
233233
"The number of executed jobs doesn't match the number of jobs queued.")))
234234

235+
(deftest status-test
236+
(let [num-jobs 100
237+
expected-set (->> num-jobs range (map inc) (into #{}))
238+
success? (promise)
239+
exception (promise)
240+
check-ins (check-in-atom expected-set success?)
241+
jobs {:test-foo (fn [status {id :id :as args}]
242+
(Thread/sleep 20)
243+
(swap! check-ins conj id)
244+
[:done args])}]
245+
(with-worker [wrk (worker db-conn
246+
jobs
247+
:num-consumer-threads 1
248+
:err-fn #(deliver exception %)
249+
:max-scheduler-sleep-interval 0.5
250+
:max-recovery-sleep-interval 2)]
251+
; Initial status
252+
(let [{{:keys [scheduled-jobs jobs]} :db-queue
253+
:keys [consumers prefetched-jobs]}
254+
(status wrk)]
255+
(is (= 1 (count consumers)))
256+
(is (zero? (:overdue scheduled-jobs)))
257+
(is (zero? (:total scheduled-jobs)))
258+
(is (zero? (:stuck jobs)))
259+
(is (zero? (:total jobs)))
260+
(is (empty? prefetched-jobs)))
261+
262+
; Publishing jobs
263+
(dotimes [n num-jobs]
264+
(schedule-job db-conn :test-foo :begin {:id (inc n)} (java.util.Date.)))
265+
266+
; Publishing one stuck job
267+
(let [scheduled-id (schedule-job db-conn :test-foo :begin {:id 1} (java.util.Date. 0))]
268+
(queries/insert-job<! db-conn scheduled-id 0 "test-foo" "begin" (pr-str {:id 1}) 1 (java.util.Date. 0)))
269+
270+
; Status after publishing
271+
(let [{{:keys [scheduled-jobs jobs]} :db-queue
272+
:keys [consumers]
273+
:as status}
274+
(status wrk)]
275+
(is (= 1 (count consumers)))
276+
(is (<= 50 (:overdue scheduled-jobs) 101))
277+
(is (<= 50 (:total scheduled-jobs) 101))
278+
(is (<= 0 (:stuck jobs) 1))
279+
(is (<= 0 (:total jobs) 50)))
280+
281+
(is (deref success? 15000 false) "Failed to process jobs in time. Test results below depend on this.")
282+
283+
; Extra sleep time to let stuck job processing finish
284+
(Thread/sleep 1000)
285+
286+
; Status after finished
287+
(let [{{:keys [scheduled-jobs jobs]} :db-queue
288+
:keys [consumers recent-jobs recent-jobs-stats]
289+
:as status}
290+
(status wrk)]
291+
(is (= 1 (count consumers)))
292+
(is (zero? (:overdue scheduled-jobs)))
293+
(is (zero? (:total scheduled-jobs)))
294+
(is (zero? (:stuck jobs)))
295+
(is (zero? (:total jobs)))
296+
(is (= 50 (count recent-jobs)))
297+
(is (= 50 (get-in recent-jobs-stats [:job-types :test-foo])))
298+
(is (= 101 (:jobs-executed (first consumers))))))))
299+

test/mysql_queue/utils_test.clj

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,11 @@
2727
(is (<= 50000000 (:subop m) 60000000))
2828
(is (<= 100000000 (:full m)))))
2929

30+
(deftest numeric-stats-test
31+
(let [stats (numeric-stats (shuffle (range 1 101)))]
32+
(is (= 1 (:min stats)))
33+
(is (= 100 (:max stats)))
34+
(is (= 50 (:median stats)))
35+
(is (= 50.5 (:mean stats)))
36+
(is (= 90 (:90p stats)))))
37+

0 commit comments

Comments
 (0)