Skip to content
Browse files

Introduce distinction between message-seq and delivery-seq.

This allows manual acking of lazy sequence contents.
Fix an apparent bug without prefetching argument.
Add delivery-contents, ack-delivery to pull contents out of Delivery
  • Loading branch information...
1 parent ee3f14e commit 699bd4872e1a27129a430956dc3907772f8a470a @rnewman committed Jan 25, 2010
Showing with 64 additions and 18 deletions.
  1. +64 −18 src/org/clojars/rabbitmq.clj
82 src/org/clojars/rabbitmq.clj
@@ -9,7 +9,8 @@
- QueueingConsumer)))
+ QueueingConsumer)
+ com.rabbitmq.client.QueueingConsumer$Delivery))
;; abbreviatons:
@@ -56,37 +57,82 @@
(.close ch)
(.close conn))
+(defn #^String delivery-contents
+ "Return the contents of the Delivery as a String."
+ [#^"QueueingConsumer$Delivery" d]
+ (when d
+ (String. (.getBody d))))
+(defn ack-delivery [#^Channel ch
+ #^"QueueingConsumer$Delivery" d]
+ "Acknowledge a Delivery from the given Channel."
+ (.basicAck ch (.. d getEnvelope getDeliveryTag) false))
+(defn delivery-seq
+ "Returns a lazy sequence of deliveries from the queue.
+ The caller is responsible for acking the message and
+ extracting its contents."
+ [#^Channel ch
+ #^QueueingConsumer q]
+ (lazy-seq
+ (let [d (.nextDelivery q)]
+ (cons d (delivery-seq ch q)))))
;;;; AMQP Queue as a sequence
-(defn delivery-seq [#^Channel ch
- #^QueueingConsumer q]
+(defn message-seq
+ "Return a lazy sequence of queue items. Automatically
+ acks each message and returns the content as a string."
+ [#^Channel ch
+ #^QueueingConsumer q]
(let [d (.nextDelivery q)
- m (String. (.getBody d))]
- (.basicAck ch (.. d getEnvelope getDeliveryTag) false)
- (cons m (delivery-seq ch q)))))
+ m (delivery-contents d)]
+ (ack-delivery ch d)
+ (cons m (message-seq ch q)))))
(defn #^QueueingConsumer
"Return a QueueingConsumer with the appropriate settings."
[#^Channel ch queue prefetch]
(.queueDeclare ch queue)
(when prefetch
- (.basicQos ch prefetch)
- (QueueingConsumer. ch)))
-(defn queue-seq
- "Return a sequence of the messages in queue with name queue-name"
- ([#^Channel ch
- {:keys [queue prefetch]}]
- (let [consumer (declare-queue-and-consumer ch queue prefetch)]
- (.basicConsume ch queue consumer)
- (delivery-seq ch consumer)))
+ (.basicQos ch prefetch))
+ (QueueingConsumer. ch))
+(defmacro with-declared-queue-and-consumer
+ "Macro which establishes a declared queue and consumer, starting
+ basicConsume, then executing the body.
+ `consumer` is bound to the resulting consumer for the duration of the body."
+ [[ch consumer options] & body]
+ `(let [opts# ~options
+ queue# (:queue opts#)
+ prefetch# (:prefetch opts#)
+ cch# ~ch]
+ (let [~consumer (declare-queue-and-consumer cch# queue# prefetch#)]
+ (.basicConsume cch# queue# ~consumer)
+ ~@body)))
+(defn queue-delivery-seq
+ "Return a sequence of the Delivery objects in the named queue."
+ ([#^Channel ch opts]
+ (with-declared-queue-and-consumer
+ [ch consumer opts]
+ (delivery-seq ch consumer))))
+(defn queue-message-seq
+ "Return a sequence of the messages in queue with name `queue-name`."
+ ([#^Channel ch opts]
+ (with-declared-queue-and-consumer
+ [ch consumer opts]
+ (message-seq ch consumer)))
#^Channel ch
- (queue-seq ch c)))
+ (queue-message-seq ch c)))
+(defn queue-seq [& args]
+ (apply queue-message-seq args))
;;; consumer routines
(defn consume-wait

0 comments on commit 699bd48

Please sign in to comment.
Something went wrong with that request. Please try again.