/
core.clj
157 lines (139 loc) · 5.43 KB
/
core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
(ns link.core
(:refer-clojure :exclude [send])
(:use [link.util :only [make-handler-macro]])
(:import [java.net InetSocketAddress InetAddress])
(:import [io.netty.channel
Channel
ChannelId
ChannelFuture
ChannelHandlerContext
SimpleChannelInboundHandler])
(:import [io.netty.channel.socket.nio NioSocketChannel])
(:import [io.netty.util AttributeKey Attribute]
[io.netty.util.concurrent GenericFutureListener]))
(defprotocol LinkMessageChannel
  "Uniform messaging interface over a network channel. Implemented in this
  file by ClientSocketChannel and extended to Netty's NioSocketChannel."
  (id [this]
    "Returns the id of the underlying channel in its long text form.")
  (short-id [this]
    "Returns the id of the underlying channel in its short text form.")
  (send! [this msg]
    "Sends msg without any completion callback.")
  (send!* [this msg cb]
    "Sends msg. When cb is non-nil, the implementations in this file register
    it as a listener and call it with the write future once the write
    completes.")
  (valid? [this]
    "Truthy when the underlying channel is present and active.")
  (channel-addr [this]
    "Returns the local address of the underlying channel.")
  (remote-addr [this]
    "Returns the remote address of the underlying channel.")
  (close! [this]
    "Closes the underlying channel."))
(defn- client-channel-valid?
  "Truthy only when `ch` is non-nil and Netty reports it as active.
  A nil `ch` is returned unchanged (i.e. nil)."
  [^Channel ch]
  (if ch
    (.isActive ch)
    ch))
(defn- addr-str
  "Formats `addr` as \"host:port\".

  For a resolved address the host part is the textual IP address. An
  unresolved InetSocketAddress has no InetAddress (`getAddress` returns nil),
  so the host string it was created with is used instead of throwing a
  NullPointerException."
  [^InetSocketAddress addr]
  (let [ip   (.getAddress addr)
        host (if ip
               (.getHostAddress ip)
               ;; unresolved address: fall back to the literal host string
               (.getHostString addr))]
    (str host ":" (.getPort addr))))
(defn channel-id
  "Returns the long text form of the id of channel `ch`."
  [^Channel ch]
  (let [^ChannelId cid (.id ch)]
    (.asLongText cid)))
(defn short-channel-id
  "Returns the short text form of the id of channel `ch`."
  [^Channel ch]
  (let [^ChannelId cid (.id ch)]
    (.asShortText cid)))
;; Client-side LinkMessageChannel whose underlying Netty channel lives in an
;; agent so that it can be replaced when it is no longer active.
;;
;;   ch-agent   - agent holding the current Channel (may hold nil)
;;   factory-fn - zero-argument fn called to obtain a replacement Channel
;;   stopped    - reference set to true by close!; it is not read in this
;;                type (presumably consulted by factory-fn - confirm at the
;;                construction site)
(deftype ClientSocketChannel [ch-agent factory-fn stopped]
  LinkMessageChannel
  (id [this]
    (channel-id @ch-agent))
  (short-id [this]
    (short-channel-id @ch-agent))
  (send! [this msg]
    ;; no callback: delegate with a nil cb
    (send!* this msg nil))
  (send!* [this msg cb]
    ;; The write runs as an agent action; the action's return value becomes
    ;; the agent's new state, i.e. the (possibly replaced) channel.
    (clojure.core/send-off ch-agent
      (fn [ch]
        (let [;; reuse an active channel, otherwise close the stale one
              ;; (if any) and ask the factory for a new one
              ch- (if (client-channel-valid? ch)
                    ch
                    (do
                      (when ch
                        (.close ^Channel ch))
                      (factory-fn)))
              ;; NOTE(review): when the replacement channel is not active
              ;; nothing is written and cb is never invoked - the message
              ;; is dropped silently.
              cf (when (client-channel-valid? ch-)
                   (if cb
                     (.writeAndFlush ^Channel ch- msg)
                     ;; no callback: use the void promise
                     (.writeAndFlush ^Channel ch- msg (.voidPromise ^Channel ch-))))]
          (when (and cf cb)
            (.addListener ^ChannelFuture cf
                          (reify GenericFutureListener
                            (operationComplete [this f]
                              (cb f)))))
          ch-))))
  (channel-addr [this]
    (.localAddress ^Channel @ch-agent))
  (remote-addr [this]
    (.remoteAddress ^Channel @ch-agent))
  (close! [this]
    (reset! stopped true)
    ;; Take a single snapshot of the agent state: dereferencing twice (once
    ;; for the check, once for the call) could observe a different value the
    ;; second time if an agent action completed in between.
    (when-let [ch @ch-agent]
      (.close ^Channel ch)))
  (valid? [this]
    (client-channel-valid? @ch-agent)))
;; Lets a raw Netty NioSocketChannel be used directly as a LinkMessageChannel.
(extend-protocol LinkMessageChannel
  NioSocketChannel
  (id [this] (channel-id this))
  (short-id [this] (short-channel-id this))
  (valid? [this] (.isActive this))
  (channel-addr [this] (.localAddress this))
  (remote-addr [this] (.remoteAddress this))
  (close! [this] (.close this))
  (send! [this msg]
    ;; no completion notification requested: write with the void promise
    (.writeAndFlush this msg (.voidPromise this)))
  (send!* [this msg cb]
    (if-not cb
      (.writeAndFlush this msg (.voidPromise this))
      ;; a callback was given: write normally and hand the completed future
      ;; to cb through a listener
      (let [write-future (.writeAndFlush this msg)
            on-done      (reify GenericFutureListener
                           (operationComplete [_ fut] (cb fut)))]
        (.addListener ^ChannelFuture write-future on-done)))))
;; One invocation of link.util/make-handler-macro per event name.
;; NOTE(review): make-handler-macro is defined outside this file; presumably
;; each call defines an `on-<name>` macro whose result is a map keyed by
;; :on-<name> (the keys create-handler0 looks up: :on-message, :on-error,
;; :on-active, :on-inactive, :on-event, :on-channel-writability-changed) -
;; confirm against link.util.
(make-handler-macro message)
(make-handler-macro error)
(make-handler-macro active)
(make-handler-macro inactive)
(make-handler-macro event)
(make-handler-macro channel-writability-changed)
(defmacro create-handler0
  "Expands to a proxy of SimpleChannelInboundHandler driven by a handler map.

  `body` forms are merged into one map; the callbacks are looked up under
  :on-active, :on-inactive, :on-channel-writability-changed, :on-error,
  :on-message and :on-event. `sharable` is the value returned by the proxy's
  isSharable.

  For every event except channelRead0 the event is passed on to the next
  handler (via the matching fire* method) when no callback is registered or
  when the callback returns exactly false. For channelRead0 the :on-message
  callback, if any, is called with the channel and the message and nothing
  is fired."
  [sharable & body]
  `(let [handlers# (merge ~@body)]
     (proxy [SimpleChannelInboundHandler] []
       (isSharable [] ~sharable)

       (channelRead0 [^ChannelHandlerContext ctx# msg#]
         (let [on-message# (:on-message handlers#)]
           (when on-message#
             (on-message# (.channel ctx#) msg#))))

       (channelActive [^ChannelHandlerContext ctx#]
         (let [on-active# (:on-active handlers#)]
           (when (or (not on-active#)
                     (false? (on-active# (.channel ctx#))))
             (.fireChannelActive ctx#))))

       (channelInactive [^ChannelHandlerContext ctx#]
         (let [on-inactive# (:on-inactive handlers#)]
           (when (or (not on-inactive#)
                     (false? (on-inactive# (.channel ctx#))))
             (.fireChannelInactive ctx#))))

       (channelWritabilityChanged [^ChannelHandlerContext ctx#]
         (let [on-writability# (:on-channel-writability-changed handlers#)]
           (when (or (not on-writability#)
                     (false? (on-writability# (.channel ctx#))))
             (.fireChannelWritabilityChanged ctx#))))

       (exceptionCaught [^ChannelHandlerContext ctx#
                         ^Throwable e#]
         (let [on-error# (:on-error handlers#)]
           (when (or (not on-error#)
                     (false? (on-error# (.channel ctx#) e#)))
             (.fireExceptionCaught ctx# e#))))

       (userEventTriggered [^ChannelHandlerContext ctx# evt#]
         (let [on-event# (:on-event handlers#)]
           (when (or (not on-event#)
                     (false? (on-event# (.channel ctx#) evt#)))
             (.fireUserEventTriggered ctx# evt#)))))))
(defmacro create-handler
  "Expands to `(create-handler0 true ...)`, i.e. a handler whose isSharable
  returns true, built from the given handler-map forms."
  [& body]
  (list* `create-handler0 true body))
(defmacro create-stateful-handler
  "Expands to a one-argument function that ignores its argument and, on each
  call, builds a new handler via `(create-handler0 false ...)` (isSharable
  returns false) from the given handler-map forms.

  The ignored parameter is an auto-gensym (`_#`): a bare `_` inside
  syntax-quote would be namespace-qualified, and `fn` rejects qualified
  symbols as parameter names, making every use of this macro fail to
  compile."
  [& body]
  `(fn [_#] (create-handler0 false ~@body)))
(defn channel-attr-set!
  "Stores `value` in the attribute of channel `ch` whose AttributeKey is
  named by `(str key)`."
  [ch key value]
  (let [attr-key        (AttributeKey/valueOf (str key))
        ^Attribute slot (.attr ^Channel ch attr-key)]
    (.set slot value)))
(defn channel-attr-get
  "Returns the current value of the attribute of channel `ch` whose
  AttributeKey is named by `(str key)`; nil if no attribute object is
  returned for that key."
  [ch key]
  (let [attr-key (AttributeKey/valueOf (str key))
        slot     (.attr ^Channel ch attr-key)]
    (when slot
      (.get ^Attribute slot))))