-
Notifications
You must be signed in to change notification settings - Fork 0
/
sse.clj
213 lines (192 loc) · 6.7 KB
/
sse.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
(ns clj-sse-client.sse
(:require
[clj-sse-client.body-handler :as h]
[clj-sse-client.client :as http :refer [noop]]
[clj-sse-client.event :as e]
[clojure.spec.alpha :as s])
(:import
(java.util.function BiFunction)
(java.util.concurrent
Flow$Subscriber
Flow$Subscription
Executor
CompletableFuture)))
;; Ask the compiler to warn wherever Java interop in this namespace falls back to reflection.
(set! *warn-on-reflection* true)
;; Specs for the optional callbacks accepted in the SSE options map (::sse-options below).
;; Each is a function spec that only constrains the arguments; no :ret is specified.
;; on-complete: one argument, conforming to ::e/state.
(s/def ::on-complete (s/fspec :args (s/cat :state ::e/state)))
;; on-error: two arguments, a ::e/state followed by a Throwable.
(s/def ::on-error (s/fspec :args (s/cat :state ::e/state
:error #(instance? Throwable %))))
;; on-next: one argument, specified as ::e/message.
;; NOTE(review): SSEFlowSubscriber passes the result of e/effect to on-next - confirm that
;; value conforms to ::e/message.
(s/def ::on-next (s/fspec :args (s/cat :effect ::e/message)))
;; on-subscribe: one argument, conforming to ::e/state.
(s/def ::on-subscribe (s/fspec :args (s/cat :state ::e/state)))
;; The options map itself: every callback key is optional and unqualified.
(s/def ::sse-options
(s/keys
:opt-un [::on-complete ::on-error ::on-next ::on-subscribe]))
;; Implemented in this file by SSEFlowSubscriber (returns its stored Flow$Subscription)
;; and by SSEConnection (delegates to its current subscriber).
(defprotocol ISubscriber
"A protocol for accessing a subscription from an underlying object."
;; Returns the subscription held by `subscriber`.
(-subscription [subscriber]))
;; Java interface with a single method, nextEvent, taking one `line` argument.
;; SSEFlowSubscriber implements it and calls it from onNext with each received item;
;; a truthy result makes onNext request one more item, a falsy one cancels the subscription.
(definterface ISSE
(nextEvent [line]))
;; Flow$Subscriber that feeds received lines through the SSE event functions in the
;; `e` namespace and forwards any resulting effects to user callbacks.
;;
;; Fields:
;;   on-complete / on-error / on-next / on-subscribe - user callbacks.
;;   subscription - the Flow$Subscription received in onSubscribe (nil before that).
;;   state        - the current event state; replaced after every processed line.
(deftype SSEFlowSubscriber
  [on-complete
   on-error
   on-next
   on-subscribe
   ^:unsynchronized-mutable ^Flow$Subscription subscription
   ^:unsynchronized-mutable state]

  ISSE
  ;; Processes one line. Returns the result of `on-next` when the line produced an
  ;; effect, `true` when it produced none, and nil when `line` itself is falsy.
  (nextEvent [_ line]
    (when line
      (let [parsed    (e/parse line)
            emitted   (e/effect state parsed)
            advanced  (e/step state parsed)
            ;; The callback runs before the new state is stored.
            continue? (if emitted
                        (on-next emitted)
                        true)]
        (set! state advanced)
        continue?)))

  Flow$Subscriber
  ;; Stores the subscription, resets the state, requests the first item and then
  ;; notifies the user callback with the fresh state.
  (onSubscribe [_ sub]
    (set! subscription sub)
    (set! state (e/->State "" "" ""))
    (.request sub 1)
    (on-subscribe state))

  (onComplete [_]
    (on-complete state))

  (onError [_ throwable]
    (on-error state throwable))

  ;; One-at-a-time demand: ask for the next item only while nextEvent is truthy,
  ;; otherwise cancel the subscription.
  (onNext [this item]
    (if-not (.nextEvent this item)
      (.cancel subscription)
      (.request subscription 1)))

  ISubscriber
  (-subscription [_]
    subscription))
(defn sse-flow-subscriber
  "Build an [[SSEFlowSubscriber]] from a map of optional callbacks. The
  callbacks mirror those of a [[Flow$Subscriber]], with a few differences:

  `on-subscribe`: invoked upon subscription, receiving the internal state.
  `on-next`: invoked with an SSE event each time one is emitted from the
  buffer, rather than with every raw item as in a [[Flow$Subscriber]].
  It SHOULD return a truth-y value; a false-y return stops further events
  from being consumed.
  `on-complete`: invoked with the internal state once the flow completes.
  `on-error`: invoked with the internal state and a Throwable.

  Missing callbacks default to `noop`, except `on-next`, which defaults to
  `identity`.

  The result also implements [[ISubscriber]], giving access to its
  internal [[Flow$Subscription]], and [[ISSE]], which defines how the next
  SSE event is handled."
  ^SSEFlowSubscriber
  [{:keys [on-complete on-error on-next on-subscribe]
    :or   {on-complete  noop
           on-error     noop
           on-next      identity
           on-subscribe noop}}]
  ;; The two trailing nils are the not-yet-known subscription and state.
  (SSEFlowSubscriber. on-complete on-error on-next on-subscribe nil nil))
;; Function spec for sse-flow-subscriber: a single ::sse-options argument,
;; returning an SSEFlowSubscriber instance.
(s/fdef sse-flow-subscriber
:args (s/cat :options ::sse-options)
:ret #(instance? SSEFlowSubscriber %))
(defn sse-subscription
  "Start an SSE subscription by sending `request` asynchronously through
  `client`. `opts` is handed to [[sse-flow-subscriber]] to build the
  subscriber that receives the response lines.

  Returns a pair: [CompletableFuture<Response> [[ISubscriber]]].

  Keep in mind that the future only completes when the response ends. For a
  long-lived subscription, avoid blocking a thread while waiting on it."
  [client request opts]
  (let [flow-sub (sse-flow-subscriber opts)]
    [(http/send-async! client request (h/from-line-subscriber flow-sub))
     flow-sub]))
;; Connection lifecycle operations; implemented in this file by SSEConnection.
;;   -connect [this]              - connect using the SSE options already stored on the connection.
;;   -connect [this subscription] - store `subscription` (the SSE options map handed to
;;                                  sse-flow-subscriber) on the connection, then connect.
;;   -reconnect [this]            - increment the attempt counter and connect again.
(defprotocol IConnection
(-connect [this] [this subscription])
(-reconnect [this]))
(defn reconnect-callback
  "Return a BiFunction suitable for `CompletableFuture.handleAsync` that
  decides whether `connection` should reconnect once its response future
  finishes.

  `connection`: an [[IConnection]] whose `-reconnect` is invoked on failure.
  `attempts`: number of reconnection attempts made so far.
  `max-reconnection-attemps`: upper bound on attempts. nil is treated as
  \"no reconnection allowed\" instead of throwing on the comparison.

  When the future finished without an error, prints \"Connection complete\".
  When it finished with an error and attempts remain, prints
  \"Reconnecting\" and calls `-reconnect`; otherwise returns nil."
  ^BiFunction [connection attempts max-reconnection-attemps]
  (reify BiFunction
    ;; The response value is not needed here; only the error decides what happens.
    (apply [_ _response e]
      (if (nil? e)
        (println "Connection complete")
        ;; Guard against a missing limit: (< attempts nil) would throw a
        ;; NullPointerException inside the completion stage.
        (when (and (some? max-reconnection-attemps)
                   (< attempts max-reconnection-attemps))
          (println "Reconnecting")
          (-reconnect connection))))))
(defn delayed-executor
  "Return an Executor that postpones every submitted task by `delay` seconds.

  When `executor` is truthy the task is handed to it once the delay has
  elapsed; otherwise the two-argument `CompletableFuture/delayedExecutor`
  overload is used, which takes no base executor."
  ^Executor [executor delay]
  (let [unit java.util.concurrent.TimeUnit/SECONDS]
    (if-not executor
      (CompletableFuture/delayedExecutor delay unit)
      (CompletableFuture/delayedExecutor delay unit executor))))
;; A (re)connectable SSE connection.
;;
;; Fields:
;;   client             - HTTP client handed to http/send-async!.
;;   connection-request - request sent on every (re)connect.
;;   subscription       - SSE options map passed to sse-flow-subscriber; replaced by
;;                        the two-argument -connect.
;;   options            - connection options; :max-reconnection-attemps is read from it.
;;   subscriber         - the SSEFlowSubscriber of the latest connect (nil before connecting).
;;   response           - the response future of the latest connect (nil before connecting).
;;   reconnect-executor - optional base Executor for the delayed reconnect callback.
;;   reconnect?         - reconnection is attempted only when this is truthy.
;;   reconnect-delay    - delay, in seconds, before the completion callback runs.
;;   attempts           - number of reconnection attempts made so far.
(deftype SSEConnection
  [client
   connection-request
   ^:volatile-mutable subscription
   options
   ^:volatile-mutable subscriber
   ^:volatile-mutable response
   ^Executor reconnect-executor
   reconnect?
   ^:volatile-mutable reconnect-delay
   ^:volatile-mutable attempts]

  IConnection
  (-reconnect [this]
    (set! attempts (inc attempts))
    (-connect this))

  (-connect [this]
    (-connect this subscription))

  (-connect [this sub]
    (set! subscription sub)
    (let [-subscriber  (sse-flow-subscriber subscription)
          -resp        (http/send-async! client connection-request (h/from-line-subscriber -subscriber))
          ;; Honour the `reconnect?` flag: with reconnection disabled (or no limit
          ;; configured) the allowed number of attempts is 0, so the callback still
          ;; reports completion but never calls -reconnect.
          max-attempts (or (when reconnect?
                             (:max-reconnection-attemps options))
                           0)
          reconnect    (reconnect-callback this attempts max-attempts)]
      (.handleAsync ^CompletableFuture -resp reconnect (delayed-executor reconnect-executor reconnect-delay))
      (set! response -resp)
      (set! subscriber -subscriber)))

  ISubscriber
  ;; Returns nil instead of throwing when the connection has not been connected yet.
  (-subscription [_]
    (some-> subscriber -subscription))

  java.lang.AutoCloseable
  ;; Cancels the current subscription, if there is one. Closing a connection that was
  ;; never connected (or not yet subscribed) is a no-op.
  ;; NOTE(review): cancelling may complete the response future with an error, in which
  ;; case the reconnect callback could fire after close - confirm against the HTTP client.
  (close [this]
    (when-let [^Flow$Subscription sub (-subscription this)]
      (.cancel sub))))
;; Defaults merged under the caller's options in sse-connection.
;;   :max-reconnection-attemps - upper bound compared against the attempt counter in reconnect-callback.
;;   :reconnect?               - reconnect flag stored on the connection.
;;   :reconnect-delay          - delay in seconds (delayed-executor uses TimeUnit/SECONDS).
(def default-connection-options
{:max-reconnection-attemps 3
:reconnect? false
:reconnect-delay 20})
(defn sse-connection
  "Create an [[SSEConnection]] which can connect, close and reconnect.

  `client`: the HttpClient used to send the request.
  `connection-request`: the request sent on every (re)connect.
  `subscription`: SSE options map, later passed to [[sse-flow-subscriber]].
  `options`: connection options, merged over [[default-connection-options]].
  Recognised keys: `:executor`, `:reconnect?`, `:reconnect-delay` and
  `:max-reconnection-attemps`.

  The connection is returned unconnected; call [[connect]] to start it."
  [client
   connection-request
   subscription
   options]
  ;; Store the MERGED options on the connection: -connect reads
  ;; :max-reconnection-attemps from them, so the defaults must be present
  ;; even when the caller supplies only some (or none) of the keys.
  (let [merged (merge default-connection-options options)
        {:keys [executor reconnect? reconnect-delay]} merged]
    (new
     SSEConnection
     client
     connection-request
     subscription
     merged
     nil        ; subscriber - set on connect
     nil        ; response   - set on connect
     executor
     reconnect?
     reconnect-delay
     0)))       ; attempts
(defn connect
  "Initiate an SSE connection on `connection`.

  With one argument, connects using the SSE options already stored on the
  connection. With `sse-options`, those replace the stored options before
  connecting, which makes it possible to refer to the connection object
  itself from within the callbacks."
  ([connection] (-connect connection))
  ([connection sse-options] (-connect connection sse-options)))
;; REPL walkthrough; nothing inside `comment` is evaluated when the namespace loads.
(comment
;; Build a client and a GET request against a local endpoint.
(def client (http/client))
(def req (http/request {:uri "http://localhost:8005/lowfreq"
:headers {}
:method :get}))
;; SSE callbacks: each one just prints what it receives.
;; on-next returns its argument (via doto), which keeps events flowing while it is truthy.
(def opts {:on-complete (fn [state] (println "Subscription completed with state:" state))
:on-error (fn [state ^Throwable t] (println "Error with state:" state t))
:on-next (fn [eff] (doto eff println))
:on-subscribe (fn [state] (println "Initializing subscription with state:" state))})
;; Create the connection with reconnection switched on, start it, then cancel it.
(def conn (sse-connection client req opts {:reconnect? true}))
(connect conn)
(.close conn))