Permalink
Browse files

Experimental publish protocol which can be called from another thread

  • Loading branch information...
1 parent 91cb9ce commit c04ca1798b4dbdf1debdd7ba2014e413732def88 @mefesto committed May 3, 2012
Showing with 64 additions and 1 deletion.
  1. +1 −1 project.clj
  2. +22 −0 src/com/mefesto/wabbitmq.clj
  3. +41 −0 test/repl.clj
View
@@ -1,4 +1,4 @@
-(defproject com.mefesto/wabbitmq "0.2.0-SNAPSHOT"
+(defproject com.mefesto/wabbitmq "0.2.2-SNAPSHOT"
:description "WabbitMQ: A simple RabbitMQ wrapper for Clojure"
:url "https://github.com/mefesto/wabbitmq"
:license {:name "Eclipse Public License - v 1.0"
@@ -438,3 +438,25 @@
(doseq [^Future future (.invokeAll pool workers)]
(.get future))
(.shutdown pool)))
+
+;; publishing
+(defprotocol PublishQueue
+ (drain! [this])
+ (put! [this routing-key props data])
+ (close [this]))
+
+(defn publish-queue []
+ (let [queue (LinkedBlockingQueue.) eof (Object.)]
+ (reify
+ PublishQueue
+ (drain! [this]
+ (loop [msg (.take queue)]
+ (when-not (= eof msg)
+ (publish (:routing-key msg) (:props msg) (:data msg))
+ (recur (.take queue)))))
+ (put! [this key props data]
+ (.put queue {:routing-key key
+ :props props
+ :data data}))
+ (close [this]
+ (.put queue eof)))))
View
@@ -0,0 +1,41 @@
+(require '[com.mefesto.wabbitmq :as mq]
+ '[com.mefesto.wabbitmq.content-type :as ct]))
+
+(def broker
+ {:host "localhost"
+ :username "guest"
+ :password "guest"
+ :virtual-host "/"})
+
+(def queue (mq/publish-queue))
+
+;; setup a publisher thread
+(def publisher
+ (future
+ (mq/with-broker broker
+ (mq/with-channel {:content-types [ct/application-json]}
+ (mq/with-exchange "test"
+ (println "draining publish queue")
+ (mq/drain! queue)
+ (println "publish queue closed"))))))
+
+;; setup a consumer thread
+(def consumer
+ (future
+ (mq/with-broker broker
+ (mq/with-channel {:content-types [ct/application-json]}
+ (mq/with-queue "test"
+ (println "consumer ready.")
+ (->> (mq/consuming-seq true)
+ (map :body)
+ (take-while #(not= "quit" (:type %)))
+ (map #(println "consuming:" %))
+ (dorun))
+ (println "consumer done."))))))
+
+;; publish test data
+(def props {:content-type "application/json"})
+(mq/put! queue "" props {:type :doc :msg "Hello world"})
+(mq/put! queue "" props {:type :doc :msg "Goodbye world"})
+(mq/put! queue "" props {:type :quit})
+(mq/close queue)

0 comments on commit c04ca17

Please sign in to comment.