forked from rnewman/clojure-rabbitmq
/
rabbitmq.clj
98 lines (83 loc) · 2.59 KB
/
rabbitmq.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
;; Ask the compiler to report Java interop calls that it cannot resolve
;; statically and must therefore perform via reflection.
(set! *warn-on-reflection* true)
(ns com.github.icylisper.rabbitmq
(:gen-class)
(:import (com.rabbitmq.client
ConnectionParameters
Connection
Channel
AMQP
ConnectionFactory
Consumer
QueueingConsumer)))
;; abbreviations:
;; ch - channel
;; c - connection-map
;; q - queue
;; m - message
;; d - delivery
(defn connected?
  "Report whether the connection described by cm is usable.

  Placeholder implementation: cm is not inspected and the answer is
  always true."
  [cm]
  true)
(defn connect
  "Open a connection to the broker and create one channel on it.

  Takes a map with the keys :username, :password, :virtual-host, :host
  and :port. The requested heartbeat is set to 0.

  Returns a two-element vector [connection channel].

  If the channel cannot be created, the freshly opened connection is
  closed again before the exception is rethrown, so a failed call does
  not leave an open connection behind."
  [{:keys [username password virtual-host port
           #^String host]}]
  (let [#^ConnectionParameters params
        (doto (new ConnectionParameters)
          (.setUsername username)
          (.setPassword password)
          (.setVirtualHost virtual-host)
          (.setRequestedHeartbeat 0))
        #^ConnectionFactory factory (new ConnectionFactory params)
        #^Connection conn (.newConnection factory host (int port))]
    (try
      [conn (.createChannel conn)]
      (catch Exception e
        ;; Best-effort cleanup; a failure while closing must not hide
        ;; the original error.
        (try
          (.close conn)
          (catch Exception _ nil))
        (throw e)))))
(defn bind-channel
  "Declare the exchange and the queue named in the config map on channel
  ch, then bind the queue to the exchange with the given routing key.

  The config map supplies :exchange, :type, :queue and :routing-key.
  Returns whatever the final .queueBind call returns."
  [{:keys [exchange type queue routing-key]}
   #^Channel ch]
  ;; Both ends of the binding are declared first.
  (doto ch
    (.exchangeDeclare exchange type)
    (.queueDeclare queue))
  ;; Kept as the last form so its result is the function's value.
  (.queueBind ch queue exchange routing-key))
(defn publish
  "Publish the string m on channel ch to the :exchange of the config map
  using its :routing-key. No message properties are sent (nil).

  The string is converted with the no-argument String.getBytes, i.e. the
  JVM's default charset."
  [{:keys [exchange routing-key]}
   #^Channel ch
   #^String m]
  (.basicPublish ch exchange routing-key nil (.getBytes m)))
(defn disconnect
  "Close channel ch and then connection conn.

  The connection is closed in a finally clause so that it is released
  even when closing the channel throws; the channel's exception still
  propagates to the caller."
  [#^Channel ch
   #^Connection conn]
  (try
    (.close ch)
    (finally
      (.close conn))))
;;;; AMQP Queue as a sequence
(defn delivery-seq
  "Lazy sequence of message bodies (as Strings) taken from the
  QueueingConsumer q.

  Realizing an element calls .nextDelivery on q, decodes the body with
  the one-argument String constructor, and acknowledges that single
  delivery on channel ch before the element is handed out."
  [#^Channel ch
   #^QueueingConsumer q]
  (lazy-seq
    (let [delivery (.nextDelivery q)
          body     (String. (.getBody delivery))
          tag      (.. delivery getEnvelope getDeliveryTag)]
      ;; false: acknowledge only this delivery tag.
      (.basicAck ch tag false)
      (cons body (delivery-seq ch q)))))
(defn queue-seq
  "Return a lazy sequence of the messages arriving on the queue named by
  the :queue entry of the config map.

  The queue is declared on channel ch, a new QueueingConsumer is
  registered for it, and the result is the delivery-seq over that
  consumer. The three-argument arity accepts a connection as its first
  argument, ignores it, and delegates to the two-argument arity."
  ([#^Channel ch
    {q :queue}]
   (let [consumer (QueueingConsumer. ch)]
     (doto ch
       (.queueDeclare q)
       (.basicConsume q consumer))
     (delivery-seq ch consumer)))
  ([conn
    #^Channel ch
    c]
   (queue-seq ch c)))
;;; consumer routines
(defn consume-wait
  "Wait for one message on the queue named by (:queue c), acknowledge it
  and return its body as a String.

  The queue is declared on channel ch and a temporary QueueingConsumer
  is registered with explicit acknowledgements (noAck = false). The call
  blocks in .nextDelivery until a delivery is available.

  Previously the receive step sat inside (while true ...), so every
  message was acknowledged and thrown away and the function never
  returned. It now returns the first message received.

  The temporary consumer is cancelled before returning so it does not
  keep receiving deliveries that nobody reads.
  NOTE(review): deliveries the broker already pushed to this consumer
  before the cancel are not acknowledged here - confirm whether a
  prefetch limit should be configured on the channel."
  [c #^Channel ch]
  (let [consumer (QueueingConsumer. ch)]
    (.queueDeclare ch (:queue c))
    (let [consumer-tag (.basicConsume ch (:queue c) false consumer)]
      (try
        (let [d (.nextDelivery consumer)
              m (String. (.getBody d))]
          ;; false: acknowledge only this delivery tag.
          (.basicAck ch (.. d getEnvelope getDeliveryTag) false)
          m)
        (finally
          (.basicCancel ch consumer-tag))))))
(defn consume-poll
  "Fetch one message from the queue named by (:queue c) and return its
  body as a String.

  The queue is declared on channel ch and a temporary QueueingConsumer
  is registered with explicit acknowledgements (noAck = false).

  The delivery is now acknowledged: the delivery tag is never handed to
  the caller, so without the ack the message could not be acknowledged
  by anyone. The temporary consumer is also cancelled before returning
  instead of being left registered on the channel.

  NOTE(review): despite the name, this still blocks in .nextDelivery
  until a message is available - confirm whether a timeout or a
  non-blocking fetch is wanted."
  [c #^Channel ch]
  (let [consumer (QueueingConsumer. ch)]
    (.queueDeclare ch (:queue c))
    (let [consumer-tag (.basicConsume ch (:queue c) false consumer)]
      (try
        (let [d (.nextDelivery consumer)
              m (String. (.getBody d))]
          ;; false: acknowledge only this delivery tag.
          (.basicAck ch (.. d getEnvelope getDeliveryTag) false)
          m)
        (finally
          (.basicCancel ch consumer-tag))))))