-
-
Notifications
You must be signed in to change notification settings - Fork 97
/
socket.clj
302 lines (261 loc) · 11.1 KB
/
socket.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
(ns nrepl.socket
"Compatibility layer for java.io vs java.nio sockets to allow an
incremental transition to nio, since the JDK's filesystem sockets
don't support the java.io socket interface, and we can't use the
compatibility layer for bidirectional read and write:
https://bugs.openjdk.java.net/browse/JDK-4509080."
(:require
[clojure.java.io :as io]
[nrepl.misc :refer [log]]
[nrepl.tls :as tls]
[nrepl.socket.dynamic :refer [get-path]])
(:import
(java.io BufferedInputStream BufferedOutputStream File OutputStream)
(java.net InetSocketAddress ProtocolFamily ServerSocket Socket SocketAddress
StandardProtocolFamily URI)
(java.nio ByteBuffer)
(java.nio.file Path)
(java.nio.channels Channels ClosedChannelException NetworkChannel
ServerSocketChannel SocketChannel)
(javax.net.ssl SSLServerSocket)))
(defmacro find-class [full-path]
`(try
(Class/forName (name ~full-path))
(catch ClassNotFoundException ex#
nil)))
;;; InetSockets (TCP)
(defn inet-socket
([bind port]
(let [port (or port 0)
addr (fn [^String bind port] (InetSocketAddress. bind (int port)))
;; We fallback to 127.0.0.1 instead of to localhost to avoid
;; a dependency on the order of ipv4 and ipv6 records for
;; localhost in /etc/hosts
bind (or bind "127.0.0.1")]
(doto (ServerSocket.)
(.setReuseAddress true)
(.bind (addr bind port)))))
([bind port tls-context]
(let [port (or port 0)
;; We fallback to 127.0.0.1 instead of to localhost to avoid
;; a dependency on the order of ipv4 and ipv6 records for
;; localhost in /etc/hosts
bind (or bind "127.0.0.1")]
(tls/server-socket tls-context bind port))))
;; Unix domain sockets
(def ^Class junixsocket-address-class
(find-class 'org.newsclub.net.unix.AFUNIXSocketAddress))
(def ^Class junixsocket-server-class
(find-class 'org.newsclub.net.unix.AFUNIXServerSocket))
(def ^Class junixsocket-class
(find-class 'org.newsclub.net.unix.AFUNIXSocket))
(def ^Class jdk-unix-address-class
(find-class 'java.net.UnixDomainSocketAddress))
(def ^Class jdk-unix-server-class
(find-class 'java.nio.channels.ServerSocketChannel))
(def ^Class jdk-unix-class
(find-class 'java.nio.channels.SocketChannel))
(def ^:private test-junixsocket?
;; Make it possible to test junixsocket even when JDK >= 16
(= "true" (System/getProperty "nrepl.test.junixsocket")))
(def unix-domain-flavor
(cond
test-junixsocket? (do
(assert junixsocket-address-class)
(assert junixsocket-server-class)
(binding [*out* *err*]
(println "nrepl.test: insisting on junixsocket support"))
:junixsocket)
(and jdk-unix-address-class jdk-unix-server-class) :jdk
(and junixsocket-address-class junixsocket-server-class) :junixsocket
:else nil))
(def jdk-unix-address-of
(when (= :jdk unix-domain-flavor)
(let [addr-of (.getDeclaredMethod jdk-unix-address-class "of"
(into-array Class [String]))]
(fn [path] (.invoke addr-of nil (into-array String [path]))))))
(def junix-address-of
(when (= :junixsocket unix-domain-flavor)
(or (try (let [addr-of (.getDeclaredMethod junixsocket-address-class "of"
(into-array Class [File]))]
(fn [path] (.invoke addr-of nil (into-array File [(File. ^String path)]))))
(catch NoSuchMethodException _))
;; For junixsocket versions < 2.4.0
(let [c (.getDeclaredConstructor junixsocket-address-class
(into-array Class [File]))]
(fn [path] (.newInstance c (into-array File [(File. ^String path)])))))))
(defn unix-socket-address
"Returns a filesystem socket address for the given path string."
[^String path]
(case unix-domain-flavor
:jdk (jdk-unix-address-of path)
:junixsocket (junix-address-of path)
(let [msg "Support for filesystem sockets requires JDK 16+ or a junixsocket dependency"]
(log msg)
(throw (ex-info msg {:nrepl/kind ::no-filesystem-sockets})))))
(def jdk-unix-server-socket
;; Dynamic because one argument open doesn't exist until jvm 15, nor UNIX
;; until jvm 16.
(when (= :jdk unix-domain-flavor)
(let [protocol (-> (.getDeclaredField StandardProtocolFamily "UNIX")
(.get StandardProtocolFamily))
protocol (into-array ProtocolFamily [protocol])
open (.getDeclaredMethod ServerSocketChannel "open"
(into-array Class [ProtocolFamily]))]
#(.invoke open nil protocol))))
(def jdk-unix-socket
;; Dynamic because one argument open doesn't exist until jvm 15, nor UNIX
;; until jvm 16.
(when (= :jdk unix-domain-flavor)
(let [protocol (-> (.getDeclaredField StandardProtocolFamily "UNIX")
(.get StandardProtocolFamily))
protocol (into-array ProtocolFamily [protocol])
open (.getDeclaredMethod SocketChannel "open"
(into-array Class [ProtocolFamily]))]
#(.invoke open nil protocol))))
(def junix-server-socket
(when (= :junixsocket unix-domain-flavor)
(let [make (.getDeclaredMethod junixsocket-server-class "newInstance" nil)]
#(.invoke make nil nil))))
(def junix-socket
(when (= :junixsocket unix-domain-flavor)
(let [make (.getDeclaredMethod junixsocket-class "newInstance" nil)]
#(.invoke make nil nil))))
(defn unix-server-socket
"Returns a filesystem socket bound to the path if the JDK is version
16 or newer or if com.kohlschutter.junixsocket/junixsocket-core can
be loaded dynamically. Otherwise throws the ex-info map
{:nrepl/kind ::no-filesystem-sockets}."
[^String path]
(let [^SocketAddress addr (unix-socket-address path)]
(case unix-domain-flavor
:jdk
(let [sock (jdk-unix-server-socket)]
(.bind ^ServerSocketChannel sock addr)
(let [^Path path (get-path addr)]
(-> path .toFile .deleteOnExit))
sock)
:junixsocket
(let [sock (junix-server-socket)]
(.bind ^ServerSocket sock addr)
(let [^String path (get-path addr)]
(-> path File. .deleteOnExit))
sock)
(let [msg "Support for filesystem sockets requires JDK 16+ or a junixsocket dependency"]
(log msg)
(throw (ex-info msg {:nrepl/kind ::no-filesystem-sockets}))))))
(defn unix-client-socket
"Returns a filesystem socket bound to the path if the JDK is version
16 or newer or if com.kohlschutter.junixsocket/junixsocket-core can
be loaded dynamically. Otherwise throws the ex-info map
{:nrepl/kind ::no-filesystem-sockets}."
[^String path]
(let [^SocketAddress addr (unix-socket-address path)]
(case unix-domain-flavor
:jdk
(let [sock (jdk-unix-socket)]
(.connect ^SocketChannel sock addr)
sock)
:junixsocket
(let [sock (junix-socket)]
(.connect ^Socket sock addr)
sock)
(let [msg "Support for filesystem sockets requires JDK 16+ or a junixsocket dependency"]
(log msg)
(throw (ex-info msg {:nrepl/kind ::no-filesystem-sockets}))))))
(defn as-nrepl-uri [sock transport-scheme]
(let [get-local-addr (fn [^NetworkChannel c] (.getLocalAddress c))]
(if-let [addr (and (some-> jdk-unix-server-class (instance? sock))
(get-local-addr sock))]
(URI. (str transport-scheme "+unix")
(let [^Path path (get-path addr)]
(-> path .toAbsolutePath str))
nil)
(if-let [addr (and (some-> junixsocket-server-class (instance? sock))
(.getLocalSocketAddress ^ServerSocket sock))]
(URI. (str transport-scheme "+unix")
(get-path addr)
nil)
;; Assume it's an InetAddress socket
(let [sock ^ServerSocket sock]
(URI. (str transport-scheme
(when (instance? SSLServerSocket sock)
"s"))
nil ;; userInfo
(-> sock .getInetAddress .getHostName)
(.getLocalPort sock)
nil ;; path
nil ;; query
nil)))))) ;; fragment
(defprotocol Acceptable
(accept [s]
"Accepts a connection on s. Throws ClosedChannelException if s is
closed."))
(extend-protocol Acceptable
ServerSocketChannel
(accept [s] (.accept s))
SSLServerSocket
(accept [s]
(when (.isClosed s)
(throw (ClosedChannelException.)))
(tls/accept s))
ServerSocket
(accept [s]
(when (.isClosed s)
(throw (ClosedChannelException.)))
(.accept s)))
;; We have to handle this ourselves for NIO because unfortunately read and write
;; hang if we use both Channels/newInputStream and Channels/newOutputStream.
;; Read and write deadlock on a shared channel input/output stream lock
;; (cf. https://bugs.openjdk.java.net/browse/JDK-4509080). Verified that this
;; still happens (via thread dump when hung) with jdk 16.
(defprotocol Writable
;; Underscores were added to satisfy clj-kondo
(write
[w byte-array]
[w byte-array offset length]
"Writes the given bytes to the output as per OutputStream write."))
(extend-protocol Writable
OutputStream
(write
([s byte-array] (.write ^OutputStream s ^"[B" byte-array))
([s byte-array offset length]
(.write ^OutputStream s byte-array offset length))))
(defrecord BufferedOutputChannel
[^SocketChannel channel ^ByteBuffer buffer]
java.io.Flushable
(flush [_this] ;; Underscore was added to satisfy clj-kondo
(.flip buffer)
(.write channel buffer)
(.clear buffer))
Writable
(write [this byte-array]
(.write this byte-array 0 (count byte-array)))
(write [this byte-array offset length]
(if (> length (.capacity buffer))
(do
(.flush this)
(.write channel (ByteBuffer/wrap byte-array offset length)))
(do
(when (> length (.remaining buffer))
(.flush this))
(.put buffer byte-array offset length)))))
(defn buffered-output-channel [^SocketChannel channel bytes]
(assert (.isBlocking channel))
(->BufferedOutputChannel channel (ByteBuffer/allocate bytes)))
(defprotocol AsBufferedInputStreamSubset
(buffered-input [x]
"Returns a buffered stream (subset of BufferedInputStream) reading from x."))
(extend-protocol AsBufferedInputStreamSubset
;; Use the Channels stream for input but not output to avoid the deadlock
SocketChannel (buffered-input [s] (-> s Channels/newInputStream io/input-stream))
Socket (buffered-input [s] (io/input-stream s))
BufferedInputStream (buffered-input [s] s))
(defprotocol AsBufferedOutputStreamSubset
(buffered-output [x]
"Returns a buffered stream (subset of BufferedOutputStream) reading from x."))
(extend-protocol AsBufferedOutputStreamSubset
;; Use the Channels stream for input but not output to avoid the deadlock
SocketChannel (buffered-output [s] (buffered-output-channel s 8192))
Socket (buffered-output [s] (io/output-stream s))
BufferedOutputStream (buffered-output [s] s))