Permalink
Browse files

basic.* cleanup

  • Loading branch information...
1 parent c25cf2f commit d3555f237ff2f7306dad887e1ff4d147ad825a79 Michael Klishin committed Feb 27, 2013
Showing with 34 additions and 35 deletions.
  1. +34 −35 src/clojure/langohr/basic.clj
@@ -20,7 +20,7 @@
;;
(defn publish
- "Publishes a message using basic.publish AMQP method.
+ "Publishes a message using basic.publish AMQP 0.9.1 method.
This method publishes a message to a specific exchange. The message will be routed to queues as defined by
the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
@@ -55,31 +55,33 @@
Example:
- (lhb/publish channel exchange queue payload :priority 8 :message-id msg-id :content-type content-type :headers { \"see you soon\" \"à bientôt\" })
- "
+ (lhb/publish channel exchange queue payload :priority 8 :message-id msg-id :content-type content-type :headers { \"see you soon\" \"à bientôt\" })"
[^Channel channel ^String exchange ^String routing-key payload
&{:keys [^Boolean mandatory ^Boolean immediate ^String content-type ^String ^String content-encoding ^Map headers
^Boolean persistent ^Integer priority ^String correlation-id ^String reply-to ^String expiration ^String message-id
^Date timestamp ^String type ^String user-id ^String app-id ^String cluster-id]
:or {mandatory false immediate false}}]
- (let [payload-bytes (to-bytes payload)
- properties-builder (AMQP$BasicProperties$Builder.)
- properties (.build (doto properties-builder
- (.contentType content-type)
- (.contentEncoding content-encoding)
- (.headers headers)
- (.deliveryMode (Integer/valueOf (if persistent 2 1)))
- (.priority (if priority (Integer/valueOf ^Long priority) nil))
- (.correlationId correlation-id)
- (.replyTo reply-to)
- (.expiration expiration)
- (.messageId message-id)
- (.timestamp timestamp)
- (.type type)
- (.userId user-id)
- (.appId app-id)
- (.clusterId cluster-id)))]
- (.basicPublish channel exchange routing-key mandatory immediate properties payload-bytes)))
+ (let [bytes (to-bytes payload)
+ pb (doto (AMQP$BasicProperties$Builder.)
+ (.contentType content-type)
+ (.contentEncoding content-encoding)
+ (.headers headers)
+ (.deliveryMode (Integer/valueOf (if persistent 2 1)))
+ (.priority (if priority (Integer/valueOf ^Long priority) nil))
+ (.correlationId correlation-id)
+ (.replyTo reply-to)
+ (.expiration expiration)
+ (.messageId message-id)
+ (.timestamp timestamp)
+ (.type type)
+ (.userId user-id)
+ (.appId app-id)
+ (.clusterId cluster-id))]
+ (.basicPublish channel
+ exchange
+ routing-key
+ mandatory
+ immediate (.build pb) bytes)))
(defn ^ReturnListener return-listener
@@ -110,7 +112,7 @@
(defn ^String consume
- "Adds new consumer to a queue using basic.consume AMQP method.
+ "Adds new consumer to a queue using basic.consume AMQP 0.9.1 method.
Called with default parameters, starts non-nolocal, non-exclusive consumer with explicit acknowledgement and server-generated consumer tag.
@@ -119,25 +121,22 @@
Options:
- ^String :consumer-tag
- ^Boolean :auto-ack (default false) - true if the server should consider messages acknowledged once delivered, false if server should expect explicit acknowledgements.
- ^Boolean :exclusive (default false) - true if this is an exclusive consumer (no other consumer can consume given queue)
- ^Boolean :no-local (default false) - flag set to true unless server local buffering is required.
-
-"
+ ^String :consumer-tag: a unique consumer (subscription) identifier. Pass an empty string to make RabbitMQ generate one for you.
+ ^Boolean :auto-ack (default false): true if the server should consider messages acknowledged once delivered, false if server should expect explicit acknowledgements.
+ ^Boolean :exclusive (default false): true if this is an exclusive consumer (no other consumer can consume given queue)"
[^Channel channel ^String queue ^Consumer consumer &{:keys [consumer-tag auto-ack exclusive arguments no-local]
:or {consumer-tag "" auto-ack false exclusive false no-local false}}]
(.basicConsume ^Channel channel ^String queue ^Boolean auto-ack ^String consumer-tag ^Boolean no-local ^Boolean exclusive ^Map arguments ^Consumer consumer))
(defn cancel
- "Cancels consumer using basic.cancel AMQP method"
+ "Cancels a consumer (subscription) using basic.cancel AMQP 0.9.1 method"
[^Channel channel ^String consumer-tag]
(.basicCancel ^Channel channel ^String consumer-tag))
(defn get
- "Fetches a message from a queue using basic.get AMQP method"
+ "Fetches a message from a queue using basic.get AMQP 0.9.1 method"
([^Channel channel ^String queue]
(when-let [response (.basicGet channel queue true)]
[(to-message-metadata response) (.getBody response)]))
@@ -148,36 +147,36 @@
(defn qos
- "Sets channel or connection prefetch level using basic.qos AMQP method"
+ "Sets channel or connection prefetch level using basic.qos AMQP 0.9.1 method"
([^Channel channel ^long prefetch-count]
(.basicQos channel prefetch-count))
([^Channel channel ^long prefetch-size ^long prefetch-count ^Boolean global]
(.basicQos channel prefetch-size prefetch-count global)))
(defn ack
- "Acknowledges one or more messages using basic.ack AMQP method"
+ "Acknowledges one or more messages using basic.ack AMQP 0.9.1 method"
([^Channel channel ^long delivery-tag]
(.basicAck channel delivery-tag false))
([^Channel channel ^long delivery-tag multiple]
(.basicAck channel delivery-tag multiple)))
(defn reject
- "Rejects (and, optionally, requeues) a messages using basic.reject AMQP method"
+ "Rejects (and, optionally, requeues) a messages using basic.reject AMQP 0.9.1 method"
([^Channel channel ^long delivery-tag]
(.basicReject channel delivery-tag false))
([^Channel channel ^long delivery-tag ^Boolean requeue]
(.basicReject channel delivery-tag requeue)))
(defn nack
- "Negative acknowledgement of one or more messages using basic.nack AMQP methods (a RabbitMQ extension to AMQP 0.9.1"
+ "Negative acknowledgement of one or more messages using basic.nack AMQP 0.9.1 methods (a RabbitMQ-specific extension)"
[^Channel channel ^long delivery-tag multiple ^Boolean requeue]
(.basicNack channel delivery-tag multiple requeue))
(defn recover
- "Notifies RabbitMQ that it needs to redeliver unacknowledged messages using basic.recover AMQP method"
+ "Notifies RabbitMQ that it needs to redeliver unacknowledged messages using basic.recover AMQP 0.9.1 method"
([^Channel channel]
(.basicRecover channel))
([^Channel channel ^Boolean requeue]

0 comments on commit d3555f2

Please sign in to comment.