/
sse.clj
230 lines (206 loc) · 9.51 KB
/
sse.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
; Copyright 2013 Relevance, Inc.
; Copyright 2014-2022 Cognitect, Inc.
; The use and distribution terms for this software are covered by the
; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0)
; which can be found in the file epl-v10.html at the root of this distribution.
;
; By using this software in any fashion, you are agreeing to be bound by
; the terms of this license.
;
; You must not remove this notice, or any other, from this software.
(ns io.pedestal.http.sse
(:require [ring.util.response :as ring-response]
[clojure.core.async :as async]
[clojure.core.async.impl.protocols :as asyncimpl]
[io.pedestal.http.servlet :refer :all]
[io.pedestal.log :as log]
[io.pedestal.interceptor :as interceptor]
[clojure.stacktrace]
[clojure.string :as string])
(:import [java.nio.charset Charset]
[java.io BufferedReader StringReader OutputStream]
[java.util.concurrent Executors ThreadFactory TimeUnit ScheduledExecutorService ScheduledFuture]
[javax.servlet ServletResponse]
[com.fasterxml.jackson.core.util ByteArrayBuilder]))
(set! *warn-on-reflection* true)
(def ^String UTF-8 "UTF-8")
(defn get-bytes ^bytes [^String s]
(.getBytes s UTF-8))
(def CRLF (get-bytes "\r\n"))
(def EVENT_FIELD (get-bytes "event: "))
(def DATA_FIELD (get-bytes "data: "))
(def COMMENT_FIELD (get-bytes ": "))
(def ID_FIELD (get-bytes "id: "))
;; Cloned from core.async.impl.concurrent
(defn counted-thread-factory
"Create a ThreadFactory that maintains a counter for naming Threads.
name-format specifies thread names - use %d to include counter
daemon is a flag for whether threads are daemons or not"
[name-format daemon]
(let [counter (atom 0)]
(reify
ThreadFactory
(newThread [this runnable]
(doto (Thread. runnable)
(.setName (format name-format (swap! counter inc)))
(.setDaemon daemon))))))
(def ^ThreadFactory daemon-thread-factory (counted-thread-factory "pedestal-sse-%d" true))
(def ^ScheduledExecutorService scheduler (Executors/newScheduledThreadPool 1 daemon-thread-factory))
(defn mk-data
([name data]
(mk-data name data nil))
([name data id]
(let [bab (ByteArrayBuilder.)]
(when name
(.write bab ^bytes EVENT_FIELD)
(.write bab ^bytes (get-bytes name))
(.write bab ^bytes CRLF))
(doseq [part (string/split data #"\r?\n")]
(.write bab ^bytes DATA_FIELD)
(.write bab ^bytes (get-bytes part))
(.write bab ^bytes CRLF))
(when (not-empty id)
(.write bab ^bytes ID_FIELD)
(.write bab ^bytes (get-bytes id))
(.write bab ^bytes CRLF))
(.write bab ^bytes CRLF)
(.toByteArray bab))))
(defn send-event
([channel name data]
(send-event channel name data nil))
([channel name data id]
(send-event channel name data id async/>!!))
([channel name data id put-fn]
(log/trace :msg "writing event to stream"
:name name
:data data
:id id)
(log/histogram ::payload-size (count data))
(try
(put-fn channel (mk-data name data id))
(catch Throwable t
(async/close! channel)
(log/error :msg "exception sending event"
:throwable t
:stacktrace (with-out-str (clojure.stacktrace/print-stack-trace t)))
(throw t)))))
(defn do-heartbeat
([channel] (do-heartbeat channel {}))
([channel {:keys [on-client-disconnect]}]
(try
(log/trace :msg "writing heartbeat to stream")
(async/>!! channel CRLF)
(catch Throwable t
(async/close! channel)
(log/error :msg "exception sending heartbeat"
:throwable t
:stacktrace (with-out-str (clojure.stacktrace/print-stack-trace t)))
(throw t)))))
(defn end-event-stream
"DEPRECATED. Given a `context`, clean up the event stream it represents."
{:deprecated "0.4.0"}
[{end-fn ::end-event-stream}]
;;(end-fn)
)
;; This is extracted as a separate function mainly to support advanced
;; users who want to rebind it during tests. Note to those that do so:
;; the function is private to indicate that the contract may break in
;; future revisions. Use at your own risk. If you find yourself using
;; this to see what data is being put on `event-channel` consider
;; instead modifying your application's stream-ready-fn to support the
;; tests you want to write.
(defn- start-dispatch-loop
"Kicks off the loop that transfers data provided by the application
on `event-channel` to the HTTP infrastructure via
`response-channel`."
[{:keys [event-channel response-channel heartbeat-delay on-client-disconnect]}]
(async/go
(log/counter ::active-streams 1)
(try
(loop []
(let [hb-timeout (async/timeout (* 1000 heartbeat-delay))
[event port] (async/alts! [event-channel hb-timeout])]
(cond
(= port hb-timeout)
(if (async/>! response-channel CRLF)
(recur)
(log/info :msg "Response channel was closed when sending heartbeat. Shutting down SSE stream."))
(and (some? event) (= port event-channel))
;; You can name your events using the maps
;; {:name "my-event" :data "some message data here"}
;; .. and optionally supply IDs (strings) that make sense to your application
;; {:name "my-event" :data "some message data here" :id "1234567890ae"}
(let [event-name (if (map? event) (str (:name event)) nil)
event-data (if (map? event) (str (:data event)) (str event))
event-id (if (map? event) (str (:id event)) nil)]
(if (send-event response-channel event-name event-data event-id async/put!)
(recur)
(log/info :msg "Response channel was closed when sending event. Shutting down SSE stream.")))
:else
(log/info :msg "Event channel has closed. Shutting down SSE stream."))))
(finally
(async/close! event-channel)
(async/close! response-channel)
(log/counter ::active-streams -1)
(when on-client-disconnect (on-client-disconnect))))))
(defn start-stream
"Starts an SSE event stream and initiates a heartbeat to keep the
connection alive. `stream-ready-fn` will be called with a core.async
channel. The application can then put maps with keys :name and :data
on that channel to cause SSE events to be sent to the client. Either
the client or the application may close the channel to terminate and
clean up the event stream; the client closes it by closing the
connection.
The SSE's core.async buffer can either be a fixed buffer (n) or a 0-arity
function that returns a buffer."
([stream-ready-fn context heartbeat-delay]
(start-stream stream-ready-fn context heartbeat-delay 10))
([stream-ready-fn context heartbeat-delay bufferfn-or-n]
(start-stream stream-ready-fn context heartbeat-delay bufferfn-or-n {}))
([stream-ready-fn context heartbeat-delay bufferfn-or-n opts]
(let [{:keys [on-client-disconnect]} opts
response-channel (async/chan (if (fn? bufferfn-or-n) (bufferfn-or-n) bufferfn-or-n))
response (-> (ring-response/response response-channel)
(ring-response/content-type "text/event-stream")
(ring-response/charset "UTF-8")
(ring-response/header "Connection" "close")
(ring-response/header "Cache-Control" "no-cache")
(update-in [:headers] merge (:cors-headers context)))
event-channel (async/chan (if (fn? bufferfn-or-n) (bufferfn-or-n) bufferfn-or-n))
context* (assoc context
:response-channel response-channel
:response response)]
(async/thread
(stream-ready-fn event-channel context*))
(start-dispatch-loop (merge {:event-channel event-channel
:response-channel response-channel
:heartbeat-delay heartbeat-delay
:context context*}
(when on-client-disconnect
{:on-client-disconnect #(on-client-disconnect context*)})))
context*)))
(defn start-event-stream
"Returns an interceptor which will start a Server Sent Event stream
with the requesting client, and set the ServletResponse to go
async. After the request handling context has been paused in the
Servlet thread, `stream-ready-fn` will be called in a future, with
the resulting context from setting up the SSE event stream.
opts is a map with optional keys:
:on-client-disconnect - A function of one argument which will be
called when the client permanently disconnects."
([stream-ready-fn]
(start-event-stream stream-ready-fn 10 10))
([stream-ready-fn heartbeat-delay]
(start-event-stream stream-ready-fn heartbeat-delay 10))
([stream-ready-fn heartbeat-delay bufferfn-or-n]
(start-event-stream stream-ready-fn heartbeat-delay bufferfn-or-n {}))
([stream-ready-fn heartbeat-delay bufferfn-or-n opts]
(interceptor/interceptor
{:name (keyword (str (gensym "io.pedestal.http.sse/start-event-stream")))
:enter (fn [context]
(log/trace :msg "switching to sse")
(start-stream stream-ready-fn context heartbeat-delay bufferfn-or-n opts))})))
(defn sse-setup
"See start-event-stream. This function is for backward compatibility."
[& args]
(apply start-event-stream args))