-
Notifications
You must be signed in to change notification settings - Fork 32
/
shared.clj
211 lines (193 loc) · 8.52 KB
/
shared.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
(ns puppetlabs.pcp.broker.shared
(:require [metrics.gauges :as gauges]
[puppetlabs.trapperkeeper.services.websocket-session :as websocket-session]
[puppetlabs.pcp.broker.connection :as connection :refer [Codec]]
[puppetlabs.pcp.broker.websocket]
[puppetlabs.pcp.broker.message :as message :refer [Message multicast-message?]]
[puppetlabs.pcp.client :as pcp-client]
[puppetlabs.pcp.protocol :as p]
[puppetlabs.metrics :refer [time!]]
[puppetlabs.structured-logging.core :as sl]
[schema.core :as s]
[puppetlabs.i18n.core :as i18n])
(:import [puppetlabs.pcp.broker.connection Connection]
[org.joda.time DateTime]
[org.eclipse.jetty.server.handler ContextHandler]
[clojure.lang IFn]))
;; Schema for the value held in the Broker's :state atom: exactly one of
;; :starting, :running or :stopping.
(def BrokerState
  (s/enum :starting :running :stopping))
;; Schema for a pair of pattern sets: :explicit is a set of p/Uri values and
;; :wildcard is a set of p/ExplodedUri values.
(def PatternSets
  {:explicit #{p/Uri} :wildcard #{p/ExplodedUri}})
;; Schema for one entry of the broker database's :subscriptions map.
(def Subscription
  {;; Promise that resolves to the Connection after inventory response has been
   ;; sent. This ensures no updates are sent before the initial response.
   :connection Object
   :pattern-sets PatternSets
   ;; the index of the next update to process (note that to get the offset
   ;; of the corresponding InventoryChange record in the :updates vector, you
   ;; must subtract the broker database's :first-update-index from this value)
   :next-update-index s/Int})
;; Schema for the inventory: a map from p/Uri to the Connection stored under
;; that uri.
(def Inventory
  {p/Uri Connection})
;; Schema for the value held in the Broker's :database atom.
(def BrokerDatabase
  {:inventory Inventory
   ;; the index of the first InventoryChange record in the :updates vector
   ;; (note that this index can overflow)
   :first-update-index s/Int
   ;; map from p/Uri to a DateTime
   :warning-bin {p/Uri DateTime}
   ;; vector of p/InventoryChange records; Subscription's :next-update-index is
   ;; translated into an offset in this vector using :first-update-index
   :updates [p/InventoryChange]
   ;; map from p/Uri to that uri's Subscription
   :subscriptions {p/Uri Subscription}})
;; Schema for the broker map taken by the functions in this namespace. The
;; mutable parts (:database, :controllers, :state and :handlers) are held in
;; atoms; the remaining entries are plain values.
(def Broker
  {:broker-name (s/maybe s/Str)
   :authorization-check IFn
   :max-connections s/Int
   :max-message-size s/Int
   :idle-timeout s/Int
   :crl-check-period s/Int
   :expired-conn-throttle s/Int
   :database (s/atom BrokerDatabase)
   ;; controller Connections keyed by uri; consulted by get-controller
   :controllers (s/atom {p/Uri Connection})
   :should-stop Object ;; Promise used to signal the inventory updates should stop
   :metrics-registry Object
   ;; keyword -> metric object; build-and-register-metrics returns a map of this
   ;; shape with the :on-connect, :on-close, :on-message and :on-send timers
   :metrics {s/Keyword Object}
   :state (s/atom BrokerState)
   :handlers (s/atom [ContextHandler])})
(s/defn get-connection :- (s/maybe Connection)
  "Looks `uri` up in the :inventory of the broker's current database value.
  Returns the Connection stored under that uri, or nil when there is none."
  [broker :- Broker uri :- p/Uri]
  (let [inventory (:inventory @(:database broker))]
    (get inventory uri)))
(s/defn get-controller :- (s/maybe Connection)
  "Looks `uri` up in the broker's :controllers map.

  When an entry exists, waits on its websocket client with
  `pcp-client/wait-for-connection` using `timeout` (0 for the two-argument
  arity) and returns the controller only if the client then reports itself
  connected. Returns nil when there is no entry or the client is not
  connected."
  ([broker :- Broker uri :- p/Uri]
   (get-controller broker uri 0))
  ([broker :- Broker uri :- p/Uri timeout :- s/Int]
   (when-let [controller (get @(:controllers broker) uri)]
     (let [client (:websocket controller)]
       (pcp-client/wait-for-connection client timeout)
       (when (pcp-client/connected? client)
         controller)))))
(s/defn build-and-register-metrics :- {s/Keyword Object}
  "Registers the broker's metrics in its :metrics-registry.

  A gauge named \"puppetlabs.pcp.connections\" reports the number of entries in
  the inventory each time it is read. Returns a map of the timers obtained from
  the registry, keyed :on-connect, :on-close, :on-message and :on-send."
  [broker :- Broker]
  (let [registry (:metrics-registry broker)
        ;; evaluated by the gauge on every read, so it always sees the current
        ;; database value
        inventory-size (fn [] (count (:inventory @(:database broker))))
        timer (fn [metric-name] (.timer registry metric-name))]
    (gauges/gauge-fn registry ["puppetlabs.pcp.connections"] inventory-size)
    {:on-connect (timer "puppetlabs.pcp.on-connect")
     :on-close (timer "puppetlabs.pcp.on-close")
     :on-message (timer "puppetlabs.pcp.on-message")
     :on-send (timer "puppetlabs.pcp.on-send")}))
;;
;; Message sending
;;
;; This is the declared return schema of `summarize`.
(def MessageLog
  "Schema for a loggable summary of a message"
  {:messageid p/MessageId
   :source s/Str
   :messagetype s/Str
   :destination p/Uri})
(s/defn summarize :- MessageLog
  "Builds the loggable summary of `message`: its :id, :message_type, :sender
  and :target under the MessageLog keys. A missing sender or target is
  reported as \"pcp:///server\"."
  [message :- Message]
  (let [{:keys [id message_type sender target]} message
        server-uri "pcp:///server"]
    {:messageid id
     :messagetype message_type
     :source (or sender server-uri)
     :destination (or target server-uri)}))
(s/defn send-message
  "Encodes `message` with the connection's codec :encode function and sends the
  result over the connection's websocket, after logging the outgoing message
  at trace level. Always returns nil."
  [connection :- Connection
   message :- Message]
  (sl/maplog :trace {:type :outgoing-message-trace
                     :uri (:uri connection)
                     :rawmsg message}
             ;; 0 : connection uri
             ;; 1 : raw message
             (fn [ctx]
               (i18n/trs "Sending PCP message to {0}: {1}" (:uri ctx) (:rawmsg ctx))))
  (let [encode (get-in connection [:codec :encode])]
    (websocket-session/send! (:websocket connection) (encode message)))
  nil)
(s/defn send-error-message
  "Sends an error message whose :data is `description` to `connection`.

  When `in-reply-to-message` is given, its :id is recorded as :in_reply_to on
  the error message. The send happens while holding the lock on the
  connection's websocket. Any Exception raised while sending is logged at warn
  level and not rethrown."
  [in-reply-to-message :- (s/maybe Message)
   description :- s/Str
   connection :- Connection]
  (let [base-msg (message/make-message
                   {:message_type "http://puppetlabs.com/error_message"
                    :data description})
        error-msg (if in-reply-to-message
                    (assoc base-msg :in_reply_to (:id in-reply-to-message))
                    base-msg)]
    (try
      (locking (:websocket connection)
        (send-message connection error-msg))
      (catch Exception e
        (sl/maplog :warn e
                   {:target (:uri connection)
                    :type :message-delivery-error}
                   (fn [ctx]
                     (i18n/trs "Attempted error message delivery to {0} failed." (:target ctx))))))))
(s/defn log-delivery-failure
  "Logs, at trace level, that `message` could not be delivered because of
  `reason`. The log context is the message summary plus :type and :reason.
  Always returns nil."
  [message :- Message reason :- s/Str]
  (sl/maplog :trace (-> (summarize message)
                        (assoc :type :message-delivery-failure)
                        (assoc :reason reason))
             ;; 0 : message id (uuid)
             ;; 1 : destination uri
             ;; 2 : reason for failure
             (fn [ctx]
               (i18n/trs "Failed to deliver {0} for {1}: {2}"
                         (:messageid ctx) (:destination ctx) (:reason ctx))))
  ;; callers rely on a nil result regardless of what maplog returns
  nil)
(s/defn handle-delivery-failure
  "Records that `message` could not be delivered and, when possible, reports
  the failure back to its sender.

  The failure is always logged via `log-delivery-failure`. An error message
  carrying `reason`, in reply to `message`, is then sent to `sender` with
  `send-error-message` - but only when `sender` is non-nil. The schema allows
  a nil sender, whereas `send-error-message` requires a Connection, so with no
  sender there is nobody to notify and the reply is skipped.

  Returns nil when there is no sender; otherwise returns whatever
  `send-error-message` returns."
  [message :- Message sender :- (s/maybe Connection) reason :- s/Str]
  (log-delivery-failure message reason)
  (when sender
    (send-error-message message reason sender)))
(s/defn deliver-message
  "Message consumer. Delivers a non-multicast `message` to the connection
  registered for its :target.

  The target is looked up first in the inventory and then among the
  controllers. The send is performed while holding the lock on the
  connection's websocket and is timed against the :on-send metric. When no
  connection is found, or the send throws an Exception, the failure is passed
  to `handle-delivery-failure` together with `sender`. A multicast message
  trips the assertion."
  [broker :- Broker
   message :- Message
   sender :- (s/maybe Connection)]
  (assert (not (multicast-message? message)))
  (let [target (:target message)
        recipient (or (get-connection broker target)
                      (get-controller broker target))]
    (if-not recipient
      (handle-delivery-failure message sender (i18n/trs "Not connected."))
      (try
        (sl/maplog
          :debug (merge (summarize message)
                        (connection/summarize recipient)
                        {:type :message-delivery})
          ;; 0 : message id (uuid)
          ;; 1 : destination uri
          ;; 2 : remote address
          (fn [ctx]
            (i18n/trs "Delivering {0} to {1} at {2}."
                      (:messageid ctx) (:destination ctx) (:remoteaddress ctx))))
        (locking (:websocket recipient)
          (time! (:on-send (:metrics broker))
                 (send-message recipient message)))
        (catch Exception e
          (sl/maplog :warn e
                     (merge (summarize message)
                            {:type :message-delivery-error})
                     (fn [ctx]
                       (i18n/trs "Attempted message delivery to {0} failed." (:destination ctx))))
          (handle-delivery-failure message sender (str e)))))))
(s/defn deliver-server-message
  "Message consumer. Delivers a non-multicast `message` to the connection
  registered for its :target, but only if that target is still routed to the
  very connection given as `client`.

  The target is looked up first in the inventory and then among the
  controllers, and the result is compared to `client` by identity. On a match
  the send is performed while holding the lock on the websocket, timed against
  the :on-send metric, and true is returned. If the target now resolves to a
  different connection (or to none), or the send throws an Exception, the
  failure is only logged and nil is returned. A multicast message trips the
  assertion."
  [broker :- Broker
   message :- Message
   client :- Connection]
  (assert (not (multicast-message? message)))
  (let [target (:target message)
        routed (or (get-connection broker target)
                   (get-controller broker target))]
    (if-not (identical? routed client)
      (log-delivery-failure message (i18n/trs "Client no longer connected."))
      (try
        (sl/maplog
          :debug (merge (summarize message)
                        (connection/summarize routed)
                        {:type :message-delivery})
          ;; 0 : message id (uuid)
          ;; 1 : destination uri
          ;; 2 : remote address
          (fn [ctx]
            (i18n/trs "Delivering {0} to {1} at {2}."
                      (:messageid ctx) (:destination ctx) (:remoteaddress ctx))))
        (locking (:websocket routed)
          (time! (:on-send (:metrics broker))
                 (send-message routed message))
          ;; signal successful delivery to the caller
          true)
        (catch Exception e
          (sl/maplog :warn e
                     (merge (summarize message)
                            {:type :message-delivery-error})
                     (fn [ctx]
                       (i18n/trs "Attempted message delivery to {0} failed." (:destination ctx))))
          (log-delivery-failure message (str e)))))))