-
-
Notifications
You must be signed in to change notification settings - Fork 48
/
websocket.clj
125 lines (116 loc) · 4.81 KB
/
websocket.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
(ns ring.adapter.jetty9.websocket
(:import [org.eclipse.jetty.server Request Response Server]
[org.eclipse.jetty.server.handler ContextHandler]
[org.eclipse.jetty.websocket.api Session Session$Listener$AutoDemanding Callback]
[org.eclipse.jetty.websocket.server ServerWebSocketContainer
WebSocketCreator ServerUpgradeRequest WebSocketUpgradeHandler]
[org.eclipse.jetty.websocket.common JettyExtensionConfig]
[clojure.lang IFn]
[java.nio ByteBuffer]
[java.util Locale]
[java.time Duration])
(:require [clojure.string :as string]
[ring.websocket.protocols :as ring-ws]
[ring.adapter.jetty9.common :refer [build-request-map
get-headers set-headers! noop]]))
(defn- write-callback
  "Adapts a pair of Clojure functions to a Jetty websocket `Callback`.

  `write-success` is called with no arguments when the callback succeeds.
  `write-failed` is called with the throwable when the callback fails."
  [write-success write-failed]
  (reify Callback
    (fail [_ cause]
      (write-failed cause))
    (succeed [_]
      (write-success))))
;; Makes a Jetty websocket `Session` usable as a ring websocket socket.
;;
;; Text vs. binary is decided by `(instance? CharSequence msg)`. Text messages
;; are coerced with `str` before being handed to `.sendText`, because that
;; method takes a `String`: any other CharSequence (e.g. a StringBuilder)
;; would otherwise fail with a ClassCastException. Everything that is not a
;; CharSequence is passed to `.sendBinary` unchanged.
(extend-type Session
  ring-ws/Socket
  ;; Fire-and-forget send: both completion outcomes are ignored via `noop`.
  (-send [this msg]
    (if (instance? CharSequence msg)
      (.sendText this (str msg) (write-callback noop noop))
      (.sendBinary this msg (write-callback noop noop))))
  (-ping [this msg]
    (.sendPing this msg (write-callback noop noop)))
  (-pong [this msg]
    (.sendPong this msg (write-callback noop noop)))
  (-close [this status-code reason]
    (.close this status-code reason (write-callback noop noop)))
  (-open? [this]
    (.isOpen this))

  ring-ws/AsyncSocket
  ;; Same dispatch as `-send`, but reports the outcome: `succeed` is called
  ;; with no arguments on success, `fail` with the throwable on failure.
  (-send-async [this msg succeed fail]
    (if (instance? CharSequence msg)
      (.sendText this (str msg) (write-callback succeed fail))
      (.sendBinary this msg (write-callback succeed fail)))))
(defn- proxy-ws-adapter
  "Wraps a ring websocket `listener` in a Jetty `Session$Listener$AutoDemanding`.

  The Jetty session received in `onWebSocketOpen` is stored and then passed
  to the listener as the socket argument of every subsequent event."
  [listener]
  (let [session (atom nil)]
    (reify Session$Listener$AutoDemanding
      (^void onWebSocketOpen [this ^Session current-session]
        ;; Save the session before notifying the listener, so that any
        ;; callback fired while on-open is still running (or failing)
        ;; dereferences a real session instead of nil.
        (reset! session current-session)
        (ring-ws/on-open listener current-session))
      (^void onWebSocketError [this ^Throwable e]
        (ring-ws/on-error listener @session e))
      (^void onWebSocketText [this ^String message]
        (ring-ws/on-message listener @session message))
      (^void onWebSocketClose [this ^int status ^String reason]
        (ring-ws/on-close listener @session status reason))
      (^void onWebSocketBinary [this ^ByteBuffer payload ^Callback cb]
        (ring-ws/on-message listener @session payload)
        ;; Jetty requires the callback to be completed once the payload has
        ;; been processed; without this the frame's buffer is never released.
        ;; Only reached when the listener returns normally; if it throws,
        ;; the exception propagates to Jetty unchanged.
        ;; NOTE(review): a listener that keeps `payload` beyond on-message
        ;; presumably has to copy it first - confirm against the Jetty docs.
        (.succeed cb))
      (^void onWebSocketPing [this ^ByteBuffer bytebuffer]
        ;; `satisfies?` checks an instance against the protocol; `extends?`
        ;; expects a type and must not be given the listener object itself.
        (when (satisfies? ring-ws/PingListener listener)
          (ring-ws/on-ping listener @session bytebuffer)))
      (^void onWebSocketPong [this ^ByteBuffer bytebuffer]
        (ring-ws/on-pong listener @session bytebuffer)))))
(defn reify-ws-creator
  "Builds a Jetty `WebSocketCreator` from a ring websocket response map.

  When the map carries a non-nil `:ring.websocket/protocol`, it is set as the
  accepted subprotocol on the upgrade response. The created endpoint is the
  `:ring.websocket/listener` value wrapped by `proxy-ws-adapter`."
  [resp-map]
  (reify WebSocketCreator
    (createWebSocket [_ _req resp _cb]
      (when-some [subprotocol (:ring.websocket/protocol resp-map)]
        (.setAcceptedSubProtocol resp subprotocol))
      ;; Extension negotiation is currently disabled (reader-discarded form).
      #_(when-let [exts (not-empty (:extensions ws-results))]
          (.setExtensions resp (mapv #(if (string? %)
                                        (JettyExtensionConfig. ^String %)
                                        %)
                                     exts)))
      (proxy-ws-adapter (:ring.websocket/listener resp-map)))))
(defn upgrade-websocket
  "Upgrades the request/response pair to a websocket connection.

  `ws-resp` must be a map (enforced by a precondition); it is turned into a
  `WebSocketCreator` via `reify-ws-creator`.

  Options, each applied to the `ServerWebSocketContainer` of the request's
  context only when truthy:
    :ws-max-idle-time           - idle timeout, in milliseconds
    :ws-max-frame-size          - passed to `.setMaxFrameSize`
    :ws-max-binary-message-size - passed to `.setMaxBinaryMessageSize`
    :ws-max-text-message-size   - passed to `.setMaxTextMessageSize`
    :ws-configurator            - function called with the container after the
                                  settings above; defaults to a no-op

  Returns the result of the container's `.upgrade` call."
  [^Request req
   ^Response resp
   ^Callback cb
   ws-resp
   {:as _options
    :keys [ws-max-idle-time
           ws-max-frame-size
           ws-max-binary-message-size
           ws-max-text-message-size
           ws-configurator]
    :or {ws-configurator (constantly nil)}}]
  {:pre [(map? ws-resp)]}
  (let [ws-container (ServerWebSocketContainer/get (.getContext req))]
    (when-let [idle-ms ws-max-idle-time]
      (.setIdleTimeout ws-container (Duration/ofMillis idle-ms)))
    (when-let [frame-size ws-max-frame-size]
      (.setMaxFrameSize ws-container frame-size))
    (when-let [binary-size ws-max-binary-message-size]
      (.setMaxBinaryMessageSize ws-container binary-size))
    (when-let [text-size ws-max-text-message-size]
      (.setMaxTextMessageSize ws-container text-size))
    ;; User hook runs last so it can override any of the settings above.
    (ws-configurator ws-container)
    (.upgrade ws-container (reify-ws-creator ws-resp) req resp cb)))
(defn ws-upgrade-request?
  "Tells whether a ring request map asks for a websocket upgrade.

  That is the case when both of these request headers are present:
  - `upgrade`, containing \"websocket\" (case-insensitive)
  - `connection`, containing \"upgrade\" (case-insensitive)

  Returns a falsy value (nil or false) otherwise."
  [{:keys [headers]}]
  (let [mentions?      (fn [header-value token]
                         (string/includes? (string/lower-case header-value) token))
        upgrade-hdr    (get headers "upgrade")
        connection-hdr (get headers "connection")]
    (and upgrade-hdr
         connection-hdr
         (mentions? upgrade-hdr "websocket")
         (mentions? connection-hdr "upgrade"))))
(defn ensure-container
  "Delegates to `ServerWebSocketContainer/ensure` for the given `server` and
  `context-handler`, and returns its result."
  [^Server server ^ContextHandler context-handler]
  (ServerWebSocketContainer/ensure server context-handler))