-
Notifications
You must be signed in to change notification settings - Fork 5
/
channel.clj
109 lines (87 loc) · 2.85 KB
/
channel.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
(ns bunnicula.client.rabbitmq.channel
(:import
(com.rabbitmq.client
AMQP$BasicProperties$Builder
Channel
Connection)))
;; Fallback routing key: `publish-message` and `bind-queue` use this value
;; whenever the caller does not supply a :routing-key.
(def default-routing-key "#")
(defn open?
  "Tells whether `channel` is usable.

  A falsy `channel` (e.g. nil) is returned unchanged; otherwise the result of
  calling `.isOpen` on the channel is returned."
  [channel]
  (if channel
    (.isOpen ^Channel channel)
    channel))
(defn create
  "Creates a new channel on `connection` and returns it.

  When `prefetch-count` is supplied (and truthy) it is coerced to an int and
  applied to the new channel through `.basicQos` before the channel is
  returned. The single-argument arity applies no prefetch setting."
  ([connection]
   (create connection nil))
  ([connection prefetch-count]
   (let [^Channel new-channel (.createChannel ^Connection connection)]
     (if prefetch-count
       ;; doto yields the channel itself after configuring it
       (doto new-channel (.basicQos (int prefetch-count)))
       new-channel))))
(defn close
  "Closes `channel` if it is currently open.

  Returns nil when this call closed the channel, and returns the channel
  itself when it was already closed. A nil `channel` is tolerated (nil is
  returned) instead of raising a NullPointerException, matching the nil
  handling of `open?`, so shutdown code can call this unconditionally."
  [^Channel channel]
  ;; Guard against nil first: calling .isOpen on nil would throw.
  (when channel
    (if (.isOpen channel)
      (.close channel)
      channel)))
(defn- publish-properties
  "Builds the message properties object used when publishing.

  Reads three keys from the given map:
  - :persistent  truthy => delivery mode 2, otherwise delivery mode 1
  - :attempt     when truthy, stored in the headers under \"retry-attempts\"
  - :expiration  when truthy, stringified with `str` and set as expiration"
  [{:keys [attempt persistent expiration]}]
  (let [mode         (Integer/valueOf (if persistent 2 1))
        with-mode    (.deliveryMode (AMQP$BasicProperties$Builder.) mode)
        ;; each optional step either returns the updated builder or passes
        ;; the previous one through untouched
        with-headers (if attempt
                       (.headers with-mode {"retry-attempts" attempt})
                       with-mode)
        with-expiry  (if expiration
                       (.expiration with-headers (str expiration))
                       with-headers)]
    (.build with-expiry)))
(defn publish-message
  "Publish message bytes using the provided channel.

  The message map carries:
  - :exchange     exchange to publish to
  - :routing-key  routing key (falls back to `default-routing-key`)
  - :body         message payload
  - :attempt      when set, written to the 'retry-attempts' message header
  - :options      map of the flags below

  Options
  - :mandatory   passed as the mandatory flag to basicPublish (default false)
  - :immediate   passed as the immediate flag to basicPublish (default false)
  - :persistent  selects delivery mode 2 instead of 1 (default true)
  - :expiration  message expiration (no default)"
  [^Channel channel
   {:keys [exchange routing-key body attempt options]}]
  (let [{mandatory?  :mandatory
         immediate?  :immediate
         ttl         :expiration
         persistent? :persistent
         :or {mandatory?  false
              immediate?  false
              persistent? true}} options
        props       (publish-properties {:attempt    attempt
                                         :persistent persistent?
                                         :expiration ttl})
        target-key  (or routing-key default-routing-key)]
    (.basicPublish channel exchange target-key mandatory? immediate? props body)))
(defn ack-message
  "Acknowledge the message identified by `delivery-tag` on `channel`.
  The second basicAck argument (the multiple flag) is always false."
  [^Channel channel delivery-tag]
  (let [multiple? false]
    (.basicAck channel delivery-tag multiple?)))
(defn nack-message
  "Negatively acknowledge the message identified by `delivery-tag` on
  `channel`. The message is requeued: basicNack is called with the multiple
  flag false and the requeue flag true."
  [^Channel channel delivery-tag]
  (let [multiple? false
        requeue?  true]
    (.basicNack channel delivery-tag multiple? requeue?)))
(defn declare-exchange
  "Declare an exchange on `channel`.

  The second argument is a map with:
  - :name     exchange name (string)
  - :type     exchange type, given as a string
  - :options  map with the flags :durable, :auto-delete and :internal
              (missing or nil flags are coerced to false) and an optional
              :arguments map of extra exchange arguments, forwarded as-is
              (nil when omitted, as before).

  Returns whatever `.exchangeDeclare` returns."
  [^Channel channel {:keys [options name type]}]
  (let [{:keys [auto-delete durable internal arguments]} options]
    (.exchangeDeclare
      channel
      ;; String hints select the String-typed overload of exchangeDeclare
      ^String name
      ^String type
      (boolean durable)
      (boolean auto-delete)
      (boolean internal)
      ;; Map hint keeps overload resolution at compile time (no reflection)
      ^java.util.Map arguments)))
(defn declare-queue
  "Declare a queue on `channel`.

  The second argument is a map with:
  - :name     queue name
  - :options  map with the flags :durable, :exclusive and :auto-delete
              (missing or nil flags are coerced to false) and :arguments,
              which is forwarded unchanged to `.queueDeclare`.

  Returns whatever `.queueDeclare` returns."
  [^Channel channel {:keys [name options]}]
  (let [{durable     :durable
         exclusive   :exclusive
         auto-delete :auto-delete
         queue-args  :arguments} options
        durable?     (boolean durable)
        exclusive?   (boolean exclusive)
        auto-delete? (boolean auto-delete)]
    (.queueDeclare channel name durable? exclusive? auto-delete? queue-args)))
(defn bind-queue
  "Bind `queue` to `exchange` using `routing-key`, or `default-routing-key`
  when no routing key is given.
  - nothing is bound (nil is returned) when `exchange` is empty or nil,
    i.e. for the default exchange ''"
  [channel {:keys [queue exchange routing-key]}]
  (when (seq exchange)
    (let [binding-key (or routing-key default-routing-key)]
      (.queueBind ^Channel channel queue exchange binding-key nil))))