-
Notifications
You must be signed in to change notification settings - Fork 160
/
bus.clj
58 lines (47 loc) · 1.74 KB
/
bus.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
(ns crux.bus
(:refer-clojure :exclude [send])
(:require [crux.io :as cio]
[clojure.tools.logging :as log]
[clojure.spec.alpha :as s])
(:import [java.io Closeable]
[java.util.concurrent ExecutorService Executors TimeUnit]))
;; Protocol for things that event listeners can be attached to.
;;
;; `listen` has two arities:
;;   [_ f]            - register callback `f` with no options.
;;   [_ listen-ops f] - register callback `f` with an options map.
;;
;; In the EventBus implementation in this namespace, `listen-ops` may carry
;; an ::event-types entry restricting which events are delivered to `f`,
;; `f` is invoked with the event as its single argument, and `listen`
;; returns nil.
(defprotocol EventSource
(listen [_ f] [_ listen-ops f]))
;; Protocol for things that events can be published to.
;;
;; `send` takes a single `event`. In the EventBus implementation in this
;; namespace the event is checked against the ::event spec with `s/assert`
;; and then handed to every listener whose event-type filter accepts it.
;;
;; Note: the namespace excludes `clojure.core/send` (see the ns form) so that
;; this protocol method can use the name.
(defprotocol EventSink
(send [_ event]))
;; An event type is identified by a keyword.
(s/def ::event-type keyword?)
;; A listener's filter: a set of event-type keywords.
(s/def ::event-types (s/coll-of ::event-type :kind set?))
;; Extension point: returns the spec for an event, dispatching on the value
;; under the event's ::event-type key. Add a `defmethod` per event type to
;; constrain that type's payload.
(defmulti event-spec ::event-type, :default ::default)
;; Event types without their own method accept any value here, so such events
;; are only constrained by the `s/keys` part of ::event below.
(defmethod event-spec ::default [_] any?)
;; An event is a map that has an ::event-type key and additionally satisfies
;; the type-specific spec selected by `event-spec`.
(s/def ::event (s/and (s/keys :req [::event-type])
(s/multi-spec event-spec ::event-type)))
;; In-process event bus.
;;
;; `!listeners` is an atom holding a collection of listener maps, each of the
;; form {:executor <ExecutorService>, :f <callback>, ::event-types <filter>}.
;; Every listener gets its own single-thread executor, created at `listen`
;; time; callbacks run on that executor rather than on the sender's thread.
(defrecord EventBus [!listeners]
  EventSource
  ;; Registers `f` as a listener and returns nil.
  ;; `listen-ops` may contain ::event-types; it is expected to be a set of
  ;; event-type keywords (see the ::event-types spec) because `send` tests
  ;; membership with `contains?`. When it is absent (nil) the listener
  ;; receives every event.
  (listen [this listen-ops f]
    (let [{::keys [event-types]} listen-ops]
      (swap! !listeners
             conj {:executor (Executors/newSingleThreadExecutor (cio/thread-factory "bus-listener"))
                   :f f
                   ::event-types event-types})
      nil))

  ;; Registers `f` with no options, i.e. for all event types.
  (listen [this f]
    (listen this {} f))

  EventSink
  ;; Checks `event` against the ::event spec via `s/assert`, then submits
  ;; `(f event)` to the executor of every listener whose ::event-types filter
  ;; is nil or contains the event's ::event-type.
  (send [_ {::keys [event-type] :as event}]
    (s/assert ::event event)
    (doseq [{:keys [^ExecutorService executor f ::event-types]} @!listeners]
      (when (or (nil? event-types) (contains? event-types event-type))
        (.submit executor ^Runnable #(f event)))))

  Closeable
  ;; Shuts down every listener executor: an orderly `shutdown`, a wait of up
  ;; to 5 seconds, then a forced `shutdownNow` plus a warning if the executor
  ;; still has not terminated. A failure on one listener is logged and does
  ;; not prevent the remaining listeners from being closed.
  (close [_]
    (doseq [{:keys [^ExecutorService executor]} @!listeners]
      (try
        (.shutdown executor)
        ;; `shutdownNow` returns a (non-nil, hence truthy) list of pending
        ;; tasks, so chaining these calls with `or` would stop before the
        ;; warning; `when-not` makes sure both happen on timeout.
        (when-not (.awaitTermination executor 5 TimeUnit/SECONDS)
          (.shutdownNow executor)
          (log/warn "event bus listener not shut down after 5s"))
        (catch Exception e
          (log/error e "error closing listener"))))))
;; Component definition for the event bus: a map whose :start-fn takes two
;; arguments (both ignored here) and returns a fresh EventBus backed by an
;; empty listener set.
(def bus
  {:start-fn (fn [_deps _args]
               (let [!listeners (atom #{})]
                 (->EventBus !listeners)))})