Permalink
Browse files

Fix broadcast by binding queue.

Still some race conditions surfacing occasionally in broadcast tests.
  • Loading branch information...
1 parent 5e94fb2 commit acf3fd290c2acc45377647fed97de0b9f85a30b6 @technomancy committed Aug 3, 2011
Showing with 43 additions and 19 deletions.
  1. +3 −0 src/die/roboter.clj
  2. +40 −19 test/die/test/roboter.clj
View
@@ -25,6 +25,9 @@
(:exchange-auto-delete config false))
(wabbit/queue-declare (:queue config "die.roboter.work")
(:durable config true))
+ (wabbit/queue-bind (:queue config "die.roboter.work")
+ (:exchange config "die.roboter")
+ (:queue config "die.roboter.work"))
(catch Exception e
(log/error e "Couldn't declare exchange/queue."))))
@@ -11,50 +11,71 @@
(def state (atom {}))
+(def bound :root)
+
(defn clear-queues! [queue-name]
(with-robots {}
(wabbit/with-queue queue-name
(doall (take 100 (wabbit/consuming-seq true 1))))))
(defn work-fixture [f]
(clear-queues! "die.roboter.work")
- (let [worker (clojure.core/future (work))]
- (reset! state {})
- (try (f)
- (finally (.cancel worker true)
- (remove-watch state :unblocker)))))
+ (reset! state {})
+ (f))
+
+(defmacro with-worker [& body]
+ `(let [worker# (clojure.core/future (work))]
+ (try ~@body
+ (finally (.cancel worker# true)))))
+
+(defn wait-for [blockers]
+ (try (.get (clojure.core/future (doseq [b blockers] @b)) 1 TimeUnit/SECONDS)
+ (catch java.util.concurrent.TimeoutException _
+ (is false "Timed out!"))))
(defmacro with-block [n body]
- `(let [blockers# (repeat ~n (promise))
+ `(let [blockers# (repeatedly ~n #(promise))
blocked# (atom blockers#)]
(add-watch state :unblocker (fn [& _#]
- (deliver (first @blocked#) true)
- (swap! blocked# rest)))
- ~body
- (.get (clojure.core/future (doseq [b# blockers#] @b#))
- 1 TimeUnit/SECONDS)))
+ (locking blocked#
+ (if (seq @blocked#)
+ (deliver (first @blocked#) true)
+ (println "Unblocked too many times!"))
+ (swap! blocked# rest))))
+ (try
+ ~body
+ (wait-for (take ~n blockers#))
+ (finally (remove-watch state :unblocker)))))
(use-fixtures :each work-fixture)
(deftest test-send-off
- (with-block 1
- (send-off `(swap! state assoc :ran true)))
+ (with-worker
+ (with-block 1
+ (send-off `(swap! state assoc :ran true))))
(is (:ran @state)))
(deftest test-future
- (is (= 1 (.get (future 1) 100 TimeUnit/MILLISECONDS))))
+ (with-worker
+ (is (= 1 (.get (future 1) 100 TimeUnit/MILLISECONDS)))))
-(def bound :root)
+(deftest test-simple-broadcast
+ (with-block 1
+ (let [worker (clojure.core/future
+ (binding [bound 1]
+ (work-on-broadcast)))]
+ (try (broadcast `(swap! state assoc bound true))
+ (finally (.cancel worker true)))))
+ (is (= {1 true} @state)))
-(deftest test-broadcast
+(deftest test-multiple-broadcast
(with-block 2
- ;; TODO: these jerks aren't pulling their weight
(let [worker1 (clojure.core/future
(binding [bound 1]
- (work-on-broadcast)))
+ (work-on-broadcast {:queue "worker1"})))
worker2 (clojure.core/future
(binding [bound 2]
- (work-on-broadcast)))]
+ (work-on-broadcast {:queue "worker2"})))]
(try (broadcast `(swap! state assoc bound true))
(finally (.cancel worker1 true)
(.cancel worker2 true)))))

0 comments on commit acf3fd2

Please sign in to comment.