-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
54 lines (44 loc) · 1.89 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
(ns lupapiste-pubsub.core
(:require [lupapiste-pubsub.protocol :as pubsub]
[lupapiste-pubsub.publisher-util :as pub-util]
[lupapiste-pubsub.subscriber-util :as sub-util]
[taoensso.timbre :as timbre])
(:import [com.google.cloud.pubsub.v1 Publisher Subscriber]))
(defrecord PubSubClient [config *publishers *subscribers]
pubsub/MessageQueueClient
(publish [this topic-name message]
(-> (pubsub/get-publisher this topic-name)
(pub-util/publish this message)))
(subscribe [this topic-name handler]
(pubsub/subscribe this topic-name handler nil))
(subscribe [_ topic-name handler additional-config]
(when-not (get @*subscribers topic-name)
(->> (sub-util/build-subscriber (merge config additional-config) topic-name handler)
(swap! *subscribers assoc topic-name))))
(get-publisher [_ topic-name]
(if topic-name
(or (get @*publishers topic-name)
(let [publisher (pub-util/build-publisher config topic-name)]
(swap! *publishers assoc topic-name publisher)
publisher))
(throw (IllegalArgumentException. "No topic-name provided for publisher"))))
(stop-subscriber [_ topic-name]
(some-> (get @*subscribers topic-name) sub-util/stop-subscriber))
(remove-subscription [_ topic-name]
(some-> (get @*subscribers topic-name) sub-util/stop-subscriber)
(sub-util/delete-subscription config topic-name))
(halt [_]
(timbre/info "Tearing down publishers")
(->> @*publishers
(pmap (fn [[_ ^Publisher pub]]
(pub-util/shutdown-publisher pub)))
dorun)
(reset! *publishers {})
(timbre/info "Tearing down subscribers")
(->> @*subscribers
(pmap (fn [[_ ^Subscriber sub]]
(sub-util/stop-subscriber sub)))
dorun)
(reset! *subscribers {})))
(defn init [config]
(->PubSubClient config (atom {}) (atom {})))