Permalink
Browse files

Change message consumer fn signature per discussion in #3

  • Loading branch information...
1 parent 363bd0a commit ebd0f4c72b720860449f3df4af3f523ef91311a5 @michaelklishin committed Apr 19, 2012
Showing with 102 additions and 17 deletions.
  1. +30 −0 ChangeLog.md
  2. +12 −12 src/langohr/consumers.clj
  3. +46 −0 src/langohr/conversion.clj
  4. +12 −3 test/langohr/test/basic.clj
  5. +2 −2 test/langohr/test/exchange.clj
View
@@ -1,5 +1,35 @@
## Changes between Langohr 1.0.0-beta1 and 1.0.0-beta2
+### Breaking change: message handler signature has changed
+
+Previously message handlers registered via `langohr.consumers/subscribe` had the following
+signature:
+
+``` clojure
+(fn [^QueueingConsumer$Delivery delivery ^AMQP$BasicProperties properties payload] ...)
+```
+
+starting with beta2, it has changed to be more Clojure friendly
+
+``` clojure
+(fn [^Channel ch metadata payload] ...)
+```
+
+All message metadata (both envelope and message properties) are now passed in as a single Clojure
+map that you can use destructuring on:
+
+``` clojure
+(fn [^Channel ch {:keys [type content-type message-id correlation-id] :as metadata} payload] ...)
+```
+
+In addition, in explicit acknowledgement mode, ack-ing and nack-ing messages got easier because
+consumer channel is now passed in.
+
+It is important to remember that sharing channels between threads that publish messages is **dangerous**
+and should be avoided. Ack-ing, nack-ing and consuming messages with shared channels is usually acceptable.
+
+
+
### RabbitMQ Java Client 2.8.x
RabbitMQ Java Client which Langohr is based on has been upgraded to version 2.8.1.
@@ -9,8 +9,9 @@
(ns langohr.consumers
(:refer-clojure :exclude [get])
- (:import [com.rabbitmq.client Channel Consumer DefaultConsumer QueueingConsumer ShutdownSignalException Envelope AMQP$BasicProperties QueueingConsumer$Delivery])
- (:use [langohr.basic :as lhb]))
+ (:import [com.rabbitmq.client Channel Consumer DefaultConsumer QueueingConsumer QueueingConsumer$Delivery ShutdownSignalException Envelope AMQP$BasicProperties QueueingConsumer$Delivery])
+ (:require [langohr.basic :as lhb])
+ (:use langohr.conversion))
@@ -52,14 +53,13 @@
(defn subscribe
"Adds new blocking consumer to a queue using basic.consume AMQP method"
- [^Channel channel ^String queue ^clojure.lang.IFn message-handler & { :keys [consumer-tag auto-ack exclusive no-local arguments]
- :or { consumer-tag "" auto-ack false exclusive false no-local false } }]
+ [^Channel channel ^String queue f &{:keys [consumer-tag auto-ack exclusive no-local arguments]
+ :or { consumer-tag "" auto-ack false exclusive false no-local false}}]
(let [queueing-consumer (QueueingConsumer. channel)]
- (do
- (lhb/consume channel queue queueing-consumer :consumer-tag consumer-tag :auto-ack auto-ack :exclusive exclusive :arguments arguments :no-local no-local)
- (while true
- (try
- (let [delivery (.nextDelivery queueing-consumer)]
- (message-handler delivery (.getProperties delivery) (.getBody delivery)))
- (catch InterruptedException e
- nil))))))
+ (lhb/consume channel queue queueing-consumer :consumer-tag consumer-tag :auto-ack auto-ack :exclusive exclusive :arguments arguments :no-local no-local)
+ (while true
+ (try
+ (let [^QueueingConsumer$Delivery delivery (.nextDelivery queueing-consumer)]
+ (f channel (to-message-metadata delivery) (.getBody delivery)))
+ (catch InterruptedException e
+ nil)))))
@@ -0,0 +1,46 @@
+(ns langohr.conversion)
+
+
+;;
+;; API
+;;
+
+(def ^{:const true}
+ persistent-mode 2)
+
+(defprotocol MessageMetadata
+ (to-message-metadata [input] "Turns AMQP 0.9.1 message metadata into a Clojure map"))
+
+(extend-protocol MessageMetadata
+ com.rabbitmq.client.Envelope
+ (to-message-metadata [^Envelope input]
+ {:delivery-tag (.getDeliveryTag input)
+ :redelivery? (.isRedeliver input)
+ :exchange (.getExchange input)
+ :routing-key (.getRoutingKey input)})
+
+
+ com.rabbitmq.client.AMQP$BasicProperties
+ (to-message-metadata [^AMQP$BasicProperties input]
+ {:content-type (.getContentType input)
+ :content-encoding (.getContentEncoding input)
+ :headers (.getHeaders input)
+ :delivery-mode (.getDeliveryMode input)
+ :persistent? (= persistent-mode (.getDeliveryMode input))
+ :priority (.getPriority input)
+ :correlation-id (.getCorrelationId input)
+ :reply-to (.getReplyTo input)
+ :expiration (.getExpiration input)
+ :message-id (.getMessageId input)
+ :timestamp (.getTimestamp input)
+ :type (.getType input)
+ :user-id (.getUserId input)
+ :app-id (.getAppId input)
+ :cluster-id (.getClusterId input)})
+
+
+ com.rabbitmq.client.QueueingConsumer$Delivery
+ (to-message-metadata [^QueueingConsumer$Delivery input]
+ (merge (to-message-metadata (.getProperties input))
+ (to-message-metadata (.getEnvelope input)))))
+
@@ -34,12 +34,21 @@
msg-id (.toString (java.util.UUID/randomUUID))
n 3000
latch (java.util.concurrent.CountDownLatch. n)
- msg-handler (fn [delivery message-properties message-payload]
+ msg-handler (fn [ch metadata payload]
+ (is (:delivery-tag metadata))
+ (is (:content-type metadata))
+ (is (:headers metadata))
+ (is (:message-id metadata))
+ (is (:priority metadata))
(.countDown latch))]
(.start (Thread. #((lhcons/subscribe channel queue msg-handler :consumer-tag tag :auto-ack true)) "t-publishing-using-default-exchange-and-default-message-attributes/consumer"))
(.start (Thread. (fn []
(dotimes [i n]
- (lhb/publish channel exchange queue payload :priority 8, :message-id msg-id, :content-type content-type, :headers { "see you soon" "à bientôt" }))) "publisher"))
+ (lhb/publish channel exchange queue payload
+ :priority 8
+ :message-id msg-id
+ :content-type content-type
+ :headers { "see you soon" "à bientôt" }))) "publisher"))
(.await latch)))
@@ -67,7 +76,7 @@
queue (.getQueue (lhq/declare channel "" :auto-delete true))
tag (lhu/generate-consumer-tag "langohr.basic/consume-tests")
counter (atom 0)
- msg-handler (fn [delivery message-properties message-payload]
+ msg-handler (fn [ch metadata payload]
(swap! counter inc))]
(.start (Thread. #((lhcons/subscribe channel queue msg-handler :consumer-tag tag, :auto-ack true)) "t-basic-cancel/consumer"))
(lhb/publish channel exchange queue payload)
@@ -223,10 +223,10 @@
de "langohr.extensions.altexchanges.direct1"
queue (.getQueue (lhq/declare channel "" :auto-delete true))
latch (java.util.concurrent.CountDownLatch. 1)
- msg-handler (fn [delivery message-properties message-payload]
+ msg-handler (fn [ch metadata payload]
(.countDown latch))]
(lhe/declare channel fe "fanout" :auto-delete true)
- (lhe/declare channel de "direct" :auto-delete true :arguments { "alternate-exchange" fe })
+ (lhe/declare channel de "direct" :auto-delete true :arguments {"alternate-exchange" fe})
(lhq/bind channel queue fe)
(.start (Thread. #((lhcons/subscribe channel queue msg-handler :auto-ack true)) "subscriber"))
(.start (Thread. (fn []

0 comments on commit ebd0f4c

Please sign in to comment.