Skip to content
Browse files

BREAKING CHANGE: Make langohr.basic/get return value consistent with …

…what delivery handlers accept

It is both more consistent and a lot more Clojure friendly (does not require heavy interop or familiarity
with the RabbitMQ Java client).
  • Loading branch information...
1 parent 2b0c0cd commit fdfabda1b51e721b2699f6e74c24ccf2636e588c @michaelklishin committed Sep 23, 2012
Showing with 58 additions and 39 deletions.
  1. +18 −0 ChangeLog.md
  2. +10 −4 src/clojure/langohr/basic.clj
  3. +10 −4 src/clojure/langohr/conversion.clj
  4. +20 −31 test/langohr/test/basic_test.clj
View
18 ChangeLog.md
@@ -1,3 +1,21 @@
+## Changes between Langohr 1.0.0-beta7 and 1.0.0-beta8
+
+`1.0.0-beta8` has **BREAKING CHANGES**:
+
+### langohr.basic/get Return Value Change
+
+`langohr.basic/get` now returns a pair of `[metadata payload]` to be consistent with what
+delivery handler functions accept:
+
+``` clojure
+(require '[langohr.basic :as lhb])
+
+(let [[metadata payload] (lhb/get channel queue)]
+ (println metadata)
+ (println (String. ^bytes payload)))
+```
+
+
## Changes between Langohr 1.0.0-beta6 and 1.0.0-beta7
`1.0.0-beta7` has **BREAKING CHANGES**:
View
14 src/clojure/langohr/basic.clj
@@ -10,7 +10,7 @@
(ns langohr.basic
(:refer-clojure :exclude [get])
(:require langohr.util)
- (:use [langohr.conversion :only [to-bytes]])
+ (:use [langohr.conversion :only [to-bytes to-message-metadata]])
(:import [com.rabbitmq.client Channel AMQP AMQP$BasicProperties AMQP$BasicProperties$Builder Consumer GetResponse ReturnListener]
[java.util Map Date]))
@@ -133,12 +133,18 @@
(.basicCancel ^Channel channel ^String consumer-tag))
-(defn ^GetResponse get
+(defn get
"Fetches a message from a queue using basic.get AMQP method"
([^Channel channel ^String queue]
- (.basicGet channel queue true))
+ (let [response (.basicGet channel queue true)]
+ (if response
+ [(to-message-metadata response) (.getBody response)]
+ nil)))
([^Channel channel ^String queue ^Boolean auto-ack]
- (.basicGet channel queue auto-ack)))
+ (let [response (.basicGet channel queue auto-ack)]
+ (if response
+ [(to-message-metadata response) (.getBody response)]
+ nil))))
View
14 src/clojure/langohr/conversion.clj
@@ -14,15 +14,15 @@
(extend-protocol MessageMetadata
com.rabbitmq.client.Envelope
- (to-message-metadata [^Envelope input]
+ (to-message-metadata [input]
{:delivery-tag (.getDeliveryTag input)
:redelivery? (.isRedeliver input)
:exchange (.getExchange input)
:routing-key (.getRoutingKey input)})
com.rabbitmq.client.AMQP$BasicProperties
- (to-message-metadata [^AMQP$BasicProperties input]
+ (to-message-metadata [input]
{:content-type (.getContentType input)
:content-encoding (.getContentEncoding input)
:headers (.getHeaders input)
@@ -41,9 +41,15 @@
com.rabbitmq.client.QueueingConsumer$Delivery
- (to-message-metadata [^QueueingConsumer$Delivery input]
+ (to-message-metadata [input]
(merge (to-message-metadata (.getProperties input))
- (to-message-metadata (.getEnvelope input)))))
+ (to-message-metadata (.getEnvelope input))))
+
+ com.rabbitmq.client.GetResponse
+ (to-message-metadata [input]
+ (merge (to-message-metadata (.getProps input))
+ (to-message-metadata (.getEnvelope input))
+ {:message-count (.getMessageCount input)})))
View
51 test/langohr/test/basic_test.clj
@@ -99,27 +99,25 @@
(deftest t-basic-get-with-automatic-ack
(let [channel (.createChannel conn)
exchange ""
- payload "A message we will fetch with basic.get"
+ body "A message we will fetch with basic.get"
queue "langohr.examples.basic.get.queue1"
declare-ok (lhq/declare channel queue :auto-delete true)]
- (lhb/publish channel exchange queue payload)
- (let [get-response (lhb/get channel queue)]
- (is (instance? GetResponse get-response))
- (is (= (String. (.getBody get-response)) payload))
- (is (= (.getMessageCount get-response) 0))
- (is (= (.. get-response getEnvelope getExchange) exchange))
- (is (= (.. get-response getEnvelope getRoutingKey) queue)))))
+ (lhb/publish channel exchange queue body)
+ (let [[metadata payload] (lhb/get channel queue)]
+ (is (= (String. ^bytes payload) body))
+ (is (= (:message-count metadata) 0))
+ (is (= (:exchange metadata) exchange))
+ (is (= (:routing-key metadata) queue)))))
(deftest t-basic-get-with-explicit-ack
(let [channel (.createChannel conn)
exchange ""
- payload "A message we will fetch with basic.get"
+ body "A message we will fetch with basic.get"
queue "langohr.examples.basic.get.queue2"
declare-ok (lhq/declare channel queue :auto-delete true)]
- (lhb/publish channel exchange queue payload)
- (let [get-response (lhb/get channel queue false)]
- (is (instance? GetResponse get-response))
- (is (= (String. (.getBody get-response)) payload)))))
+ (lhb/publish channel exchange queue body)
+ (let [[metadata payload] (lhb/get channel queue false)]
+ (is (= (String. ^bytes payload) body)))))
(deftest t-basic-get-with-an-empty-queue
@@ -152,8 +150,7 @@
(lhb/publish producer-channel "" queue "Two")
(lhb/publish producer-channel "" queue "Three"))))
(Thread/sleep 200)
- (let [get-response (lhb/get consumer-channel queue false)
- delivery-tag (.. get-response getEnvelope getDeliveryTag)]
+ (let [[{delivery-tag :delivery-tag} _] (lhb/get consumer-channel queue false)]
(is (= 1 delivery-tag))
(lhb/ack consumer-channel delivery-tag))
(lhq/purge producer-channel queue)))
@@ -168,10 +165,8 @@
(lhb/publish producer-channel "" queue "Two")
(lhb/publish producer-channel "" queue "Three"))))
(Thread/sleep 200)
- (let [get-response1 (lhb/get consumer-channel queue false)
- get-response2 (lhb/get consumer-channel queue false)
- delivery-tag1 (.. get-response1 getEnvelope getDeliveryTag)
- delivery-tag2 (.. get-response2 getEnvelope getDeliveryTag)]
+ (let [[{delivery-tag1 :delivery-tag} _] (lhb/get consumer-channel queue false)
+ [{delivery-tag2 :delivery-tag} _] (lhb/get consumer-channel queue false)]
(is (= 1 delivery-tag1))
(is (= 2 delivery-tag2))
(lhb/ack consumer-channel delivery-tag1 true))
@@ -191,8 +186,7 @@
(lhb/publish channel "" queue "Two")
(lhb/publish channel "" queue "Three"))))
(Thread/sleep 200)
- (let [get-response (lhb/get channel queue false)
- delivery-tag (.. get-response getEnvelope getDeliveryTag)]
+ (let [[{:keys [delivery-tag]} _] (lhb/get channel queue false)]
(is (= 1 delivery-tag))
(lhb/nack channel delivery-tag false true))
(lhq/purge channel queue)))
@@ -206,10 +200,8 @@
(lhb/publish channel "" queue "Two")
(lhb/publish channel "" queue "Three"))))
(Thread/sleep 200)
- (let [get-response1 (lhb/get channel queue false)
- get-response2 (lhb/get channel queue false)
- delivery-tag1 (.. get-response1 getEnvelope getDeliveryTag)
- delivery-tag2 (.. get-response2 getEnvelope getDeliveryTag)]
+ (let [[{delivery-tag1 :delivery-tag} _] (lhb/get channel queue false)
+ [{delivery-tag2 :delivery-tag} _] (lhb/get channel queue false)]
(is (= 1 delivery-tag1))
(is (= 2 delivery-tag2))
(lhb/nack channel delivery-tag1 true false))
@@ -230,8 +222,7 @@
(lhb/publish channel "" queue "Two")
(lhb/publish channel "" queue "Three"))))
(Thread/sleep 200)
- (let [get-response (lhb/get channel queue false)
- delivery-tag (.. get-response getEnvelope getDeliveryTag)]
+ (let [[{:keys [delivery-tag]} _] (lhb/get channel queue false)]
(is (= 1 delivery-tag))
(lhb/reject channel delivery-tag true))
(lhq/purge channel queue)))
@@ -245,10 +236,8 @@
(lhb/publish channel "" queue "Two")
(lhb/publish channel "" queue "Three"))))
(Thread/sleep 200)
- (let [get-response1 (lhb/get channel queue false)
- get-response2 (lhb/get channel queue false)
- delivery-tag1 (.. get-response1 getEnvelope getDeliveryTag)
- delivery-tag2 (.. get-response2 getEnvelope getDeliveryTag)]
+ (let [[{delivery-tag1 :delivery-tag} _] (lhb/get channel queue false)
+ [{delivery-tag2 :delivery-tag} _] (lhb/get channel queue false)]
(is (= 1 delivery-tag1))
(is (= 2 delivery-tag2))
(lhb/reject channel delivery-tag1 false))

0 comments on commit fdfabda

Please sign in to comment.
Something went wrong with that request. Please try again.