Skip to content

Commit

Permalink
Merge pull request #378 from kachayev/ft-split-inbound-outbound
Browse files Browse the repository at this point in the history
Split Inbound and Outbound handlers to take advantage of Netty's optimizations
  • Loading branch information
ztellman committed Apr 23, 2018
2 parents aeeda4b + 0d85601 commit a5686af
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 10 deletions.
8 changes: 4 additions & 4 deletions src/aleph/http/client.clj
Expand Up @@ -96,7 +96,7 @@
complete
body)))]

(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down Expand Up @@ -148,7 +148,7 @@
complete
body)))]

(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down Expand Up @@ -288,7 +288,7 @@
"Consider setting 'tunnel?' to 'true' or omit it at all"))))

(if (non-tunnel-proxy? options')
(netty/channel-handler
(netty/channel-outbound-handler
:connect
([_ ctx remote-address local-address promise]
(.connect ^ChannelHandlerContext ctx address local-address promise)))
Expand Down Expand Up @@ -536,7 +536,7 @@

[d

(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down
6 changes: 3 additions & 3 deletions src/aleph/http/server.clj
Expand Up @@ -305,7 +305,7 @@

(handle-request ctx @request s))))))))]

(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down Expand Up @@ -355,7 +355,7 @@
@previous-response
body
(HttpHeaders/isKeepAlive req))))]
(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down Expand Up @@ -511,7 +511,7 @@
(s/splice out in)
(reset-meta! {:aleph/channel ch}))

(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down
90 changes: 90 additions & 0 deletions src/aleph/netty.clj
Expand Up @@ -482,6 +482,96 @@
~@(or (:write handlers)
`([_# ctx# msg# promise#]
(.write ctx# msg# promise#))))
(flush
~@(or (:flush handlers)
`([_# ctx#]
(.flush ctx#))))))

(defmacro channel-inbound-handler
[& {:as handlers}]
`(reify
ChannelHandler
ChannelInboundHandler

(handlerAdded
~@(or (:handler-added handlers) `([_# _#])))
(handlerRemoved
~@(or (:handler-removed handlers) `([_# _#])))
(exceptionCaught
~@(or (:exception-caught handlers)
`([_# ctx# cause#]
(.fireExceptionCaught ctx# cause#))))
(channelRegistered
~@(or (:channel-registered handlers)
`([_# ctx#]
(.fireChannelRegistered ctx#))))
(channelUnregistered
~@(or (:channel-unregistered handlers)
`([_# ctx#]
(.fireChannelUnregistered ctx#))))
(channelActive
~@(or (:channel-active handlers)
`([_# ctx#]
(.fireChannelActive ctx#))))
(channelInactive
~@(or (:channel-inactive handlers)
`([_# ctx#]
(.fireChannelInactive ctx#))))
(channelRead
~@(or (:channel-read handlers)
`([_# ctx# msg#]
(.fireChannelRead ctx# msg#))))
(channelReadComplete
~@(or (:channel-read-complete handlers)
`([_# ctx#]
(.fireChannelReadComplete ctx#))))
(userEventTriggered
~@(or (:user-event-triggered handlers)
`([_# ctx# evt#]
(.fireUserEventTriggered ctx# evt#))))
(channelWritabilityChanged
~@(or (:channel-writability-changed handlers)
`([_# ctx#]
(.fireChannelWritabilityChanged ctx#))))))

(defmacro channel-outbound-handler
[& {:as handlers}]
`(reify
ChannelHandler
ChannelOutboundHandler

(handlerAdded
~@(or (:handler-added handlers) `([_# _#])))
(handlerRemoved
~@(or (:handler-removed handlers) `([_# _#])))
(exceptionCaught
~@(or (:exception-caught handlers)
`([_# ctx# cause#]
(.fireExceptionCaught ctx# cause#))))
(bind
~@(or (:bind handlers)
`([_# ctx# local-address# promise#]
(.bind ctx# local-address# promise#))))
(connect
~@(or (:connect handlers)
`([_# ctx# remote-address# local-address# promise#]
(.connect ctx# remote-address# local-address# promise#))))
(disconnect
~@(or (:disconnect handlers)
`([_# ctx# promise#]
(.disconnect ctx# promise#))))
(close
~@(or (:close handlers)
`([_# ctx# promise#]
(.close ctx# promise#))))
(read
~@(or (:read handlers)
`([_# ctx#]
(.read ctx#))))
(write
~@(or (:write handlers)
`([_# ctx# msg# promise#]
(.write ctx# msg# promise#))))
(flush
~@(or (:flush handlers)
`([_# ctx#]
Expand Down
4 changes: 2 additions & 2 deletions src/aleph/tcp.clj
Expand Up @@ -28,7 +28,7 @@
(defn- ^ChannelHandler server-channel-handler
[handler {:keys [raw-stream?] :as options}]
(let [in (atom nil)]
(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down Expand Up @@ -95,7 +95,7 @@
in (atom nil)]
[d

(netty/channel-handler
(netty/channel-inbound-handler

:exception-caught
([_ ctx ex]
Expand Down
2 changes: 1 addition & 1 deletion src/aleph/udp.clj
Expand Up @@ -51,7 +51,7 @@
(.channel (if epoll? EpollDatagramChannel NioDatagramChannel))
(.option ChannelOption/SO_BROADCAST (boolean broadcast?))
(.handler
(netty/channel-handler
(netty/channel-inbound-handler
:exception-caught
([_ ctx ex]
(when-not (d/error! d ex)
Expand Down

0 comments on commit a5686af

Please sign in to comment.