Skip to content

Commit 8d23f06

Browse files
committed
Add a test for unbalanced processing.
This test ensures that if one worker is busy performing a slow job, other free workers are not blocked.
1 parent 64602ab commit 8d23f06

File tree

1 file changed

+48
-0
lines changed

1 file changed

+48
-0
lines changed

test/mysql_queue/core_test.clj

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
[mysql-queue.core :refer :all]
77
[mysql-queue.queries :as queries]))
88

9+
(Thread/setDefaultUncaughtExceptionHandler
10+
(reify Thread$UncaughtExceptionHandler
11+
(uncaughtException [_ thread throwable]
12+
(println "WARNING!!! Uncaught exception in core async:")
13+
(println throwable))))
14+
915
(def db-conn {:subprotocol "mysql"
1016
:subname "//localhost:3306/clj_mysql_queue?useSSL=false"
1117
:user "root"
@@ -23,6 +29,8 @@
2329
(defn clean-up
2430
[f]
2531
(delete-scheduled-jobs-by-name! db-conn "test-foo")
32+
(delete-scheduled-jobs-by-name! db-conn "slow-job")
33+
(delete-scheduled-jobs-by-name! db-conn "quick-job")
2634
(f))
2735

2836
(use-fixtures :once setup-db)
@@ -94,6 +102,46 @@
94102
(is (= num-jobs (count @check-ins))
95103
"The number of executed jobs doesn't match the number of jobs queued."))))
96104

105+
(deftest unbalanced-parallel-job-processing-test
106+
(let [num-slow-jobs 1
107+
num-quick-jobs 5
108+
expected-slow-set (->> num-slow-jobs range (map inc) (into #{}))
109+
expected-quick-set (->> num-quick-jobs range (map inc) (into #{}))
110+
quick-success? (promise)
111+
slow-success? (promise)
112+
exception (promise)
113+
slow-check-ins (check-in-atom expected-slow-set slow-success?)
114+
quick-check-ins (check-in-atom expected-quick-set quick-success?)
115+
jobs {:quick-job (fn [status {id :id :as args}]
116+
(swap! quick-check-ins conj id)
117+
[:done args])
118+
:slow-job (fn [status {id :id :as args}]
119+
(when (deref quick-success? 2000 false)
120+
(swap! slow-check-ins conj id))
121+
[:done args])}
122+
_ (dotimes [n num-slow-jobs]
123+
(schedule-job db-conn :slow-job :begin {:id (inc n)} (java.util.Date.)))
124+
_ (dotimes [n num-quick-jobs]
125+
(schedule-job db-conn :quick-job :begin {:id (inc n)} (java.util.Date.)))]
126+
(with-worker [wrk (worker db-conn
127+
jobs
128+
:prefetch 3
129+
:num-consumer-threads 2
130+
:err-fn #(deliver exception %)
131+
:max-scheduler-sleep-interval 0.1)]
132+
(is (deref slow-success? 2000 false)
133+
(str "Failed to process 1 slow job and " num-quick-jobs
134+
" quick jobs in 2 seconds.\n"
135+
"Missing slow job IDs: " (clj-set/difference expected-slow-set
136+
@slow-check-ins) "\n"
137+
"Missing quick job IDs: " (clj-set/difference expected-quick-set
138+
@quick-check-ins) "\n"
139+
"Exception?: " (deref exception 0 "nope")))
140+
(is (= num-slow-jobs (count @slow-check-ins))
141+
"The number of executed slow jobs doesn't match the number of jobs queued.")
142+
(is (= num-quick-jobs (count @quick-check-ins))
143+
"The number of executed quick jobs doesn't match the number of jobs queued."))))
144+
97145
(deftest distributed-job-processing-test
98146
(let [num-jobs 100
99147
expected-set (->> num-jobs range (map inc) (into #{}))

0 commit comments

Comments
 (0)