forked from riemann/riemann
-
Notifications
You must be signed in to change notification settings - Fork 0
/
index.clj
136 lines (116 loc) · 4.39 KB
/
index.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
(ns riemann.index
"Maintains a stateful index of events by [host, service] key. Can be queried
to return the most recent indexed events matching some expression. Can expire
events which have exceeded their TTL. Presently the only implementation of
the index protocol is backed by a nonblockinghashmap, but I plan to add an
HSQLDB backend as well.
Indexes must extend three protocols:
- Index: indexing and querying events
- Seqable: returning a list of events
- Service: lifecycle management"
(:refer-clojure :exclude [update])
(:require [riemann.query :as query]
[riemann.common :refer [deprecated]]
[riemann.instrumentation :refer [Instrumented]]
[clojure.tools.logging :refer [error]])
(:use [riemann.time :only [unix-time]]
riemann.service)
(:import (org.cliffc.high_scale_lib NonBlockingHashMap)))
(defprotocol Index
  ;; Writes
  (insert [this event]
    "Updates the index with event.")
  (delete [this event]
    "Removes whatever event is indexed under this event's host & service.
    Returns the event that was removed, or nil.")
  (delete-exactly [this event]
    "Removes this event from the index. Returns the event that was removed, or
    nil.")
  (expire [this]
    "Removes expired states from the index and returns them as a seq.")
  (clear [this]
    "Resets the index.")

  ;; Reads
  (lookup [this host service]
    "Looks up the event indexed for host and service.")
  (search [this query-ast]
    "Returns a seq of the indexed events that match the query AST."))
; The index accepts states and maintains a table of the most recent state for
; each unique [host, service]. It can be searched for states matching a query.
;
; default-ttl is the TTL that expire falls back to when an event has no :ttl
; of its own. It is compared against (unix-time) minus the event's :time, so
; it is in whatever unit unix-time uses (presumably seconds -- TODO confirm
; against riemann.time).
(def default-ttl 60)
(defn query-for-host-and-service
  "Recognises query ASTs that constrain nothing but host and service.

  When query-ast is a list of the form (and (= :host h) (= :service s)), with
  the two equality clauses in either order, returns the vector [h s]. Any
  other AST yields nil."
  [query-ast]
  (when (and (list? query-ast)
             (= 'and (first query-ast)))
    (let [clauses   (rest query-ast)
          ;; First clause whose second element is the given field keyword.
          clause-on (fn [field]
                      (first (filter #(= field (second %)) clauses)))]
      ;; Exactly two clauses, each a list, each an equality test.
      (when (and (= 2 (count clauses))
                 (every? list? clauses)
                 (every? #(= '= (first %)) clauses))
        (let [host-clause    (clause-on :host)
              service-clause (clause-on :service)]
          (when (and host-clause service-clause)
            [(last host-clause) (last service-clause)]))))))
(defn nbhm-index
  "Create a new nonblockinghashmap backed index.

  Returns an object implementing Index, Instrumented, clojure.lang.Seqable,
  ServiceEquiv and Service. Events are stored in a NonBlockingHashMap keyed by
  the vector [(:host event) (:service event)], so the map holds at most one
  event per host/service pair."
  []
  (let [hm     (NonBlockingHashMap.)
        ; Map key under which an event is stored.
        key-of (fn [event] [(:host event) (:service event)])]
    (reify
      Index
      (clear [this]
        (.clear hm))

      (delete [this event]
        ; Single-argument remove returns the value that was mapped to the key,
        ; or nil when there was none.
        (.remove hm (key-of event)))

      (delete-exactly [this event]
        ; Two-argument remove only removes the entry when the stored value
        ; equals event, and reports success as a boolean. Translate that into
        ; the "deleted event, or nil" result the Index protocol documents.
        (when (.remove hm (key-of event) event)
          event))

      (expire [this]
        ; filter is lazy: each event is checked (and, if expired, deleted)
        ; only as the returned seq is consumed.
        (filter
          (fn [event]
            (try
              (let [age (- (unix-time) (:time event))
                    ttl (or (:ttl event) default-ttl)]
                (when (< ttl age)
                  (delete-exactly this event)
                  true))
              (catch Exception e
                ; An event whose age cannot be computed is logged and dropped
                ; from the index, but is not reported as expired.
                (error e "Caught exception while trying to expire this event"
                       event)
                (delete-exactly this event)
                false)))
          (.values hm)))

      (search [this query-ast]
        ; O(n) unless the query is for exactly a host and service, in which
        ; case it is answered with a single key lookup.
        (if-let [[host service] (query-for-host-and-service query-ast)]
          (when-let [e (lookup this host service)]
            (list e))
          (let [matching (query/fun query-ast)]
            (filter matching (.values hm)))))

      (insert [this event]
        ; Events in state "expired" are removed instead of being stored.
        (if (= "expired" (:state event))
          (delete this event)
          (.put hm (key-of event) event)))

      (lookup [this host service]
        (.get hm [host service]))

      Instrumented
      (events [this]
        ; Emits a single event reporting how many entries the map holds.
        (let [base {:state "ok"
                    :time  (unix-time)
                    :tags  ["riemann"]}]
          (map (partial merge base)
               [{:service "riemann index size"
                 :metric  (.size hm)}])))

      clojure.lang.Seqable
      (seq [this]
        (seq (.values hm)))

      ServiceEquiv
      ; Two indexes are equivalent when they are of the same class.
      (equiv? [this other] (= (class this) (class other)))

      Service
      ; The lifecycle methods are no-ops for this index.
      (conflict? [this other] false)
      (reload! [this new-core])
      (start! [this])
      (stop! [this]))))
(defn index
"Create a new index (currently: an nbhm index)"
[]
(nbhm-index))
;; Deprecated entry point kept for older callers: forwards to
;; (insert index-instance event), wrapped in riemann.common's `deprecated`
;; form together with the notice below. The namespace excludes
;; clojure.core/update via :refer-clojure so that this name can be defined.
;; New code should call insert directly.
(defn update
[index-instance event]
(deprecated "Update is now a reserved name in clojure, please use insert!"
(insert index-instance event)))