-
Notifications
You must be signed in to change notification settings - Fork 11
/
channel.clj
263 lines (214 loc) · 5.99 KB
/
channel.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
(ns net.ty.channel
"Handy functions to deal with netty channels"
(:refer-clojure :exclude [await])
(:require [clojure.core.async :as a]
[clojure.core.async.impl.channels :as c]
[clojure.core.async.impl.protocols :as p]
[clojure.core.async.impl.dispatch :as dispatch])
(:import io.netty.channel.ChannelFuture
io.netty.channel.ChannelFutureListener
io.netty.channel.Channel
io.netty.channel.ChannelConfig
io.netty.channel.ChannelHandlerContext
io.netty.channel.DefaultChannelPromise
io.netty.channel.group.ChannelGroup
io.netty.channel.group.DefaultChannelGroup
io.netty.util.AttributeKey
io.netty.util.Attribute
io.netty.util.concurrent.GlobalEventExecutor
io.netty.bootstrap.AbstractBootstrap$PendingRegistrationPromise
java.util.concurrent.ArrayBlockingQueue
java.util.concurrent.locks.Lock))
(defprotocol ChannelBridge
  "Implemented by objects that accept values pushed in from outside,
   such as the bridge built by `read-channel`."
  (offer-val
    [this v]
    "Offer value `v` to the bridge."))
(defn await!
  "Invoke `.await` on a `ChannelFuture` and return its result.
   Despite its name, the `channel` argument is hinted as a future."
  [^ChannelFuture channel]
  (-> channel .await))
(defn read!
  "Request a read on `channel` by calling its `.read` method.
   Intended for channels whose auto-read setting has been turned off."
  [^Channel channel]
  (. channel read))
(defn active?
  "Return the result of `.isActive` for `channel`."
  [^Channel channel]
  (-> channel .isActive))
(defn open?
  "Return the result of `.isOpen` for `channel`."
  [^Channel channel]
  (-> channel .isOpen))
(defprotocol ChannelHolder
  "Anything from which a Netty channel can be obtained."
  (get-channel
    [this]
    "Return the channel held by this object"))
(defprotocol ChannelResource
  "Common write/flush/lifecycle operations shared by the resources
   extended in this namespace."
  (write!
    [this msg]
    "Write `msg` to the resource")
  (write-and-flush!
    [this msg]
    "Write `msg` to the resource and flush it")
  (flush!
    [this]
    "Ask the resource to flush every pending message")
  (close!
    [this]
    "Ask the resource to close")
  (disconnect!
    [this]
    "Ask the resource to disconnect")
  (deregister!
    [this]
    "Ask the resource to deregister"))
(defn ^Channel channel
  "Extract the channel from `x` through the `ChannelHolder` protocol."
  [x]
  (-> x get-channel))
;; A pending registration promise exposes its channel through `.channel`.
(extend-type AbstractBootstrap$PendingRegistrationPromise
  ChannelHolder
  (get-channel [promise]
    (. promise channel)))
;; A handler context both holds a channel and can be written to directly;
;; every operation is forwarded to the matching context method.
(extend-type ChannelHandlerContext
  ChannelHolder
  (get-channel [ctx]
    (. ctx channel))

  ChannelResource
  (write! [ctx payload]
    (-> ctx (.write payload)))
  (write-and-flush! [ctx payload]
    (-> ctx (.writeAndFlush payload)))
  (flush! [ctx]
    (-> ctx .flush))
  (close! [ctx]
    (-> ctx .close))
  (disconnect! [ctx]
    (-> ctx .disconnect))
  (deregister! [ctx]
    (-> ctx .deregister)))
;; A channel promise exposes the channel it belongs to through `.channel`.
(extend-type DefaultChannelPromise
  ChannelHolder
  (get-channel [promise]
    (. promise channel)))
;; A channel is its own holder; resource operations are forwarded to the
;; matching channel method.
(extend-type Channel
  ChannelHolder
  (get-channel [chan]
    chan)

  ChannelResource
  (write! [chan payload]
    (-> chan (.write payload)))
  (write-and-flush! [chan payload]
    (-> chan (.writeAndFlush payload)))
  (flush! [chan]
    (-> chan .flush))
  (close! [chan]
    (-> chan .close))
  (disconnect! [chan]
    (-> chan .disconnect))
  (deregister! [chan]
    (-> chan .deregister)))
;; Channel groups support the write/flush/close/disconnect part of
;; ChannelResource; `deregister!` is not implemented for groups here.
(extend-type ChannelGroup
  ChannelResource
  (write! [group payload]
    (-> group (.write payload)))
  (write-and-flush! [group payload]
    (-> group (.writeAndFlush payload)))
  (flush! [group]
    (-> group .flush))
  (close! [group]
    (-> group .close))
  (disconnect! [group]
    (-> group .disconnect)))
(defn channel-group
  "Build a `DefaultChannelGroup` called `name`, backed by the
   `GlobalEventExecutor` singleton."
  [name]
  (new DefaultChannelGroup ^String name GlobalEventExecutor/INSTANCE))
(defn add-to-group
  "Call `.add` on `group` with `chan`, returning what `.add` returns."
  [^ChannelGroup group ^Channel chan]
  (-> group (.add chan)))
(defn remove-from-group
  "Call `.remove` on `group` with `chan`, returning what `.remove` returns."
  [^ChannelGroup group ^Channel chan]
  (-> group (.remove chan)))
(defn sync-uninterruptibly!
  "Invoke `.syncUninterruptibly` on the given `ChannelFuture`.
   Despite its name, the `chan` argument is hinted as a future."
  [^ChannelFuture chan]
  (-> chan .syncUninterruptibly))
(defn sync!
  "Invoke `.sync` on the given `ChannelFuture`.
   Despite its name, the `chan` argument is hinted as a future."
  [^ChannelFuture chan]
  (-> chan .sync))
(defn ^ChannelFuture close-future
  "Return the result of `.closeFuture` for channel `chan`."
  [^Channel chan]
  (-> chan .closeFuture))
(defn get-attr-key
  "Return the `AttributeKey` whose name is `(str k)`.

   `AttributeKey/valueOf` hands back the existing key for a name or
   creates it when missing, so a single call is enough. The previous
   exists-then-newInstance sequence could throw when two threads asked
   for the same, not yet created, key at the same time."
  [k]
  (AttributeKey/valueOf ^String (str k)))
(defn ^Attribute get-attr
  "Return the attribute of `chan` stored under the key derived from `k`
   (see `get-attr-key`)."
  [^Channel chan k]
  (let [attr-key (get-attr-key k)]
    (.attr chan attr-key)))
(defn set-attr!
  "Call `.set` with `v` on the attribute of `chan` found under `k`."
  [^Channel chan k v]
  (-> (get-attr chan k)
      (.set v)))
(defn compare-and-set-attr!
  "Call `.compareAndSet` with `prev` and `next` on the attribute of
   `chan` found under `k`, returning its result."
  [^Channel chan k prev next]
  (-> (get-attr chan k)
      (.compareAndSet prev next)))
(defn get-and-set-attr!
  "Call `.getAndSet` with `v` on the attribute of `chan` found under
   `k`, returning its result."
  [^Channel chan k v]
  (-> (get-attr chan k)
      (.getAndSet v)))
(defn set-attr-if-absent!
  "Call `.setIfAbsent` with `v` on the attribute of `chan` found under
   `k`, returning its result."
  [^Channel chan k v]
  (-> (get-attr chan k)
      (.setIfAbsent v)))
(defn get-attr-value
  "Return the result of `.get` on the attribute of `chan` found under `k`."
  [^Channel chan k]
  (-> (get-attr chan k)
      .get))
(defn get-and-remove-attr!
  "Reset the attribute of `chan` found under `k` to nil and return the
   value it held before.

   The swap is done with a single `.getAndSet` call rather than a `.get`
   followed by a `.set`, so a value written by another thread between
   the two steps can no longer be silently dropped."
  [^Channel chan k]
  (.getAndSet (get-attr chan k) nil))
(defn remove-attr!
  "Reset the attribute of `chan` found under `k` by setting it to nil."
  [^Channel chan k]
  (-> (get-attr chan k)
      (.set nil)))
(defn config
  "Return the result of `.config` for channel `chan`."
  [^Channel chan]
  (-> chan .config))
(defn set-autoread!
  "Call `.setAutoRead` on `cfg`; `autoread?` is coerced with `boolean`
   first, so any truthy value enables it."
  [^ChannelConfig cfg autoread?]
  (-> cfg
      (.setAutoRead (boolean autoread?))))
(defn alloc!
  "Return the result of `.alloc` for channel `chan`."
  [^Channel chan]
  (-> chan .alloc))
(defn pipeline
  "Return the result of `.pipeline` for channel `chan`."
  [^Channel chan]
  (-> chan .pipeline))
(defn channel?
  "True when `x` is an instance of Netty's `Channel`."
  [x]
  (instance? Channel
             x))
(defn taker-fn
  "Commit `handler` under its lock and call the resulting callback with
   `val`.

   `handler` must be both a `java.util.concurrent.locks.Lock` and a
   core.async handler (it is passed to `p/active?` and `p/commit`).

   Returns the result of the callback, or nil when the handler is no
   longer active or yields no callback; nothing is invoked in that case.
   The lock is released in a `finally` clause so that an exception thrown
   by `p/active?` or `p/commit` cannot leave the handler locked. The
   callback itself runs after the lock has been released."
  [^Lock handler val]
  (.lock handler)
  (let [f (try
            (and (p/active? handler) (p/commit handler))
            (finally
              (.unlock handler)))]
    ;; `f` is false/nil for an inactive handler; calling it would throw.
    (when f
      (f val))))
(defn read-channel
  "Bridge a Netty Channel to a readable channel.

   `src` is anything supporting `close!` and `channel` from this
   namespace; `bufsize` bounds both the queue of values waiting for a
   taker and the queue of takers waiting for a value.

   The returned object implements core.async's `ReadPort` and `Channel`
   protocols as well as `ChannelBridge`:

   - `take!` returns a boxed value when one is buffered; otherwise it
     enqueues the handler, requests a read on the underlying channel and
     returns nil. Throws `ex-info` with `:type ::taker-queue-full` when
     the taker queue is full.
   - `offer-val` hands the value to the oldest waiting taker, or buffers
     it. Throws `ex-info` with `:type ::buffered-queue-full` when the
     buffer is full.
   - `close!` closes `src` and completes every waiting taker with nil.
   - `closed?` reports whether the underlying channel is no longer open.

   Takers are completed through `dispatch/run`, which expects a Runnable:
   the call to `taker-fn` is therefore wrapped in a thunk instead of
   being evaluated eagerly on the calling thread."
  [src bufsize]
  (let [buffered (ArrayBlockingQueue. (int bufsize))
        takers   (ArrayBlockingQueue. (int bufsize))]
    (reify p/ReadPort p/Channel ChannelBridge
      (close! [this]
        (close! src)
        ;; Drain the queue rather than iterate over it, so that released
        ;; takers do not linger in (and keep filling) the queue.
        (loop []
          (when-let [taker (.poll takers)]
            (dispatch/run #(taker-fn taker nil))
            (recur))))
      (closed? [this]
        (not (open? (channel src))))
      (take! [this handler]
        (if-let [val (.poll buffered)]
          (c/box val)
          (if-not (.offer takers handler)
            (throw (ex-info "cannot enqueue read-channel taker"
                            {:type ::taker-queue-full}))
            ;; Ask for more data; nil tells the caller to wait for the
            ;; handler callback.
            (and (read! (channel src)) nil))))
      (offer-val [this val]
        (if-let [taker (.poll takers)]
          (dispatch/run #(taker-fn taker val))
          (when-not (.offer buffered val)
            (throw (ex-info "cannot enqueue read-channel value"
                            {:type ::buffered-queue-full}))))))))