-
Notifications
You must be signed in to change notification settings - Fork 0
/
jms.clj
107 lines (91 loc) · 3.25 KB
/
jms.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
(ns monkey.ci.events.jms
"Uses JMS (with bowerick) to connect to an event broker. Can also
starts its own broker server, although this is mostly meant for
development and testing purposes."
(:require [bowerick.jms :as jms]
[clojure.java.io :as io]
[clojure.tools.logging :as log]
[com.stuartsierra.component :as co]
[medley.core :as mc]
[monkey.ci
[protocols :as p]
[utils :as u]]))
(def jms-ns 'bowerick.jms)
(defmacro with-logs [& body]
;; Bowerick uses println to log, so we need to redirect it here.
;; Don't use this with bodies that use regular logs because when
;; that prints to stdout, it will stack overflow.
;; Also, for some strange reason, it prints regular logs to stderr and
;; errors to stdout *facepalm*...
`(log/with-logs [jms-ns :debug :debug] ~@body))
(defn- maybe-start-broker [{:keys [enabled url]}]
(when enabled
(with-logs
(jms/start-broker url))))
(defn- wrap-creds [{:keys [username password]} h]
(if (and username password)
(binding [jms/*user-name* username
jms/*user-password* password]
(h))
(h)))
(defn- make-producer [{:keys [url dest] :as conf}]
;; TODO Auto reconnect
(with-logs
(wrap-creds
conf
#(jms/create-producer url dest 1 (comp (memfn getBytes) pr-str)))))
(defn filtering-listener [pred l]
(fn [evt]
(when (pred evt) (l evt))))
(defn- remove-listener [listeners l]
(remove (comp (partial = l) :orig) listeners))
(defn- event-handler
"Invoked when an event is received. Dispatches event to all listeners."
[listeners]
(fn [evt]
(log/debug "Event received:" evt)
(doseq [l @listeners]
((:listener l) evt))))
(defn- parse-edn [buf]
(with-open [r (io/reader buf)]
(u/parse-edn r)))
(defn- make-consumer [{:keys [url dest] :as conf} listeners]
;; TODO Auto reconnect
(with-logs
(wrap-creds
conf
#(jms/create-consumer url dest (event-handler listeners) 1 parse-edn))))
(defrecord JmsEvents [config matches-event? listeners broker producer consumer]
p/EventPoster
(post-events [this events]
(let [events (if (sequential? events) events [events])]
(doseq [evt events]
(producer evt))
this))
p/EventReceiver
(add-listener [this ef l]
(swap! listeners conj {:orig l
:listener (filtering-listener #(matches-event? % ef) l)})
this)
(remove-listener [this ef l]
(swap! listeners remove-listener l)
this)
co/Lifecycle
(start [this]
(-> this
(mc/assoc-some :broker (maybe-start-broker (:server config)))
;; Note that this opens 2 connections to the broker and also 3 sessions
;; per connection. We may want to replace this in the future.
(assoc :producer (make-producer (:client config))
:consumer (make-consumer (:client config) listeners))))
(stop [{:keys [broker producer consumer] :as this}]
(with-logs
(when producer
(jms/close producer))
(when consumer
(jms/close consumer))
(when broker
(jms/stop broker)))
(assoc this :broker nil :producer nil :consumer nil)))
(defn make-jms-events [config matches-event?]
(->JmsEvents config matches-event? (atom []) nil nil nil))