/
inventory.clj
182 lines (167 loc) · 10 KB
/
inventory.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
(ns puppetlabs.pcp.broker.inventory
(:require [puppetlabs.pcp.protocol :as p]
[puppetlabs.pcp.broker.shared :refer [Broker deliver-server-message] :as shared]
[puppetlabs.pcp.broker.message :as message]
[clojure.set :refer [intersection union]]
[schema.core :as s])
(:import [clojure.lang Numbers]
[puppetlabs.pcp.broker.connection Connection]))
(s/defn init-database :- shared/BrokerDatabase
  "Create the initial broker database value: every collection is empty and the
  update index starts at 0."
  []
  {:inventory          {}    ;; starts empty
   :warning-bin        {}    ;; starts empty
   :subscriptions      {}    ;; no client is subscribed for inventory updates yet
   :updates            []    ;; no pending inventory change records
   :first-update-index 0})   ;; index assigned to the first record of :updates
(s/defn unchecked+ :- s/Int
  "Add two integers as primitive longs; the result silently wraps around on
  overflow instead of throwing."
  [a :- s/Int b :- s/Int]
  (unchecked-add (long a) (long b)))
(s/defn unchecked- :- s/Int
  "Subtract `b` from `a` as primitive longs; the result silently wraps around on
  overflow instead of throwing."
  [a :- s/Int b :- s/Int]
  (unchecked-subtract (long a) (long b)))
(s/defn build-pattern-sets :- shared/PatternSets
  "Partition the given URI patterns into two sets for faster matching:
  :explicit holds the patterns for which `p/uri-wildcard?` returns a falsey value
  (stored unchanged), :wildcard holds the truthy values `p/uri-wildcard?` returned
  for the remaining patterns."
  [patterns :- [p/Uri]]
  (reduce (fn [sets pattern]
            (if-let [exploded (p/uri-wildcard? pattern)]
              (update sets :wildcard conj exploded)
              (update sets :explicit conj pattern)))
          {:explicit #{} :wildcard #{}}
          patterns))
(s/defn ^:private exploded-uri-pattern-match? :- s/Bool
  "Check an exploded subject uri against an exploded pattern. The first two
  elements of each are compared pairwise; a pattern element of \"*\" matches any
  subject element, anything else must be equal."
  [exploded-pattern :- p/ExplodedUri exploded-subject :- p/ExplodedUri]
  (let [[wanted-client wanted-type] exploded-pattern
        [actual-client actual-type] exploded-subject
        segment-matches? (fn [wanted actual]
                           (or (= "*" wanted) (= actual wanted)))]
    (and (segment-matches? wanted-client actual-client)
         (segment-matches? wanted-type actual-type))))
(s/defn uri-pattern-sets-match? :- s/Bool
  "Return true when the subject uri is a member of the :explicit set or matches at
  least one pattern of the :wildcard set; false otherwise."
  [{:keys [explicit wildcard]} :- shared/PatternSets subject :- p/Uri]
  (if (contains? explicit subject)
    true
    ;; only explode the subject when the cheap set lookup did not succeed
    (let [exploded (p/explode-uri subject)]
      ;; `some` yields nil when nothing matches; coerce for the s/Bool return schema
      (boolean (some (fn [pattern] (exploded-uri-pattern-match? pattern exploded))
                     wildcard)))))
(s/defn build-inventory-data :- p/InventoryResponse
  "Build the payload of the inventory response message: the sorted uris of all
  inventory entries whose client uri matches the given pattern sets."
  [inventory :- shared/Inventory pattern-sets :- shared/PatternSets]
  (let [matching-uris (into #{}
                            (comp (map first) ;; each entry is [client-uri value]
                                  (filter (fn [uri] (uri-pattern-sets-match? pattern-sets uri))))
                            inventory)]
    {:uris (sort matching-uris)}))
(s/defn build-update-data :- (s/maybe p/InventoryUpdate)
  "Build the payload of the inventory update message: the change records, in their
  original order, whose :client matches the given pattern sets. Return nil when no
  record matches."
  [updates :- [p/InventoryChange] pattern-sets :- shared/PatternSets]
  ;; FIXME there should be a limit on the maximum number of elements in the
  ;; resulting vector to ensure the update message can't grow too long
  (let [matching (filterv (fn [change]
                            (uri-pattern-sets-match? pattern-sets (:client change)))
                          updates)]
    (when (seq matching)
      {:changes matching})))
(s/defn subscribe-client! :- shared/BrokerDatabase
  "Register (or replace) the inventory update subscription of `client`. `connection`
  is expected to be a promise of a connection that gets fulfilled once the initial
  inventory response has been sent, so that messages do not appear out of order.
  The subscription starts right after the last currently recorded update. Return
  the broker database value as it is right after subscribing."
  [broker :- Broker client :- p/Uri connection :- Object pattern-sets :- shared/PatternSets]
  (swap! (:database broker)
         (fn [db]
           ;; index of the first update record that will be appended after this point
           (let [next-index (unchecked+ (:first-update-index db)
                                        (count (:updates db)))]
             (assoc-in db [:subscriptions client]
                       {:connection connection
                        :pattern-sets pattern-sets
                        :next-update-index next-index})))))
(s/defn unsubscribe-client! :- shared/BrokerDatabase
  "Remove the inventory update subscription of `client`, if any. Return the broker
  database value as it is right after unsubscribing."
  [broker :- Broker client :- p/Uri]
  (swap! (:database broker)
         (fn [db]
           (update db :subscriptions dissoc client))))
(s/defn ^:private remove-processed-updates
  "Remove the first `processed-updates-count` records from the :updates vector and
  increase :first-update-index accordingly (wrapping around on overflow). Return
  the broker database value produced by the swap."
  [broker :- Broker processed-updates-count :- s/Int]
  (let [database (:database broker)]
    (swap! database #(-> %
                         ;; subvec returns a view which keeps a reference to the whole original
                         ;; vector, and `vec` hands a vector argument back without copying it;
                         ;; copy the remaining records into a fresh vector so the processed
                         ;; ones can be garbage collected
                         (update :updates (fn [updates] (into [] (subvec updates processed-updates-count))))
                         (update :first-update-index unchecked+ processed-updates-count)))))
(s/defn send-updates
  "Send inventory update messages to subscribed clients. Subsequently remove the processed inventory
  change records from the :updates vector.

  Works on a snapshot of the database taken at the start. For every subscriber in the snapshot the
  not yet sent part of the updates vector is filtered by the subscriber's patterns and delivered;
  the subscription's :next-update-index is advanced only when the delivery succeeded or there was
  nothing to deliver. Only the records processed for all subscribers are removed afterwards."
  [broker :- Broker]
  (let [database (:database broker)
        database-snapshot @database
        updates (:updates database-snapshot)
        updates-count (count updates)
        first-update-index (:first-update-index database-snapshot)]
    (->> (reduce ;; this reduce returns the number of updates which can be removed from the :updates vector
           (fn [processed-updates-count subscriber]
             (if-let [subscription (-> @database :subscriptions (get subscriber))]
               (let [next-update-offset (unchecked- (:next-update-index subscription) first-update-index)
                     processed-count-atom (atom nil)] ;; how many records from the updates vector were processed
                 ;; the function passed to swap! may run more than once; processed-count-atom
                 ;; remembers the outcome of the first run so the message is not sent again
                 (swap! database
                        (fn [database]
                          (if (identical? (-> database :subscriptions (get subscriber)) subscription) ;; is the subscription in the live database still the same?
                            (do
                              (if (nil? @processed-count-atom) ;; have we not sent the update to this subscriber yet?
                                (let [data (-> (subvec updates next-update-offset) ;; skip updates which have already been sent to this subscriber
                                               (build-update-data (:pattern-sets subscription)))]
                                  (if (or (nil? data) ;; there are no updates for this subscriber
                                          (and (realized? (:connection subscription)) ;; prevent update before inventory response
                                               (deliver-server-message broker
                                                                       (message/make-message
                                                                         {:message_type "http://puppetlabs.com/inventory_update"
                                                                          :target subscriber
                                                                          :data data})
                                                                       @(:connection subscription))))
                                    (reset! processed-count-atom updates-count)
                                    (reset! processed-count-atom next-update-offset))))
                              (let [processed-count @processed-count-atom]
                                (if (> processed-count next-update-offset)
                                  ;; add the count of the InventoryChange records which were processed when building the
                                  ;; inventory update for this subscriber to the :next-update-index in the subscription
                                  (update database :subscriptions assoc subscriber
                                          (update subscription :next-update-index unchecked+ (- processed-count next-update-offset)))
                                  database))) ;; no need to update the subscription if nothing past the next-update-offset was processed
                            (do
                              ;; if the subscription has been changed, ignore it but pretend the entire updates vector was processed
                              (reset! processed-count-atom updates-count)
                              database))))
                 ;; keep the smallest processed count seen so far across the subscribers
                 (min @processed-count-atom processed-updates-count))
               ;; the subscriber is no longer subscribed in the live database; nothing to do for it
               processed-updates-count))
           updates-count
           (-> database-snapshot :subscriptions keys))
         (remove-processed-updates broker))))
;; Timeout in milliseconds used by start-inventory-updates! when waiting on the
;; :should-stop promise between two consecutive send-updates calls.
(def batch-update-interval-ms 1000)
(s/defn start-inventory-updates!
  "Start periodic sending of the inventory updates on a background thread and
  return the future running it. The updates are sent immediately and then again
  each time waiting `batch-update-interval-ms` on the broker's :should-stop
  promise times out; the loop ends once the promise has been delivered."
  [broker :- Broker]
  (future
    (let [stop-signal (:should-stop broker)]
      (loop []
        (send-updates broker)
        ;; deref yields nil only when the wait timed out, i.e. nobody asked us to stop
        (when (nil? (deref stop-signal batch-update-interval-ms nil))
          (recur))))))
(s/defn stop-inventory-updates!
  "Stop the periodic sending of the inventory updates: drop all subscriptions and
  then deliver true to the broker's :should-stop promise."
  [broker :- Broker]
  ;; clear all subscriptions
  (swap! (:database broker) (fn [db] (assoc db :subscriptions {})))
  (deliver (:should-stop broker) true))