-
Notifications
You must be signed in to change notification settings - Fork 23
/
switchboard.cljc
121 lines (111 loc) · 5.03 KB
/
switchboard.cljc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
(ns matthiasn.systems-toolbox.switchboard
(:require
[matthiasn.systems-toolbox.component :as comp]
[matthiasn.systems-toolbox.switchboard.route :as rt]
[matthiasn.systems-toolbox.switchboard.observe :as obs]
[matthiasn.systems-toolbox.switchboard.init :as i]
[matthiasn.systems-toolbox.component.helpers :as h]
#?(:clj [clojure.core.async :refer [put! chan pipe sub tap]]
:cljs [cljs.core.async :refer [put! chan pipe sub tap]])
#?(:clj [clojure.pprint :as pp]
:cljs [cljs.pprint :as pp])
#?(:clj [clojure.tools.logging :as l]
:cljs [matthiasn.systems-toolbox.log :as l])
#?(:clj [io.aviso.exception :as ex])
#?(:clj [clojure.spec.alpha :as s]
:cljs [cljs.spec.alpha :as s])))
(defn self-register
  "Handler that makes the switchboard known to itself. The message payload
  (the switchboard's component map) is stored under the switchboard's own id
  in the :components registry, and that id is remembered as :switchboard-id.
  This lets the switchboard be wired like any other component, which is handy
  for talking to the outside world or for hierarchies in which a subsystem
  owns a switchboard of its own. Returns an empty map."
  [{:keys [cmp-state msg-payload cmp-id]}]
  ;; Two independent state transitions, applied in this order.
  (doto cmp-state
    (swap! update :components assoc cmp-id msg-payload)
    (swap! assoc :switchboard-id cmp-id))
  {})
(defn mk-state
  "State function of the switchboard component. The put-fn argument is
  ignored. Returns a map whose :state is a fresh atom holding an empty
  component registry and empty sets for :subs, :taps and :fh-taps."
  [_put-fn]
  (let [initial {:components {}
                 :subs       #{}
                 :taps       #{}
                 :fh-taps    #{}}]
    {:state (atom initial)}))
(defn attach-to-firehose
  "Attaches a component to the switchboard's firehose, for example for
  observational components.

  The message payload is the id of the component to attach. Its :in-chan is
  tapped onto the :firehose-mult of the switchboard (looked up under cmp-id
  in the :components registry of current-state).

  On success, returns {:new-state ...} with a {:from :to :type :fh-tap} entry
  added to :fh-taps. When the firehose mult or the target's :in-chan cannot
  be found, or tapping throws, the failure is logged, the state is left
  untouched and the result of the logging call is returned."
  [{:keys [current-state msg-payload cmp-id]}]
  (let [to msg-payload
        cmps (:components current-state)
        ;; `get` instead of invoking the id as a function: a nil or
        ;; non-keyword id then yields nil here rather than throwing before
        ;; the try block is even entered.
        sw-firehose-mult (:firehose-mult (get cmps cmp-id))
        to-chan (:in-chan (get cmps to))]
    (try
      ;; Reject a missing mult or target channel up front so that a nil
      ;; channel is never handed to `tap` and then recorded as a tap.
      (when-not (and sw-firehose-mult to-chan)
        (throw (ex-info "firehose mult or target in-chan not found"
                        {:from cmp-id :to to})))
      (tap sw-firehose-mult to-chan)
      {:new-state (update-in current-state [:fh-taps] conj {:from cmp-id
                                                            :to to
                                                            :type :fh-tap})}
      #?(:clj (catch Exception e
                (l/error "Could not create tap: " cmp-id " -> " to " - "
                         (ex/format-exception e)))
         ;; :default catches any thrown value in ClojureScript, including
         ;; ones that are not instances of js/Object.
         :cljs (catch :default e
                 (l/error "Could not create tap: " cmp-id
                          " -> " to " - " e))))))
(defn send-to
  "Forwards a message to a specific component. The payload is a map with :to
  (the id of the destination component in the switchboard's :components
  registry) and :msg (the message to deliver); :msg is put on the
  destination's :in-chan. Returns an empty map."
  [{:keys [cmp-state msg-payload]}]
  (let [{:keys [to msg]} msg-payload
        target-chan (-> @cmp-state :components to :in-chan)]
    (put! target-chan msg)
    {}))
(defn wire-all-out-channels
  "Invokes the :system-ready-fn of every registered component. According to
  the original notes, that function pipes the channel used by a component's
  put-fn to its out-chan once the system is connected, so that messages sent
  before all channels are wired do not get lost. Returns nil."
  [{:keys [cmp-state]}]
  (let [registered (vals (:components @cmp-state))]
    (doseq [ready-fn (map :system-ready-fn registered)]
      (ready-fn))))
(def handler-map
  "Dispatch table of the switchboard: maps each message type the switchboard
  handles to the function that handles it."
  {;; handlers defined in this namespace
   :cmd/self-register      self-register
   :cmd/attach-to-firehose attach-to-firehose
   :cmd/send               send-to
   :status/system-ready    wire-all-out-channels
   ;; handlers from the switchboard.route namespace
   :cmd/route              rt/route-handler
   :cmd/route-all          rt/route-all-handler
   ;; handlers from the switchboard.init namespace
   :cmd/wire-comp          (i/wire-or-init-comp false)
   :cmd/init-comp          (i/wire-or-init-comp true)
   :cmd/shutdown-all       i/shutdown-all
   :cmd/shutdown           i/shutdown-cmp
   ;; handler from the switchboard.observe namespace
   :cmd/observe-state      obs/observe-state})
(defn xform-fn
  "Snapshot transformer for the switchboard state. Every value in the
  :components map is replaced by its own key, so the snapshot carries only
  component ids. This allows the snapshot to be serialized for sending, e.g.
  over WebSockets or other transports."
  [m]
  (let [id->id (juxt first first)
        ids-only (fn [cmps] (into {} (map id->id) cmps))]
    (update m :components ids-only)))
(defn component
  "Builds a switchboard component, i.e. the component that wires individual
  components together into a communicating system.

  cmp-opts is optional. Its :system-id, :node-type and :node-id entries, when
  present, override the defaults generated with h/make-uuid. The whole of
  cmp-opts is also merged over the default options
  {:msgs-on-firehose false, :snapshots-on-firehose true}.

  After construction a [:cmd/self-register switchboard] message is put on the
  switchboard's own :in-chan. Returns the switchboard component."
  ([switchboard-id]
   (component switchboard-id {}))
  ([switchboard-id cmp-opts]
   (let [generated-ids {:system-id (str (h/make-uuid))
                        :node-type (str (h/make-uuid))
                        :node-id   (str (h/make-uuid))}
         supplied-ids  (select-keys cmp-opts [:system-id :node-type :node-id])
         default-opts  {:msgs-on-firehose      false
                        :snapshots-on-firehose true}
         sb (comp/make-component
              {:cmp-id            switchboard-id
               :state-fn          mk-state
               :handler-map       handler-map
               :system-info       (merge generated-ids supplied-ids)
               :state-spec        :st.switchboard/state-spec
               :opts              (merge default-opts cmp-opts)
               :snapshot-xform-fn xform-fn})]
     ;; The switchboard registers itself through its own inbound channel.
     (put! (:in-chan sb) [:cmd/self-register sb])
     sb)))
(defn send-cmd
  "Puts a single command message on the :in-chan of the given switchboard
  component. Returns whatever put! returns."
  [switchboard cmd]
  (-> switchboard
      :in-chan
      (put! cmd)))
(defn send-mult-cmd
  "Puts several command messages, in order, on the :in-chan of the given
  switchboard component. Falsy entries in cmds (nil or false) are skipped.
  Afterwards a [:status/system-ready] message is put on the same channel; the
  result of that final put! is returned."
  [switchboard cmds]
  (let [inbox (:in-chan switchboard)]
    (doseq [cmd cmds
            :when cmd]
      (put! inbox cmd))
    (put! inbox [:status/system-ready])))