-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
events.clj
119 lines (95 loc) · 4.51 KB
/
events.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
(ns metabase.events
"Provides a very simple event bus using `core.async` to allow publishing and subscribing to interesting topics
happening throughout the Metabase system in a decoupled way.
## Regarding Events Initialization:
The most appropriate way to initialize event listeners in any `metabase.events.*` namespace is to implement the
`events-init` function which accepts zero arguments. This function is dynamically resolved and called exactly once
when the application goes through normal startup procedures. Inside this function you can do any work needed and add
your events subscribers to the bus as usual via `start-event-listener!`."
(:require [clojure.core.async :as async]
[clojure.string :as str]
[clojure.tools.logging :as log]
[metabase
[config :as config]
[util :as u]]
[metabase.util.i18n :refer [trs]]))
;;; --------------------------------------------------- LIFECYCLE ----------------------------------------------------
;; Holds a truthy value once `initialize-events!` has loaded the event handlers; starts out as nil.
;; `defonce` keeps the flag intact if this namespace is reloaded.
(defonce ^{:private true} events-initialized? (atom nil))
(defn- find-and-load-event-handlers!
  "Require every namespace listed in `u/metabase-namespace-symbols` whose name starts with `metabase.events.`, then
  call that namespace's zero-argument `events-init` function when it defines one. Does nothing when
  `config/is-test?` is truthy."
  []
  (when-not config/is-test?
    (let [events-ns? (fn [ns-symb]
                       (.startsWith (name ns-symb) "metabase.events."))]
      (doseq [ns-symb (filter events-ns? @u/metabase-namespace-symbols)]
        ;; the namespace has to be loaded before its vars can be resolved
        (require ns-symb)
        ;; `events-init` is optional; namespaces without it are only loaded
        (when-let [init-fn (ns-resolve ns-symb 'events-init)]
          (log/info (trs "Starting events listener:") (u/format-color 'blue ns-symb) (u/emoji "👂"))
          (init-fn))))))
(defn initialize-events!
  "Load and start all event handlers the first time this is called; later calls are no-ops that return nil.
  Returns true on the call that performed the initialization."
  []
  (let [already-initialized? @events-initialized?]
    (when-not already-initialized?
      (find-and-load-event-handlers!)
      ;; the flag is only set after the handlers were loaded without throwing
      (reset! events-initialized? true))))
;;; -------------------------------------------------- PUBLICATION ---------------------------------------------------
(def ^{:private true} events-channel
  "The single `core.async` channel that `publish-event!` puts every event onto; it is the source channel of
  `events-publication`."
  (async/chan))
(def ^{:private true} events-publication
  "`core.async` publication built on top of `events-channel`. Messages are routed to subscribers by the value of
  their `:topic` key, so every message put on `events-channel` must be a map containing `:topic`."
  (async/pub events-channel :topic))
(defn publish-event!
  "Put `event-item` onto the events stream under `topic` (coerced with `keyword`). The put happens asynchronously
  inside a `go` block, so this function does not wait for delivery. Returns `event-item` unchanged so calls can be
  chained."
  {:style/indent 1}
  [topic event-item]
  {:pre [(keyword topic)]}
  (let [message {:topic (keyword topic), :item event-item}]
    (async/go (async/>! events-channel message)))
  event-item)
;;; -------------------------------------------------- SUBSCRIPTION --------------------------------------------------
(defn- subscribe-to-topic!
  "Register `channel` as a subscriber of `topic` (coerced with `keyword`) on the general events publication.
  Returns `channel` so calls can be chained."
  [topic channel]
  {:pre [(keyword topic)]}
  (let [topic-kw (keyword topic)]
    (async/sub events-publication topic-kw channel)
    channel))
(defn- subscribe-to-topics!
  "Subscribe the single `channel` to each topic in the collection `topics`. Returns nil."
  [topics channel]
  {:pre [(coll? topics)]}
  (run! #(subscribe-to-topic! % channel) topics))
(defn start-event-listener!
  "Subscribe `channel` to every topic in `topics` and start a `core.async` `go-loop` that hands each message taken
  from `channel` to `handler-fn`.

  * `topics`     - non-empty collection of topics; each one is coerced with `keyword`.
  * `channel`    - `core.async` channel the subscriptions deliver to.
  * `handler-fn` - function of one argument, called once per message taken from `channel`.

  Exceptions thrown while reading or handling a message are logged and do not stop the loop. The loop ends once
  `channel` is closed. Returns the channel of the `go-loop`."
  [topics channel handler-fn]
  {:pre [(seq topics) (fn? handler-fn)]}
  ;; create the core.async subscription for each of our topics
  (subscribe-to-topics! topics channel)
  ;; start listening for events we care about and do something with them
  (async/go-loop []
    (let [continue? (try
                      ;; `<!` yields nil only when `channel` has been closed. Without this check the loop would
                      ;; spin forever, calling `handler-fn` with nil on every iteration.
                      (if-some [event (async/<! channel)]
                        (do (handler-fn event)
                            true)
                        false)
                      (catch Throwable e
                        ;; a failing read or handler must not kill the listener; log it and keep going
                        (log/error e (trs "Unexpected error listening on events"))
                        true))]
      (when continue?
        (recur)))))
;;; ------------------------------------------------ HELPER FUNCTIONS ------------------------------------------------
(defn topic->model
  "Return the `model` identifier for `topic`: the part of the topic's name before the first dash, as a string
  (e.g. `:card-create` -> \"card\")."
  [topic]
  (-> topic
      name
      (str/split #"-")
      first))
(defn object->model-id
  "Determine the appropriate `model_id` (if possible) for a given `object`.

  When `object` contains an `:id` key, its value is returned (even when that value is nil). Otherwise the model name
  is derived from `topic` with `topic->model` and the value stored under `:<model>_id` is returned, e.g. `:card_id`
  for the topic `:card-create`. Returns nil when that key is absent as well."
  [topic object]
  ;; `contains?` works directly on maps (and on nil), so there is no need to build a set of the keys first
  (if (contains? object :id)
    (:id object)
    (let [model (topic->model topic)]
      (get object (keyword (format "%s_id" model))))))
(defn object->user-id
  "Determine the appropriate `user_id` (if possible) for a given `object`. The keys are tried in order of precedence:
  `:actor_id`, then `:user_id`, then `:creator_id`; the first truthy value wins."
  [object]
  (if-let [actor-id (:actor_id object)]
    actor-id
    (if-let [user-id (:user_id object)]
      user-id
      (:creator_id object))))