forked from riemann/riemann
-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.clj
309 lines (271 loc) · 11.1 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
(ns riemann.core
"Binds together an index, servers, and streams."
(:use [riemann.time :only [unix-time]]
[riemann.common :only [deprecated localhost event]]
clojure.tools.logging
[riemann.instrumentation :only [Instrumented]])
(:require riemann.streams
[riemann.service :as service :refer [Service ServiceEquiv]]
[riemann.index :as index :refer [Index]]
[riemann.pubsub :as ps]
[riemann.test :as test]
[riemann.instrumentation :as instrumentation]
clojure.set))
(defn stream!
  "Feeds one event to each of the core's :streams, one after another. The
  whole pass is wrapped in instrumentation/measure-latency using the core's
  :streaming-metric."
  [core event]
  (let [metric (:streaming-metric core)]
    (instrumentation/measure-latency metric
      (doseq [s (:streams core)]
        (s event)))))
(defn core-services
  "Returns a lazy sequence of every service attached to a core: first the
  named ones (:index, then :pubsub), followed by the entries of :services.
  Any nil entry is dropped."
  [core]
  (->> (:services core)
       (concat [(:index core) (:pubsub core)])
       (remove nil?)))
(defn instrumentation-service
  "Builds a thread service that, on every pass, sleeps for the configured
  interval and then collects instrumentation events from the core, from
  riemann.transport/instrumentation, and from every instrumented core
  service.

  Options:

  :interval  Seconds between samples (default 10).
  :enabled?  When true (the default) the sampled events are streamed back
             into the core. Always treated as false while test/*testing* is
             set.

  When disabled, the events are still consumed but not streamed."
  [opts]
  (let [interval (long (* 1000 (get opts :interval 10)))
        enabled? (and (not test/*testing*)
                      (get opts :enabled? true))]
    (service/thread-service
      ::instrumentation [interval enabled?]
      (fn measure [core]
        (Thread/sleep interval)
        (try
          (let [; Template merged under every sample. The TTL covers two
                ; intervals: interval is in ms, so ms / 500 = 2 * seconds.
                defaults (event {:host (localhost)
                                 :ttl  (long (/ interval 500))})
                ; The transport var is looked up by name at runtime to
                ; sidestep a circular namespace dependency.
                sources  (concat
                           [core
                            (deref (find-var 'riemann.transport/instrumentation))]
                           (filter instrumentation/instrumented?
                                   (core-services core)))
                samples  (mapcat instrumentation/events sources)]
            (if enabled?
              ; Push every sample through this core's streams.
              (doseq [sample samples]
                (stream! core (merge defaults sample)))
              ; Still drain the lazy seq so the stats do not overflow.
              (dorun samples)))
          (catch Exception e
            (warn e "instrumentation service caught")))))))
; A core is a record of its streams, its list of services, its index, its
; pubsub registry and the metric used when streaming events. As an
; Instrumented thing, it reports whatever its streaming metric reports.
(defrecord Core [streams services index pubsub streaming-metric]
  Instrumented
  (events [_]
    (instrumentation/events streaming-metric)))
(defn core
  "Builds a fresh core with no streams, no index, a single instrumentation
  service created with default options, a new pubsub registry, and a
  rate+latency metric for service \"streams\" tagged \"riemann\"."
  []
  (let [services [(instrumentation-service {})]
        registry (ps/pubsub-registry)
        metric   (instrumentation/rate+latency {:service "streams"
                                                :tags    ["riemann"]})]
    (->Core [] services nil registry metric)))
(defn conj-service
  "Returns core with service added to :services.

  Without force? (the default), every core service, including the index and
  pubsub, is checked with service/conflict?; if any conflicts, an
  IllegalArgumentException naming the service and the conflicts is thrown.

  With force?, conflicting entries are removed from :services only and the
  new service is conj'd onto the remaining lazy sequence, which places it at
  the front."
  ([core service]
   (conj-service core service false))
  ([core service force?]
   (let [clashes-with? (fn [existing] (service/conflict? service existing))]
     (if-not force?
       ; Strict mode: refuse to add a service that conflicts with anything.
       (let [clashes (filter clashes-with? (core-services core))]
         (when (seq clashes)
           (throw (IllegalArgumentException.
                    (binding [*print-level* 3]
                      (str "won't conj service: " (pr-str service)
                           " would conflict with " (pr-str clashes))))))
         (update-in core [:services] conj service))
       ; Forced mode: evict the conflicting services, then add the new one.
       (assoc core :services
              (conj (remove clashes-with? (:services core))
                    service))))))
(defn merge-cores
  "Combines old-core and new-core into one Core record shaped like new-core.

  Each of new-core's services is replaced by the first service in old-core
  that is service/equiv? to it, when one exists. The index and pubsub follow
  the same rule, and stay nil when new-core has none. The streaming metric
  is new-core's, falling back to old-core's."
  [old-core new-core]
  (let [; Prefer the already-existing thing when it is equivalent.
        reuse     (fn [fresh stale]
                    (when fresh
                      (if (service/equiv? fresh stale)
                        stale
                        fresh)))
        carry-one (fn [svc]
                    (or (first (filter #(service/equiv? % svc)
                                       (:services old-core)))
                        svc))
        carried   (map carry-one (:services new-core))]
    (assoc (map->Core new-core)
           :streaming-metric (or (:streaming-metric new-core)
                                 (:streaming-metric old-core))
           :index            (reuse (:index new-core) (:index old-core))
           :pubsub           (reuse (:pubsub new-core) (:pubsub old-core))
           :services         carried)))
(defn transition!
  "Moves from old-core to new-core and returns the resulting merged core.
  Cores themselves are immutable values, but the services they refer to are
  stateful, so the transition proceeds in steps:

  1. Merge the two cores with merge-cores.
  2. Stop every old-core service that is not part of the merged core.
  3. Reload every merged service against the merged core.
  4. Start every merged service.

  Each step runs over its services with pmap and completes before the next
  step begins. Afterwards old-core and new-core can be thrown away."
  [old-core new-core]
  (let [merged   (merge-cores old-core new-core)
        before   (set (core-services old-core))
        after    (set (core-services merged))
        orphaned (clojure.set/difference before after)]
    ; Shut down what did not survive the merge.
    (dorun (pmap service/stop! orphaned))
    ; Point the survivors at the merged core.
    (dorun (pmap (fn [svc] (service/reload! svc merged)) after))
    ; Bring everything up.
    (dorun (pmap service/start! after))
    (info "Hyperspace core online")
    merged))
(defn start!
  "Brings a core up: reloads all of its services against the core itself,
  then starts them all, then logs that the core is online."
  [core]
  (let [services (core-services core)
        reload   (fn [svc] (service/reload! svc core))]
    (->> services (pmap reload) dorun)
    (->> services (pmap service/start!) dorun))
  (info "Hyperspace core online"))
(defn stop!
  "Shuts a core down by stopping every one of its services, logging before
  and after."
  [core]
  (info "Core stopping")
  (->> (core-services core)
       (pmap service/stop!)
       dorun)
  (info "Hyperspace core shut down"))
(defn update-index
  "Deprecated. Applies the core's :index, as a function, to event."
  [core event]
  (deprecated "update-index is redundant; wrap-index provides pubsub
integration now."
    (let [idx (:index core)]
      (idx event))))
; Protocol implemented by index wrappers (see wrap-index in this file). It
; exposes a single accessor for the index that the wrapper was built around.
(defprotocol WrappedIndex
; Returns the source index held by this wrapper.
(source [this]))
(defn- unwrap
  "Returns the source index that the given WrappedIndex was built around."
  [^riemann.core.WrappedIndex w]
  (source w))
(defn wrap-index
  "Wraps an index in an object that implements Index, Service, ServiceEquiv,
  Instrumented, Seqable and IFn by delegating to source. Invoking the wrapper
  as a function with an event inserts that event.

  insert throws an ex-info when the event has no :time. Otherwise the event
  is inserted into source and, when registry is non-nil, published to the
  \"index\" channel of that registry with ps/publish!.

  Two wrappers are equal when their sources are equal, and they hash like
  their sources. Comparing a wrapper with anything that is not a
  WrappedIndex (including nil) yields false."
  ([source]
   (wrap-index source nil))
  ([source registry]
   (reify
     Object
     (equals [this other]
       ; Guard first: unwrap only works on WrappedIndex implementations, and
       ; equals must answer false, not throw, for unrelated objects.
       (and (satisfies? WrappedIndex other)
            (= source (unwrap other))))
     (hashCode [this]
       ; Keep hashCode consistent with equals, which compares sources.
       (hash source))

     WrappedIndex
     (source [this]
       source)

     Index
     (clear [this]
       (index/clear source))
     (delete [this event]
       (index/delete source event))
     (delete-exactly [this event]
       (index/delete-exactly source event))
     (expire [this]
       (index/expire source))
     (search [this query-ast]
       (index/search source query-ast))
     (insert [this event]
       (when-not (:time event)
         (throw (ex-info "cannot index event with no time"
                         {:event event})))
       (index/insert source event)
       (when registry
         (ps/publish! registry "index" event)))
     (lookup [this host service]
       (index/lookup source host service))

     clojure.lang.Seqable
     (seq [this]
       (seq source))

     Instrumented
     (instrumentation/events [this]
       (instrumentation/events source))

     ServiceEquiv
     (equiv? [this other]
       (and (satisfies? WrappedIndex other)
            (service/equiv? source (unwrap other))))

     Service
     ; NOTE(review): unlike equiv?, conflict? unwraps other without checking
     ; that it is a WrappedIndex -- confirm callers only pass wrapped indexes.
     (conflict? [this other]
       (service/conflict? source (unwrap other)))
     (reload! [this new-core]
       (service/reload! source new-core))
     (start! [this]
       (service/start! source))
     (stop! [this]
       (service/stop! source))

     clojure.lang.IFn
     (invoke [this event]
       ; Go through the wrapper's own insert so the time check and pubsub
       ; notification apply.
       (index/insert this event)))))
(defn delete-from-index
  "Deletes events from the core's index.

  With two arguments, hands the event to index/delete on the core's index.
  With a field, or a collection of fields, every indexed event whose values
  for all of those fields equal the given event's is removed with
  index/delete-exactly.

  ; Delete all events in the index with the same host
  (delete-from-index core :host event)

  ; Delete all events in the index with the same host and state.
  (delete-from-index core [:host :state] event)"
  ([core event]
   (index/delete (:index core) event))
  ([core fields event]
   (let [idx    (:index core)
         ; A collection of fields becomes one function returning a vector
         ; of values; a lone field is used as the function itself.
         key-of (if (coll? fields)
                  (apply juxt fields)
                  fields)
         wanted (key-of event)
         doomed (filter (fn [candidate] (= wanted (key-of candidate))) idx)]
     (doseq [candidate doomed]
       (index/delete-exactly idx candidate)))))
(defn reaper
  "Builds a scheduled task service which calls index/expire on its core's
  index every interval seconds (default 10). For each expired state, a new
  event is published to the \"index\" pubsub channel, when the core has a
  pubsub registry, and then streamed through the core's streams.

  Options:

  :keep-keys  The keys copied from the expired state into the new event.
              Defaults to [:host :service], so only :host and :service
              survive. Pass :all to keep every key of the expired state.

  Whatever is kept, the new event's :state is always \"expired\" and its
  :time is the time at which it was reaped. An exception raised while
  handling one state is logged and does not stop the remaining ones."
  ([] (reaper 10))
  ([interval] (reaper interval {}))
  ([interval opts]
   (let [interval  (or interval 10)
         keep-keys (get opts :keep-keys [:host :service])]
     (service/scheduled-task-service
       ::reaper keep-keys interval interval
       (fn worker [core]
         (when-let [idx (:index core)]
           (doseq [state (index/expire idx)]
             (try
               (let [kept    (if (= keep-keys :all)
                               state
                               (select-keys state keep-keys))
                     expired (merge kept {:state "expired"
                                          :time  (unix-time)})]
                 (when-let [registry (:pubsub core)]
                   (ps/publish! registry "index" expired))
                 (stream! core expired))
               (catch Exception e
                 (warn e "Caught exception while processing expired events"))))))))))