-
Notifications
You must be signed in to change notification settings - Fork 0
/
listeners.clj
94 lines (86 loc) · 3.48 KB
/
listeners.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
(ns monkey.ci.listeners
(:require [clojure.tools.logging :as log]
[com.stuartsierra.component :as co]
[manifold.stream :as ms]
[monkey.ci
[runtime :as rt]
[storage :as st]
[utils :as u]]
[monkey.ci.events.core :as ec]))
(defn- save-build
"Saves the build in storage, then returns it"
[storage build]
(st/save-build storage build)
build)
(defn update-build [storage {:keys [sid build]}]
(log/debug "Updating build:" sid)
(let [existing (st/find-build storage sid)]
(-> (save-build storage
(-> (merge existing (dissoc build :script))
(dissoc :sid :cleanup?)))
(assoc :sid sid))))
(defn update-script [storage {:keys [sid script]}]
(log/debug "Updating build script for sid" sid)
(if-let [build (st/find-build storage sid)]
(let [orig (get-in build [:script :jobs])]
(save-build storage
(assoc build
:script (u/deep-merge (:script build) script))))
(log/warn "Build not found when updating script:" sid)))
(defn update-job [storage {:keys [sid job]}]
(let [job-id (:id job)]
(log/debug "Updating job for sid" sid ":" job-id)
(if-let [build (st/find-build storage sid)]
(save-build storage (assoc-in build [:script :jobs job-id] job))
(log/warn "Build not found when updating job:" sid))))
(def update-handlers
{:job/start update-job
:job/end update-job
:script/start update-script
:script/end update-script
:build/start update-build
:build/end update-build})
(defn build-update-handler
"Handles a build update event. Because many events may come in close proximity,
we need to queue them to avoid losing data."
[storage events]
(let [stream (ms/stream 10)]
;; Naive implementation: process them in sequence. This does not look
;; to the sid for optimization, so it could be faster.
(ms/consume (fn [evt]
(when-let [h (get update-handlers (:type evt))]
(log/debug "Handling:" evt)
(try
(when-let [build (h storage evt)]
;; Dispatch consolidated build updated event
(ec/post-events events
{:type :build/updated
:sid (:sid build)
:build build}))
(catch Exception ex
;; TODO Handle this better
(log/error "Unable to handle event" ex)))))
stream)
(fn [evt]
(ms/put! stream evt)
nil)))
(defrecord Listeners [events storage]
co/Lifecycle
(start [this]
(if (every? nil? ((juxt :event-filter :handler) this))
(let [ef {:types (set (keys update-handlers))}
handler (build-update-handler storage events)]
;; Register listeners
(ec/add-listener events ef handler)
(assoc this :event-filter ef :handler handler))
;; If already registered, do nothing
this))
(stop [{:keys [event-filter handler] :as this}]
(ec/remove-listener events event-filter handler)
(dissoc this :event-filter :handler)))
(defmethod rt/setup-runtime :listeners [conf _]
(when (and (= :server (:app-mode conf))
(every? conf [:events :storage]))
(log/debug "Setting up storage event listeners")
(-> (map->Listeners {})
(co/using [:events :storage]))))