This repository has been archived by the owner on Jun 19, 2018. It is now read-only.
/
websocket.clj
152 lines (127 loc) · 3.97 KB
/
websocket.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
(ns qbits.jet.websocket
"Shared by the server and the client impl"
(:require
[clojure.core.async :as async]
[qbits.jet.async :as a]
[clojure.string :as string])
(:import
(org.eclipse.jetty.websocket.api
WebSocketListener
RemoteEndpoint
Session
SuspendToken
UpgradeRequest)
(clojure.lang IFn)
(java.nio ByteBuffer)
(clojure.core.async.impl.channels ManyToManyChannel)))
(defprotocol ^:no-doc PWebSocket
(send! [this msg] "Send content to client connected to this WebSocket instance")
(close! [this] "Close active WebSocket")
(remote ^RemoteEndpoint [this] "Remote endpoint instance")
(session ^Session [this] "Session instance")
(remote-addr [this] "Address of remote client")
(idle-timeout! [this ms] "Set idle timeout on client"))
(defprotocol ^:no-doc BackPressure
(backpressure! [this status]))
(defprotocol ^:no-doc WebSocketSend
(-send! [x ^WebSocket ws] "How to encode content sent to the WebSocket clients"))
(defn ^:no-doc close-chans!
[& chs]
(doseq [ch chs]
(async/close! ch)))
(extend-protocol WebSocketSend
(Class/forName "[B")
(-send! [ba ws]
(-send! (ByteBuffer/wrap ba) ws))
ByteBuffer
(-send! [bb ws]
(some-> ws remote (.sendBytes ^ByteBuffer bb)))
String
(-send! [s ws]
(some-> ws remote (.sendString ^String s)))
IFn
(-send! [f ws]
(some-> ws remote f))
Object
(-send! [this ws]
(some-> ws remote (.sendString (str this))))
;; "nil" could PING?
;; nil
;; (-send! [this ws] ()
)
(deftype WebSocket
[^ManyToManyChannel in
^ManyToManyChannel out
^ManyToManyChannel ctrl
^IFn handler
^Session ^:volatile-mutable session
^SuspendToken ^:volatile-mutable reads-suspend-token]
WebSocketListener
(onWebSocketConnect [this s]
(set! session s)
(async/go
(loop []
;; if we pull out of value of out, we send it and recur for
;; another one, otherwise that means the user closed it, in
;; that case we close the Socket (if not closed already)
;; and exit the loop.
(if-let [x (async/<! out)]
(do
(send! this x)
(recur))
(close! this))))
(let [request (.getUpgradeRequest s)
uri (.getRequestURI request)
port (.getPort uri)]
(handler
{:in in
:out out
:ctrl ctrl
:ws this
:server-name (.getHost uri)
:server-port port
:remote-addr (-> session .getRemoteAddress .getAddress .getHostAddress)
:uri (.getPath uri)
:scheme (if (= 443 port) :wss :ws)
:query-string (.getQueryString request)
:request-method :get ;; (some-> request .getMethod string/lower-case keyword)
:headers (reduce(fn [m [k v]]
(assoc m (string/lower-case k) (string/join "," v)))
{}
(.getHeaders request))})))
(onWebSocketError [this e]
(async/put! ctrl [::error e])
(close-chans! in out ctrl))
(onWebSocketClose [this code reason]
(set! session nil)
(async/put! ctrl [::close code reason])
(close-chans! in out ctrl))
(onWebSocketText [this message]
(a/put! in message #(backpressure! this %)))
(onWebSocketBinary [this payload offset len]
(a/put! in (ByteBuffer/wrap payload offset len)
#(backpressure! this %)))
BackPressure
(backpressure! [this backpressure?]
(if backpressure?
(set! reads-suspend-token (.suspend session))
(do
(.resume reads-suspend-token)
(set! reads-suspend-token nil))))
PWebSocket
(remote [this]
(when session
(.getRemote session)))
(session [this] session)
(send! [this msg]
(-send! msg this))
(close! [this]
(when (some-> session .isOpen)
(.close session)))
(remote-addr [this]
(.getRemoteAddress session))
(idle-timeout! [this ms]
(.setIdleTimeout session (long ms))))
(defn ^:no-doc make-websocket
[in out ctrl handler]
(WebSocket. in out ctrl handler nil nil))