Skip to content

Commit 64602ab

Browse files
committed
Extend internal monitoring.
1 parent 6af103c commit 64602ab

File tree

1 file changed

+150
-50
lines changed

1 file changed

+150
-50
lines changed

src/mysql_queue/core.clj

Lines changed: 150 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
(ns mysql-queue.core
22
"A MySQL-backed durable queue implementation with scheduled jobs support."
3-
(:import (com.mysql.jdbc.exceptions.jdbc4 MySQLIntegrityConstraintViolationException))
43
(:require [mysql-queue.queries :as queries]
5-
[mysql-queue.utils :refer [while-let fn-options with-error-handler]]
4+
[mysql-queue.utils :refer [while-let fn-options with-error-handler profile-block meter ns->ms]]
65
[clojure.string :as string]
76
[clojure.set :as clj-set]
87
[clojure.edn :as edn]
9-
[clojure.core.async :as async :refer [chan >!! >! <! <!! go go-loop thread thread-call close! timeout alts!!]]
10-
[clojure.core.async.impl.protocols :as async-proto :refer [closed?]]))
8+
[clojure.core.async :as async :refer [chan >!! >! <! <!! go go-loop thread thread-call close! timeout alts! alts!!]]
9+
[clojure.core.async.impl.protocols :as async-proto :refer [closed?]])
10+
(:import (com.mysql.jdbc.exceptions.jdbc4 MySQLIntegrityConstraintViolationException)
11+
(java.util Date)))
1112

1213
(def ultimate-job-states #{:canceled :failed :done})
1314
(def max-retries 5)
@@ -19,11 +20,12 @@
1920
(defprotocol Stoppable
2021
(stop [worker timeout-secs]))
2122

22-
(defrecord Worker [db-conn input-chan status consumer-threads scheduler-thread recovery-thread options]
23+
(defrecord Worker
24+
[db-conn input-chan status sieve status-threads consumer-threads scheduler-thread recovery-thread options]
2325
Stoppable
2426
(stop [this timeout-secs]
25-
(when (= :running @status)
26-
(swap! status (constantly :stopped))
27+
(when (:running @status)
28+
(swap! status assoc :running false)
2729
(close! input-chan)
2830
(let [consumer-shutdowns (->> consumer-threads
2931
(concat [scheduler-thread recovery-thread])
@@ -154,28 +156,32 @@
154156
(finished? [job]
155157
(ultimate-job-states (:status job)))
156158
(execute [{:as job job-fn :user-fn :keys [status parameters attempt]} db-conn log-fn err-fn]
157-
(if-not (ultimate-job-states status)
158-
(try
159-
(log-fn :info job "Executing job " job)
160-
(let [[status params] (-> (job-fn status parameters) job-result-or-nil (or [:done nil]))]
161-
(-> job (beget status params) (persist db-conn)))
162-
(catch Exception e
163-
(err-fn e)
164-
(if (< attempt max-retries)
165-
(-> job beget (persist db-conn))
166-
(-> job (beget :failed) (persist db-conn)))))
167-
(cleanup job db-conn)))
159+
(profile-block [m]
160+
(if (finished? job)
161+
(cleanup job db-conn)
162+
(try
163+
(log-fn :info job "Executing job " job)
164+
(let [[status params] (-> (meter m :job-fn (job-fn status parameters))
165+
job-result-or-nil (or [:done nil]))]
166+
(-> job (beget status params) (persist db-conn)))
167+
(catch Exception e
168+
(err-fn e)
169+
(if (< attempt max-retries)
170+
(-> job beget (persist db-conn))
171+
(-> job (beget :failed) (persist db-conn))))))))
168172
StuckJob
169173
(finished? [job] false)
170174
(execute [job db-conn log-fn err-fn]
171-
(log-fn :info job "Recovering job " job)
172-
(-> job beget (persist db-conn)))
175+
(profile-block [_]
176+
(log-fn :info job "Recovering job " job)
177+
(-> job beget (persist db-conn))))
173178
ScheduledJob
174179
(finished? [job]
175180
(throw (UnsupportedOperationException. "finished? is not implemented for ScheduledJob.")))
176181
(execute [job db-conn log-fn _err-fn]
177-
(log-fn :info job "Executing job " job)
178-
(-> job beget (persist db-conn))))
182+
(profile-block [_]
183+
(log-fn :info job "Executing job " job)
184+
(-> job beget (persist db-conn)))))
179185

180186
(defn- get-scheduled-jobs
181187
"Searches for ready scheduled jobs and attempts to insert root jobs for each of those.
@@ -206,38 +212,116 @@
206212
total
207213
false)))
208214

215+
(defn- default-status
216+
"Build the default status map for a given number of consumer threads."
217+
[{:keys [num-consumer-threads
218+
recovery-threshold-mins
219+
min-scheduler-sleep-interval
220+
max-scheduler-sleep-interval
221+
min-recovery-sleep-interval
222+
max-recovery-sleep-interval]}]
223+
{:running true
224+
:consumers (mapv #(hash-map :n (inc %)
225+
:started-at (Date.)
226+
:jobs-executed 0N
227+
:recent-jobs [])
228+
(range num-consumer-threads))
229+
:recovery {:started-at (Date.)
230+
:min-interval min-recovery-sleep-interval
231+
:max-interval max-recovery-sleep-interval
232+
:iterations 0N
233+
:jobs-published 0N}
234+
:scheduler {:started-at (Date.)
235+
:min-interval min-scheduler-sleep-interval
236+
:max-interval max-scheduler-sleep-interval
237+
:recovery-threshold-mins recovery-threshold-mins
238+
:iterations 0N
239+
:jobs-published 0N}})
240+
209241
(defn- consumer-thread
210242
"Consumer loop. Automatically quits if the listen-chan is closed. Runs in a go-thread."
211-
[listen-chan db-conn log-fn err-fn]
243+
[id listen-chan status-chan db-conn log-fn err-fn]
212244
(go
213245
(while-let [job (<! listen-chan)]
214246
(try
215-
(loop [job job]
216-
(when job
217-
(log-fn :debug job "Consumer received job " job)
218-
(recur (<! (thread (execute job db-conn log-fn err-fn))))))
247+
(log-fn :debug job "Consumer received job " job)
248+
(loop [current-job job
249+
metrics {}]
250+
(if current-job
251+
(do
252+
(>! status-chan {:id id :state :running-job :job current-job})
253+
(let [[next-job dmetrics] (<! (thread (execute current-job db-conn log-fn err-fn)))]
254+
(recur next-job (merge-with + metrics dmetrics))))
255+
(do
256+
(>! status-chan {:id id :state :finished-job :job current-job :metrics metrics})
257+
(log-fn :info current-job "Completed job " job " in " (ns->ms (:full metrics)) "ms")
258+
nil)))
219259
(catch Exception e
260+
(>! status-chan {:id id :state :error :job job})
220261
(log-fn :error job "Unexpected error " e " in consumer loop when running job " job)
221262
(err-fn e))))
263+
(>! status-chan {:id id :state :quit})
222264
(log-fn :debug "Consumer Thread" "Consumer is stopping...")
223265
:done))
224266

225267
(defn- publisher-thread
226268
"Publisher loop. Automatically quits if the publish-chan is closed. Runs in a go-thread."
227-
[min-sleep-secs max-sleep-secs source-fn log-fn]
269+
[status-chan min-sleep-secs max-sleep-secs source-fn log-fn]
228270
(go-loop [last-exec (System/currentTimeMillis)]
229-
(if-let [num-jobs (<! (thread-call source-fn))]
271+
(if-let [published (<! (thread-call source-fn))]
230272
(do
231-
(if (zero? num-jobs)
273+
(>! status-chan {:state :running :jobs-published published})
274+
(if (zero? published)
232275
(<! (timeout (max (* 1000 min-sleep-secs)
233276
(- (* 1000 max-sleep-secs)
234277
(- (System/currentTimeMillis) last-exec)))))
235-
(log-fn :debug nil "Published " num-jobs " new jobs."))
278+
(log-fn :debug nil "Published " published " new jobs."))
236279
(recur (System/currentTimeMillis)))
237280
(do
281+
(>! status-chan {:state :quit :jobs-published 0})
238282
(log-fn :debug nil "Publisher is stopping...")
239283
:done))))
240284

285+
(defn- status-threads
286+
"Status loop. Listens for status updates from consumer and publisher threads.
287+
Runs multiple go-threads."
288+
[status num-kept-jobs]
289+
(let [consumer-chan (chan)
290+
scheduler-chan (chan)
291+
recovery-chan (chan)
292+
consumer-fn (fn [{:keys [recent-jobs] :as status} consumer-state job metrics]
293+
(let [current-time (Date.)
294+
finished? (= consumer-state :finished-job)
295+
overflow (inc (- (count recent-jobs) num-kept-jobs))
296+
overflow? (pos? overflow)
297+
job (assoc job :metrics metrics :processed-at current-time)]
298+
(cond-> status
299+
true (assoc :last-update-at current-time
300+
:last-job job
301+
:state consumer-state)
302+
(and finished? overflow?) (update :recent-jobs subvec overflow)
303+
finished? (update :jobs-executed inc)
304+
finished? (update :recent-jobs conj job))))
305+
consumer-thread (go
306+
(while-let [{:keys [id job state metrics]} (<! consumer-chan)]
307+
(swap! status update-in [:consumers id] consumer-fn state job metrics)))
308+
publisher-fn (fn [status publisher-state jobs-published]
309+
(-> status
310+
(assoc :last-update-at (Date.)
311+
:state publisher-state
312+
:jobs-published-last-run jobs-published)
313+
(update :jobs-published + jobs-published)
314+
(update :iterations inc)))
315+
scheduler-thread (go
316+
(while-let [{:keys [state jobs-published]} (<! scheduler-chan)]
317+
(swap! status update :scheduler publisher-fn state jobs-published)))
318+
recovery-thread (go
319+
(while-let [{:keys [state jobs-published]} (<! recovery-chan)]
320+
(swap! status update :recovery publisher-fn state jobs-published)))]
321+
{:consumer {:channel consumer-chan :thread consumer-thread}
322+
:scheduler {:channel scheduler-chan :thread scheduler-thread}
323+
:recovery {:channel recovery-chan :thread recovery-thread}}))
324+
241325
(defn- sieve->ids
242326
"Returns a sieve seq that can be used to filter SQL queries for
243327
certain job types. Includes a 0 id to simplify the case when the
@@ -264,13 +348,13 @@
264348
(>! ch v))
265349
(recur))
266350
(close! ch)))
267-
(doseq [out-ch out-chs]
268-
(go-loop [last-v nil]
269-
(if-let [v (<! ch)]
270-
(do
271-
(>! out-ch v)
272-
(swap! sieve disj last-v)
273-
(recur v))
351+
(go-loop [occupations {}]
352+
(if-let [v (<! ch)]
353+
(let [[v ch] (alts! (map #(vector %1 v) out-chs))]
354+
(when-let [occupation (occupations ch)]
355+
(swap! sieve disj occupation))
356+
(recur (assoc occupations ch v)))
357+
(doseq [out-ch out-chs]
274358
(close! out-ch))))
275359
[in-ch out-chs sieve]))
276360

@@ -346,6 +430,7 @@
346430
fn-bindings
347431
&{:keys [buffer-size
348432
prefetch
433+
num-stats-jobs
349434
num-consumer-threads
350435
min-scheduler-sleep-interval
351436
max-scheduler-sleep-interval
@@ -356,6 +441,7 @@
356441
err-fn]
357442
:or {buffer-size 10
358443
prefetch 10
444+
num-stats-jobs 50
359445
num-consumer-threads 2
360446
min-scheduler-sleep-interval 0
361447
max-scheduler-sleep-interval 10
@@ -372,14 +458,32 @@
372458
(fn? err-fn)]}
373459
(let [log-fn (quiet-log-fn log-fn)
374460
err-fn (quiet-err-fn err-fn)
461+
options {:buffer-size buffer-size
462+
:prefetch prefetch
463+
:num-stats-jobs num-stats-jobs
464+
:num-consumer-threads num-consumer-threads
465+
:min-scheduler-sleep-interval min-scheduler-sleep-interval
466+
:max-scheduler-sleep-interval max-scheduler-sleep-interval
467+
:min-recovery-sleep-interval min-recovery-sleep-interval
468+
:max-recovery-sleep-interval max-recovery-sleep-interval
469+
:recovery-threshold-mins recovery-threshold-mins
470+
:log-fn log-fn
471+
:err-fn err-fn}
472+
status (atom (default-status options))
473+
{:as status-threads
474+
{consumer-status-channel :channel} :consumer
475+
{recovery-status-channel :channel} :recovery
476+
{scheduler-status-channel :channel} :scheduler} (status-threads status num-stats-jobs)
375477
queue-chan (chan buffer-size)
376478
[in-ch out-chs sieve] (deduplicate queue-chan num-consumer-threads)
377479
consumer-threads (->> out-chs
378-
(map #(consumer-thread % db-conn log-fn err-fn))
480+
(map-indexed
481+
#(consumer-thread %1 %2 consumer-status-channel db-conn log-fn err-fn))
379482
(into [])
380483
doall)
381484
handler (partial publisher-error-handler log-fn err-fn)
382-
scheduler-thread (publisher-thread min-scheduler-sleep-interval
485+
scheduler-thread (publisher-thread scheduler-status-channel
486+
min-scheduler-sleep-interval
383487
max-scheduler-sleep-interval
384488
(with-error-handler [(handler "scheduler thread")]
385489
(batch-publish in-ch
@@ -388,7 +492,8 @@
388492
fn-bindings
389493
(sieve->ids @sieve ScheduledJob))))
390494
log-fn)
391-
recovery-thread (publisher-thread min-recovery-sleep-interval
495+
recovery-thread (publisher-thread recovery-status-channel
496+
min-recovery-sleep-interval
392497
max-recovery-sleep-interval
393498
(with-error-handler [(handler "recovery thread")]
394499
(batch-publish in-ch
@@ -401,16 +506,11 @@
401506
(log-fn :info nil "Starting a new worker...")
402507
(->Worker db-conn
403508
in-ch
404-
(atom :running)
509+
status
510+
sieve
511+
status-threads
405512
consumer-threads
406513
scheduler-thread
407514
recovery-thread
408-
{:buffer-size buffer-size
409-
:prefetch prefetch
410-
:num-consumer-threads num-consumer-threads
411-
:min-scheduler-sleep-interval min-scheduler-sleep-interval
412-
:max-scheduler-sleep-interval max-scheduler-sleep-interval
413-
:recovery-threshold-mins recovery-threshold-mins
414-
:log-fn log-fn
415-
:err-fn err-fn})))
515+
options)))
416516

0 commit comments

Comments
 (0)