-
Notifications
You must be signed in to change notification settings - Fork 11
/
buffer.clj
415 lines (343 loc) · 9.39 KB
/
buffer.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
(ns net.ty.buffer
"Utility functions to build fixed size buffers.
These functions hold onto data and yield when
a specific size has been reached."
(:require [net.ty.coerce :as co])
(:import io.netty.buffer.Unpooled
io.netty.buffer.CompositeByteBuf
io.netty.buffer.ByteBuf
io.netty.buffer.ByteBufHolder
java.nio.ByteBuffer))
(defprotocol Bufferizable
(as-buffer [this]))
(extend-protocol Bufferizable
ByteBufHolder
(as-buffer [this] (.content this))
ByteBuf
(as-buffer [this] this))
(defn buffer?
[x]
(instance? ByteBuf x))
(defn buffer
([]
(Unpooled/buffer))
([init]
(Unpooled/buffer (int init)))
([init max]
(Unpooled/buffer (int init) (int max))))
(defn refcount
[^ByteBuf buf]
(.refCnt buf))
(defn capacity
[^ByteBuf buf]
(.capacity buf))
(defn direct-buffer
([]
(Unpooled/directBuffer))
([init]
(Unpooled/directBuffer (int init)))
([init max]
(Unpooled/directBuffer (int init) (int max))))
(defn ^CompositeByteBuf composite
([]
(Unpooled/compositeBuffer))
([max]
(Unpooled/compositeBuffer (int max))))
(defn ^CompositeByteBuf consolidate
[^CompositeByteBuf cb]
(.consolidate cb))
(defn unwrap
[^ByteBuf buf]
(.unwrap buf))
(defn composite?
([buf]
(instance? CompositeByteBuf buf)))
(defn augment-composite
([^CompositeByteBuf cb ^ByteBuf buf]
(augment-composite (or (unwrap cb) cb) true buf))
([^CompositeByteBuf cb inc-index ^ByteBuf buf]
(.addComponent cb (boolean inc-index) buf)))
(defn composite-of
[& parts]
(reduce augment-composite (composite) parts))
(defn component-count
[^CompositeByteBuf cb]
(.numComponents cb))
(defn component
[^CompositeByteBuf cb i]
(.component cb (int i)))
(defn component-iterator
[^CompositeByteBuf cb]
(.iterator cb))
(defn components
[^CompositeByteBuf cb]
(iterator-seq (component-iterator cb)))
(defn ^ByteBuf ro-view
[^ByteBuf buf]
(.asReadOnly buf))
(defn is-direct?
[^ByteBuf buf]
(.isDirect buf))
(defn release
[^ByteBuf buf]
(.release buf))
(defn ensure-released [buf]
(while (and (pos? (refcount buf)) (release buf))))
(defn touch
[^ByteBuf buf]
(.touch buf))
(defn retain
([^ByteBuf buf]
(.retain buf))
([^ByteBuf buf incr]
(.retain buf (int incr))))
(defn retain-last
[^ByteBuf buf]
(if (composite? buf)
(let [sub (last (components buf))]
(retain sub))
(retain buf)))
(defn retain-all
[buf]
(let [buf (or (unwrap buf) buf)]
(retain buf)
(when (composite? buf)
(doseq [c (components buf)]
(retain c))))
buf)
(defn release-all
[buf]
(let [buf (or (unwrap buf) buf)]
(when (composite? buf)
(doseq [c (components buf)]
(release c)))
(release buf))
buf)
(defn ^ByteBuf copy
[^ByteBuf buf]
(Unpooled/copiedBuffer buf))
(defn ^ByteBuf duplicate
[^ByteBuf buf]
(.duplicate buf))
(def empty-buffer "An empty bytebuffer" Unpooled/EMPTY_BUFFER)
(defn readable-bytes
[^ByteBuf buf]
(.readableBytes buf))
(defn empty-buffer?
[buf]
(or (= buf empty-buffer)
(zero? (readable-bytes buf))))
(defn wrapped-bytes
[^"[B" ba]
(Unpooled/wrappedBuffer ba))
(defn wrapped-string
[^String s]
(let [ba (.getBytes s)]
(wrapped-bytes ba)))
(defn reader-index
[^ByteBuf buf]
(.readerIndex buf))
(defn writer-index
[^ByteBuf buf]
(.writerIndex buf))
(defn index-of
"Look for the first index of the byte b in a ByteBuf return it or nil"
([^ByteBuf buf b]
(when-let [idx (index-of buf (reader-index buf) (writer-index buf) b)]
(- idx (reader-index buf))))
([^ByteBuf buf from to b]
(let [idx (.indexOf buf (int from) (int to) (byte b))]
(when-not (neg? idx)
idx))))
(defn exhausted?
[^ByteBuf buf]
(zero? (readable-bytes buf)))
(defn ^ByteBuf skip-bytes
([^ByteBuf buf]
(skip-bytes buf (readable-bytes buf)))
([^ByteBuf buf len]
(.skipBytes buf (int len))))
(defn ^ByteBuf write-bytes
([^ByteBuf dst bytes]
(cond
(nil? dst) bytes
(bytes? bytes) (.writeBytes dst ^"[B" bytes)
(instance? ByteBuf bytes) (.writeBytes dst ^ByteBuf bytes)
:else (throw (IllegalArgumentException.
(str "cannot write bytes from" (class bytes))))))
([^ByteBuf dst ^"[B" ba idx len]
(.writeBytes dst ba (int idx ) (int len)) ))
(defn ^ByteBuf write-string
[^ByteBuf buf ^String s]
(.writeBytes buf (.getBytes s)))
(defn ^ByteBuf merge-bufs
[buffers]
(Unpooled/wrappedBuffer ^"[Lio.netty.buffer.ByteBuf;"
(into-array ByteBuf buffers)))
(defn ^ByteBuf write-and-skip-bytes
([^ByteBuf dst ^ByteBuf src]
(write-and-skip-bytes dst src (readable-bytes src)))
([^ByteBuf dst ^ByteBuf src len]
(skip-bytes src len)
(write-bytes dst src)))
(defn ^String to-string
([^ByteBuf buf ^Boolean release?]
(let [ba (byte-array (readable-bytes buf))]
(.readBytes buf ba)
(when release?
(release buf))
(String. ba "UTF-8")))
([^ByteBuf buf]
(to-string buf false)))
(defn ^"[B" to-bytes
[^ByteBuf buf]
(let [ba (byte-array (readable-bytes buf))]
(.readBytes buf ba)
ba))
(defn ^ByteBuf mark-reader-index
[^ByteBuf buf]
(.markReaderIndex buf))
(defn ^ByteBuf reset-reader-index
[^ByteBuf buf]
(.resetReaderIndex buf))
(defn ^ByteBuf retained-slice
([^ByteBuf src]
(if (empty-buffer? src)
empty-buffer
(.retainedSlice src)))
([^ByteBuf src length]
(if (pos? length)
(.retainedSlice src (reader-index src) (int length))
empty-buffer))
([^ByteBuf src index length]
(if (pos? length)
(.retainedSlice src (int index) (int length))
empty-buffer)))
(defn ^ByteBuf slice
([^ByteBuf src]
(if (empty-buffer? src)
empty-buffer
(.slice src)))
([^ByteBuf src length]
(if (pos? length)
(.slice src (reader-index src) (int length))
empty-buffer))
([^ByteBuf src index length]
(if (pos? length)
(.slice src (int index) (int length))
empty-buffer)))
(def sliced-string (comp to-string slice))
(defn ^ByteBuf read-retained-slice
([^ByteBuf src]
(if (empty-buffer? src)
empty-buffer
(.readRetainedSlice src (readable-bytes src))))
([^ByteBuf src length]
(if (pos? length)
(.readRetainedSlice src (int length))
empty-buffer)))
(defn ^ByteBuf read-slice
([^ByteBuf src]
(if (empty-buffer? src)
empty-buffer
(.readSlice src (readable-bytes src))))
([^ByteBuf src length]
(if (pos? length)
(.readSlice src (int length))
empty-buffer)))
(defn ^ByteBuffer nio-buffer
[^ByteBuf buf]
(.nioBuffer buf))
(defn retain-augment
([^CompositeByteBuf cb ^ByteBuf buf]
(let [prevcount (refcount buf)]
(augment-composite cb (retained-slice buf))
(when (> (refcount buf) prevcount)
(release buf))
cb)))
(defn refcounts
[buf]
(let [buf (or (unwrap buf) buf)]
(if (composite? buf)
[(refcount buf)
(let [len (component-count buf)]
(vec (for [i (range len)]
(try
(refcount (component buf i))
(catch Exception _)))))]
(try (refcount buf) (catch Exception _)))))
(defn ^Exception wrong-byte-arg
"Build an illegal argument exception"
[b]
(IllegalArgumentException.
(str "wrong argument to byte-length: " (pr-str b))))
(defn ^Long byte-length
"Figure out how "
[bytes]
(cond
(nil? bytes) 0
(bytes? bytes) (count bytes)
(instance? ByteBuf bytes) (- (.writerIndex ^ByteBuf bytes)
(.readerIndex ^ByteBuf bytes))
:else (throw (wrong-byte-arg bytes))))
(defn ^ByteBuf write-ushort
"Push a short to a bytebuf, we keep host endianness over
the network for optimization reasons"
[^ByteBuf buf ^Long i]
(.writeShortLE buf (co/ushort->short i)))
(defn ^ByteBuf write-uint
"Push an int to a bytebuf, we keep host endianness over
the network for optimization reasons"
[^ByteBuf buf ^Long i]
(.writeIntLE buf (co/uint->int i)))
(defn ^ByteBuf write-ulong
"Push a long to a bytebuf, we keep host endianness over
the network for optimization reasons"
[^ByteBuf buf ^Long l]
(.writeLongLE buf (co/ulong->long l)))
(defn ^ByteBuf write-byte-range
"Push the specified range of contents of a byte array to a ByteBuf"
[^ByteBuf buf bytes ^long index ^long length]
(cond
(bytes? bytes) (.writeBytes buf ^"[B" bytes index length)
(instance? ByteBuf bytes) (.writeBytes buf ^ByteBuf bytes index length)
:else (throw (IllegalArgumentException.
(str "cannot write bytes from" (class bytes)))))
buf)
(defn ^Long read-short
"Fetch a short from a ByteBuf. Host endianness."
[^ByteBuf buf]
(.readShortLE buf))
(defn ^Long read-ushort
[^ByteBuf buf]
(co/short->ushort (read-short buf)))
(defn ^Long read-int
"Fetch an integer from a ByteBuf. Host endianness."
[^ByteBuf buf]
(.readIntLE buf))
(defn ^Long read-uint
[^ByteBuf buf]
(co/int->uint (read-int buf)))
(defn ^Long read-long
"Fetch a long from a ByteBuf. Host endianness."
[^ByteBuf buf]
(.readLongLE buf))
(defn ^Long read-ulong
[^ByteBuf buf]
(co/long->ulong (read-long buf)))
(defn read-byte
[^ByteBuf buf]
(.readByte buf))
(defn read-ubyte
[^ByteBuf buf]
(co/byte->ubyte (read-byte buf)))
(defn ^"[B" read-bytes
[^ByteBuf buf ^"[B" ba]
(.readBytes buf ba)
ba)
(defn read-char
[buf]
(char (read-byte buf)))
(defn read-digest
[buf]
(read-bytes buf (byte-array 16)))