diff --git a/example/consumer.clj b/example/consumer.clj new file mode 100644 index 0000000..09a9eb6 --- /dev/null +++ b/example/consumer.clj @@ -0,0 +1,29 @@ +(ns rabbitmq.consumer.test + (:require [com.github.icylisper.rabbitmq :as rabbitmq])) + +(defonce conn-map { :username "guest" + :password "guest" + :host "localhost" + :port 5672 + :virtual-host "/" + :type "direct" + :exchange "sorting-room" + :queue "po-box" + :routing-key "tata"}) + +(defonce connection (rabbitmq/connect conn-map)) + +(def c (ref 0)) + +(while true + (dosync (alter c inc)) + (println "cycle: " @c) + + ;; publish + (let [[_ channel] connection + message (rabbitmq/consume-wait conn-map channel)] + (println "rabbitmq consumer : got message" message)) + + (Thread/sleep 1000)) + + diff --git a/example.clj b/example/example.clj similarity index 100% rename from example.clj rename to example/example.clj diff --git a/example/pubslisher.clj b/example/pubslisher.clj new file mode 100644 index 0000000..d3fc964 --- /dev/null +++ b/example/pubslisher.clj @@ -0,0 +1,31 @@ +(ns rabbitmq.publisher + (:require [com.github.icylisper.rabbitmq :as rabbitmq])) + +(defonce conn-map { :username "guest" + :password "guest" + :host "localhost" + :port 5672 + :virtual-host "/" + :type "direct" + :exchange "sorting-room" + :queue "po-box" + :routing-key "tata"}) + +(defonce connection (rabbitmq/connect conn-map)) + +(def c (ref 0)) + +(while true + (dosync (alter c inc)) + (println "cycle: " @c) + + ;; publish + (let [[conn channel] connection] + (do + (rabbitmq/bind-channel conn-map channel) + (println "rabbitmq publishing:" (format "message%d" @c)) + (rabbitmq/publish conn-map channel (format "message%d" @c)))) + + (Thread/sleep 1000)) + + diff --git a/src/com/github/icylisper/rabbitmq.clj b/src/com/github/icylisper/rabbitmq.clj index 21a8ee1..04c57b4 100644 --- a/src/com/github/icylisper/rabbitmq.clj +++ b/src/com/github/icylisper/rabbitmq.clj @@ -7,6 +7,7 @@ Channel AMQP ConnectionFactory + Consumer QueueingConsumer))) @@ -18,9 +19,6 @@ `(with-open [~var (.createChannel connection)] ~@body)) -(defmacro with-connection - [] [] - ) (defn connect [conn-map] (let [params (doto (new ConnectionParameters) @@ -45,7 +43,6 @@ (defn disconnect [channel conn] (map (memfn close) [channel conn])) - ;;;; AMPQ Queue as a sequence (defn delivery-seq [ch q] (lazy-seq @@ -55,50 +52,21 @@ (cons m (delivery-seq ch q))))) (defn queue-seq [conn queue-name] - "Reutrn a sequence of the messages in queue with name queue-name" + "Return a sequence of the messages in queue with name queue-name" (let [ch (.createChannel conn)] (.queueDeclare ch queue-name) (let [consumer (QueueingConsumer. ch)] (.basicConsume ch queue-name consumer) (delivery-seq ch consumer)))) -(comment - ; usage - - (defonce conn-map { :username "guest" - :password "guest" - :host "localhost" - :port 5672 - :virtual-host "/" - :type "direct" - :exchange "sorting-room" - :queue "po-box" - :routing-key "tata"}) - (defonce connection (connect conn-map)) - ;; TODO - ;; (m-v-b [conn channel] (connect conn-map)) - - - (let [[conn channel] connection] - (bind-channel conn-map channel) - (publish conn-map channel "message")) - - ;; FIXME : replace with with-connection - - :or - (let [conn-map { :username "guest" - :password "guest" - :host "localhost" - :port 5672 - :virtual-host "/" - :type "direct" - :exchange "sorting-room" - :queue "po-box" - :routing-key "tata"} - [conn channel] (connect conn-map)] - (bind-channel conn-map channel) - (publish conn-map channel "message"))) - +(defn consume-wait [conn-map channel] + (let [consumer (QueueingConsumer. channel)] + (.queueDeclare channel (:queue conn-map)) + (.basicConsume channel (:queue conn-map) consumer) + (let [d (.nextDelivery consumer) + m (String. (.getBody d))] + (.basicAck channel (.. d getEnvelope getDeliveryTag) false) + m))) diff --git a/src/com/github/icylisper/test.clj b/src/com/github/icylisper/test.clj deleted file mode 100644 index 139597f..0000000 --- a/src/com/github/icylisper/test.clj +++ /dev/null @@ -1,2 +0,0 @@ - -