-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
169 lines (143 loc) · 5.15 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
(ns monkey.ci.events.core
(:require [clojure.tools.logging :as log]
[manifold.deferred :as md]
[monkey.ci
[config :as c]
[protocols :as p]
[runtime :as rt]
[time :as t]]
[monkey.ci.events
[jms :as jms]
[manifold :as manifold]
[zmq :as zmq]]))
(defn post-events [e evts]
(if e
(p/post-events e evts)
(log/warn "Unable to post, no events configured")))
(def add-listener p/add-listener)
(def remove-listener p/remove-listener)
(defn make-event
"Creates a new event with required properties. Additional properties are given as
map keyvals, or as a single map."
[type & props]
(-> (if (= 1 (count props))
(first props)
(apply hash-map props))
(assoc :type type
:time (t/now))))
(defn invoke-listeners [filter-fn listeners events]
;; Find all listeners where the filter and event are matched by the filter-fn
(doseq [[ef handlers] listeners]
(->> events
(filter #(filter-fn % ef))
(mapcat (fn [evt]
(doseq [h handlers]
(h evt))))
(doall))))
;; Simple in-memory implementation, useful for testing
(deftype SyncEvents [filter-fn listeners]
p/EventPoster
(post-events [this evt]
(invoke-listeners filter-fn @listeners (if (sequential? evt) evt [evt]))
this)
p/EventReceiver
(add-listener [this ef l]
(swap! listeners update ef (fnil conj []) l)
this)
(remove-listener [this ef l]
(swap! listeners update ef (partial remove (partial = l)))
this))
(defn matches-event?
"Matches events against event filters. This checks event types and possible sid."
[evt {:keys [types sid] :as ef}]
;; TODO Allow for more generic filter fn
(letfn [(matches-type? [evt]
(or (nil? types) (contains? types (:type evt))))
(matches-sid? [evt]
(or (nil? sid) (= sid (take (count sid) (:sid evt)))))]
(or (nil? ef)
((every-pred matches-type? matches-sid?) evt))))
(defn make-sync-events [filter-fn]
(->SyncEvents filter-fn (atom {})))
(defn ^:deprecated filter-type [t f]
(fn [evt]
(when (= t (:type evt))
(f evt))))
(defn ^:deprecated no-dispatch
"Wraps `f` so that it always returns `nil`, to avoid events being re-dispatched."
[f]
(fn [evt]
(f evt)
nil))
(defmethod c/normalize-key :events [k conf]
(update conf k (comp #(c/group-keys % :client)
#(c/group-keys % :server)
c/keywordize-type)))
(defmulti make-events :type)
(defmethod make-events :sync [_]
(make-sync-events matches-event?))
(defmethod make-events :jms [config]
(jms/make-jms-events config matches-event?))
(defmethod make-events :manifold [_]
(manifold/make-manifold-events matches-event?))
(defmethod make-events :zmq [config]
(zmq/make-zeromq-events config matches-event?))
(defmethod rt/setup-runtime :events [conf _]
(when-let [ec (:events conf)]
(make-events ec)))
(defn wrapped
"Returns a new function that wraps `f` and posts an event before
and after. The `before` fn just receives the same arguments as
`f`. The `after` fn one more, the return value of `f`. The first
argument is assumed to be the runtime, which is used to get the
event poster. The return values of `before` and `after` are posted
as events. Returns the return value of calling `f`.
Any of the event generating functions can be `nil`, in which case
it will be ignored."
[f before after & [error]]
(letfn [(maybe-post [f [rt :as args] & extras]
(when f
(when-let [e (apply f (concat args extras))]
(rt/post-events rt e))))
(inv [args]
(let [r (apply f args)]
(maybe-post after args r)
r))]
(fn [& args]
(maybe-post before args)
(if error
(try
(inv args)
(catch Exception ex
;; Post event and re-throw
(maybe-post error args ex)
(throw ex)))
(inv args)))))
(defn wait-for-event
"Utility fn that registers using an event filter and invokes the handler when one has
been received. Returns a deferred that realizes with the received event. An additional
predicate can do extra filtering if it's not supported by the event filter."
[events ef & [pred]]
(log/debug "Waiting for event to arrive that matches filter:" ef)
(let [r (md/deferred)
l (fn [evt]
(when (or (nil? pred) (pred evt))
(log/debug "Matching event has arrived for filter" ef ":" evt)
(md/success! r evt)))
unregister (fn []
(remove-listener events ef l))]
(add-listener events ef l)
;; Make sure to unregister the listener in any case
(md/finally r unregister)))
;;; Utility functions for building events
(defn make-result [status exit-code msg]
{:status status
:exit exit-code
:message msg})
(defn exception-result [ex]
(-> (make-result :error 1 (ex-message ex))
(assoc :exception ex)))
(defn set-result [evt r]
(assoc evt :result r))
(def result :result)
(def result-exit (comp :exit result))