Skip to content

Commit

Permalink
Breaking change: unify consumer handler names
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelklishin committed Sep 22, 2012
1 parent 88e05c5 commit 5b8f72c
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 30 deletions.
15 changes: 14 additions & 1 deletion ChangeLog.md
@@ -1,6 +1,19 @@
## Changes between Langohr 1.0.0-beta6 and 1.0.0-beta7

No changes yet.
`1.0.0-beta7` has **BREAKING CHANGES**:

### langohr.basic/consume Handler Names

The options `langohr.basic/consume` takes now have consistent naming:

* `:consume-ok-fn` becomes `:handle-consume-ok-fn`
* `:cancel-fn` becomes `:handle-cancel-fn`
* `:cancel-ok-fn` becomes `:handle-cancel-ok-fn`
* `:shutdown-signal-ok-fn` becomes `:handle-shutdown-signal-ok-fn`
* `:recover-ok-fn` becomes `:handle-recover-ok-fn`
* `:handle-delivery-fn` does not change

This makes handler argument names consistent across the board.


## Changes between Langohr 1.0.0-beta5 and 1.0.0-beta6
Expand Down
42 changes: 23 additions & 19 deletions src/clojure/langohr/consumers.clj
Expand Up @@ -21,33 +21,37 @@

(defn ^Consumer create-default
"Instantiates and returns a new consumer that handles various consumer life cycle events. See also langohr.basic/consume."
[^Channel channel &{ :keys [consume-ok-fn cancel-fn cancel-ok-fn shutdown-signal-fn recover-ok-fn handle-delivery-fn] }]
[^Channel channel &{:keys [handle-consume-ok-fn
handle-cancel-fn
handle-cancel-ok-fn
handle-shutdown-signal-fn
handle-recover-ok-fn
handle-delivery-fn]}]
(proxy [DefaultConsumer] [^Channel channel]
(handleConsumeOk [^String consumer-tag]
(when consume-ok-fn
(consume-ok-fn consumer-tag)))
(when handle-consume-ok-fn
(handle-consume-ok-fn consumer-tag)))

(handleCancelOk [^String consumer-tag]
(when cancel-ok-fn
(cancel-ok-fn consumer-tag)))
(when handle-cancel-ok-fn
(handle-cancel-ok-fn consumer-tag)))


(handleCancel [^String consumer-tag]
(when cancel-fn
(cancel-fn consumer-tag)))
(when handle-cancel-fn
(handle-cancel-fn consumer-tag)))

(handleRecoverOk []
(when recover-ok-fn
(recover-ok-fn)))
(when handle-recover-ok-fn
(handle-recover-ok-fn)))

(handleShutdownSignal [^String consumer-tag ^ShutdownSignalException sig]
(when shutdown-signal-fn
(shutdown-signal-fn consumer-tag sig)))
(when handle-shutdown-signal-fn
(handle-shutdown-signal-fn consumer-tag sig)))

(handleDelivery [^String consumer-tag ^Envelope envelope ^AMQP$BasicProperties properties ^bytes body]
(when handle-delivery-fn
(let [delivery (QueueingConsumer$Delivery. envelope properties body)]
(handle-delivery-fn channel (to-message-metadata delivery) body))))))
(handle-delivery-fn channel (to-message-metadata (QueueingConsumer$Delivery. envelope properties body)) body)))))

(defn subscribe
"Adds new blocking default consumer to a queue using basic.consume AMQP method"
Expand All @@ -56,10 +60,10 @@
cons-opts (select-keys options keys)
options' (apply dissoc (concat [options] keys))
consumer (create-default channel
:handle-delivery-fn f
:handle-consume-ok (get cons-opts :handle-consume-ok)
:handle-cancel-ok (get cons-opts :handle-cancel-ok)
:handle-cancel (get cons-opts :handle-cancel)
:handle-recover-ok (get cons-opts :handle-recover-ok)
:handle-shutdown-signal (get cons-opts :handle-shutdown-signal))]
:handle-delivery-fn f
:handle-consume-ok (get cons-opts :handle-consume-ok)
:handle-cancel-ok (get cons-opts :handle-cancel-ok)
:handle-cancel (get cons-opts :handle-cancel)
:handle-recover-ok (get cons-opts :handle-recover-ok)
:handle-shutdown-signal (get cons-opts :handle-shutdown-signal))]
(apply lhb/consume channel queue consumer (flatten (vec options')))))
20 changes: 10 additions & 10 deletions test/langohr/test/consumers_test.clj
Expand Up @@ -20,8 +20,8 @@
(let [channel (lch/open conn)
queue (.getQueue (lhq/declare channel))
latch (java.util.concurrent.CountDownLatch. 1)
consumer (lhcons/create-default channel :consume-ok-fn (fn [consumer-tag]
(.countDown latch)))]
consumer (lhcons/create-default channel :handle-consume-ok-fn (fn [consumer-tag]
(.countDown latch)))]
(lhb/consume channel queue consumer)
(.await latch)))

Expand All @@ -31,8 +31,8 @@
queue (.getQueue (lhq/declare channel))
tag (lhu/generate-consumer-tag "t-cancel-ok-handler")
latch (java.util.concurrent.CountDownLatch. 1)
consumer (lhcons/create-default channel :cancel-ok-fn (fn [consumer-tag]
(.countDown latch)))
consumer (lhcons/create-default channel :handle-cancel-ok-fn (fn [consumer-tag]
(.countDown latch)))
ret-tag (lhb/consume channel queue consumer :consumer-tag tag)]
(is (= tag ret-tag))
(lhb/cancel channel tag)
Expand All @@ -43,8 +43,8 @@
(let [channel (lch/open conn)
queue (.getQueue (lhq/declare channel))
latch (java.util.concurrent.CountDownLatch. 1)
consumer (lhcons/create-default channel :cancel-fn (fn [consumer_tag]
(.countDown latch)))]
consumer (lhcons/create-default channel :handle-cancel-fn (fn [consumer_tag]
(.countDown latch)))]
(lhb/consume channel queue consumer)
(lhq/delete channel queue)
(.await latch)))
Expand All @@ -58,10 +58,10 @@
n 300
latch (java.util.concurrent.CountDownLatch. (inc n))
consumer (lhcons/create-default channel
:handle-delivery-fn (fn [ch metadata ^bytes payload]
(.countDown latch))
:consume-ok-fn (fn [consumer-tag]
(.countDown latch)))]
:handle-delivery-fn (fn [ch metadata ^bytes payload]
(.countDown latch))
:handle-consume-ok-fn (fn [consumer-tag]
(.countDown latch)))]
(lhb/consume channel queue consumer)
(.start (Thread. (fn []
(dotimes [i n]
Expand Down

0 comments on commit 5b8f72c

Please sign in to comment.