-
Notifications
You must be signed in to change notification settings - Fork 10
/
poll.clj
30 lines (27 loc) · 1.01 KB
/
poll.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
(ns ripley.live.poll
"A polling source."
(:require [clojure.core.async :as async :refer [go timeout]]
[ripley.live.protocols :as p]))
(defn poll-source [interval-ms poll-fn]
(let [listeners (atom #{})
close-ch (async/chan)
current-value (poll-fn)]
(go
(loop [old-value current-value
[_ port] (async/alts! [(timeout interval-ms) close-ch])]
(when (not= port close-ch)
(let [new-value (poll-fn)]
(if (not= old-value new-value)
(do
(doseq [listener @listeners]
(listener new-value))
(recur new-value (async/alts! [(timeout interval-ms) close-ch])))
(recur old-value (async/alts! [(timeout interval-ms) close-ch])))))))
(reify p/Source
(current-value [_] current-value)
(listen! [_ listener]
(swap! listeners conj listener)
#(swap! listeners disj listener))
(close! [_]
(reset! listeners #{})
(async/close! close-ch)))))