-
Notifications
You must be signed in to change notification settings - Fork 161
/
indexer.clj
45 lines (38 loc) · 1.64 KB
/
indexer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
(ns crux.metrics.indexer
(:require [crux.bus :as bus]
[crux.api :as api]
[crux.tx :as tx]
[crux.metrics.dropwizard :as dropwizard]))
(defn assign-tx-id-lag
  "Registers the indexer tx-id-lag gauge on `registry`.

  Each time the gauge is read it asks `node` for its latest completed
  transaction. If there is none the gauge value is nil; otherwise it is the
  latest submitted tx-id minus the latest completed tx-id.

  Returns whatever `dropwizard/gauge` returns."
  [registry {:crux.node/keys [node]}]
  (letfn [(tx-id-lag []
            ;; Only look up the submitted tx once a completed tx is known.
            (when-let [completed-tx (api/latest-completed-tx node)]
              (let [submitted-id (::tx/tx-id (api/latest-submitted-tx node))
                    completed-id (::tx/tx-id completed-tx)]
                (- submitted-id completed-id))))]
    (dropwizard/gauge registry ["indexer" "tx-id-lag"] tx-id-lag)))
(defn assign-doc-meter
  "Creates the indexer indexed-docs meter on `registry` and subscribes it to
  `:crux.tx/indexed-docs` events on `bus`.

  Every such event marks the meter with the number of entries under the
  event's `:doc-ids` key.

  Returns the meter."
  [registry {:crux.node/keys [bus]}]
  (let [docs-meter (dropwizard/meter registry ["indexer" "indexed-docs"])
        on-indexed-docs (fn [{ids :doc-ids}]
                          (dropwizard/mark! docs-meter (count ids)))]
    (bus/listen bus
                {:crux.bus/event-types #{:crux.tx/indexed-docs}}
                on-indexed-docs)
    docs-meter))
(defn assign-tx-timer
  "Creates the indexer indexed-txs timer on `registry` and wires it to `bus`.

  A `:crux.tx/indexing-tx` event starts a timing context and stores it; the
  following `:crux.tx/indexed-tx` event takes that context and stops it.
  An `:crux.tx/indexed-tx` event with no pending context is ignored.

  Returns the timer."
  [registry {:crux.node/keys [bus]}]
  (let [timer (dropwizard/timer registry ["indexer" "indexed-txs"])
        ;; Timing context started by the most recent indexing-tx event,
        ;; or nil when nothing is pending.
        !timer (atom nil)]
    (bus/listen bus
                {:crux.bus/event-types #{:crux.tx/indexing-tx}}
                (fn [_]
                  (reset! !timer (dropwizard/start timer))))
    (bus/listen bus
                {:crux.bus/event-types #{:crux.tx/indexed-tx}}
                (fn [_]
                  ;; reset-vals! clears the slot and hands back the previous
                  ;; value in one atomic step.
                  (let [[ctx _] (reset-vals! !timer nil)]
                    ;; The slot is nil if no indexing-tx event was seen first
                    ;; (e.g. listeners attached mid-transaction); skip the
                    ;; stop rather than pass nil to dropwizard/stop.
                    (when ctx
                      (dropwizard/stop ctx)))))
    timer))
(defn assign-listeners
  "Sets up the indexer metrics on `registry` using the dependencies in `deps`.

  Returns a map of the created metrics:
    :tx-id-lag         - result of `assign-tx-id-lag`
    :docs-ingest-meter - result of `assign-doc-meter`
    :tx-ingest-timer   - result of `assign-tx-timer`"
  [registry deps]
  (let [lag-gauge (assign-tx-id-lag registry deps)
        docs-meter (assign-doc-meter registry deps)
        tx-timer (assign-tx-timer registry deps)]
    {:tx-id-lag lag-gauge
     :docs-ingest-meter docs-meter
     :tx-ingest-timer tx-timer}))