-
Notifications
You must be signed in to change notification settings - Fork 153
/
redis.clj
40 lines (35 loc) · 1.12 KB
/
redis.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
(ns birdwatch-tc.interop.redis
(:gen-class)
(:require
[clojure.tools.logging :as log]
[clojure.pprint :as pp]
[clojure.core.match :as match :refer (match)]
[taoensso.carmine :as car :refer (wcar)]
[clojure.core.async :as async :refer [<! put! go-loop]]))
(defn run-send-loop
"loop for sending items by publishing them on a Redis pub topic"
[send-chan conn topic]
(go-loop [] (let [msg (<! send-chan)]
(car/wcar conn (car/publish topic msg))
(recur))))
(defn- msg-handler-fn
"create handler function for messages from Redis Pub/Sub"
[receive-chan]
(fn [[msg-type topic payload]]
(when (= msg-type "message")
(put! receive-chan payload))))
(defn subscribe-topic
"subscribe to topic, put items on specified channel"
[receive-chan conn topic]
(car/with-new-pubsub-listener
(:spec conn)
{"matches" (msg-handler-fn receive-chan)}
(car/subscribe topic)))
(defn unsubscribe
"unsubscribe listener from all topics"
[listener]
(car/with-open-listener listener (car/unsubscribe)))
(defn close
"close listener"
[listener]
(car/close-listener listener))