Skip to content

Commit

Permalink
added example scripts; implemented consume-wait
Browse files Browse the repository at this point in the history
  • Loading branch information
icylisper committed Sep 15, 2009
1 parent beecaef commit 4548dbe
Show file tree
Hide file tree
Showing 5 changed files with 70 additions and 44 deletions.
29 changes: 29 additions & 0 deletions example/consumer.clj
@@ -0,0 +1,29 @@
(ns rabbitmq.consumer.test
(:require [com.github.icylisper.rabbitmq :as rabbitmq]))

(defonce conn-map { :username "guest"
:password "guest"
:host "localhost"
:port 5672
:virtual-host "/"
:type "direct"
:exchange "sorting-room"
:queue "po-box"
:routing-key "tata"})

(defonce connection (rabbitmq/connect conn-map))

(def c (ref 0))

(while true
(dosync (alter c inc))
(println "cycle: " @c)

;; publish
(let [[_ channel] connection
message (rabbitmq/consume-wait conn-map channel)]
(println "rabbitmq consumer : got message" message))

(Thread/sleep 1000))


File renamed without changes.
31 changes: 31 additions & 0 deletions example/pubslisher.clj
@@ -0,0 +1,31 @@
(ns rabbitmq.publisher
(:require [com.github.icylisper.rabbitmq :as rabbitmq]))

(defonce conn-map { :username "guest"
:password "guest"
:host "localhost"
:port 5672
:virtual-host "/"
:type "direct"
:exchange "sorting-room"
:queue "po-box"
:routing-key "tata"})

(defonce connection (rabbitmq/connect conn-map))

(def c (ref 0))

(while true
(dosync (alter c inc))
(println "cycle: " @c)

;; publish
(let [[conn channel] connection]
(do
(rabbitmq/bind-channel conn-map channel)
(println "rabbitmq publishing:" (format "message%d" @c))
(rabbitmq/publish conn-map channel (format "message%d" @c))))

(Thread/sleep 1000))


52 changes: 10 additions & 42 deletions src/com/github/icylisper/rabbitmq.clj
Expand Up @@ -7,6 +7,7 @@
Channel
AMQP
ConnectionFactory
Consumer
QueueingConsumer)))


Expand All @@ -18,9 +19,6 @@
`(with-open [~var (.createChannel connection)]
~@body))

(defmacro with-connection
[] []
)

(defn connect [conn-map]
(let [params (doto (new ConnectionParameters)
Expand All @@ -45,7 +43,6 @@
(defn disconnect [channel conn]
(map (memfn close) [channel conn]))


;;;; AMPQ Queue as a sequence
(defn delivery-seq [ch q]
(lazy-seq
Expand All @@ -55,50 +52,21 @@
(cons m (delivery-seq ch q)))))

(defn queue-seq [conn queue-name]
"Reutrn a sequence of the messages in queue with name queue-name"
"Return a sequence of the messages in queue with name queue-name"
(let [ch (.createChannel conn)]
(.queueDeclare ch queue-name)
(let [consumer (QueueingConsumer. ch)]
(.basicConsume ch queue-name consumer)
(delivery-seq ch consumer))))

(comment
; usage

(defonce conn-map { :username "guest"
:password "guest"
:host "localhost"
:port 5672
:virtual-host "/"
:type "direct"
:exchange "sorting-room"
:queue "po-box"
:routing-key "tata"})
(defonce connection (connect conn-map))
;; TODO
;; (m-v-b [conn channel] (connect conn-map))


(let [[conn channel] connection]
(bind-channel conn-map channel)
(publish conn-map channel "message"))

;; FIXME : replace with with-connection

:or
(let [conn-map { :username "guest"
:password "guest"
:host "localhost"
:port 5672
:virtual-host "/"
:type "direct"
:exchange "sorting-room"
:queue "po-box"
:routing-key "tata"}
[conn channel] (connect conn-map)]
(bind-channel conn-map channel)
(publish conn-map channel "message")))

(defn consume-wait [conn-map channel]
(let [consumer (QueueingConsumer. channel)]
(.queueDeclare channel (:queue conn-map))
(.basicConsume channel (:queue conn-map) consumer)
(let [d (.nextDelivery consumer)
m (String. (.getBody d))]
(.basicAck channel (.. d getEnvelope getDeliveryTag) false)
m)))



2 changes: 0 additions & 2 deletions src/com/github/icylisper/test.clj

This file was deleted.

0 comments on commit 4548dbe

Please sign in to comment.