Permalink
Browse files

mark beta13, update to match changes to lamina, and make a few perfor…

…mance tweaks
  • Loading branch information...
1 parent 0443d28 commit fa631e37001f08142c6338339b52f0eac7d30391 @ztellman committed Feb 18, 2013
Showing with 80 additions and 61 deletions.
  1. +2 −2 project.clj
  2. +54 −31 src/aleph/http/core.clj
  3. +19 −15 src/aleph/netty/core.clj
  4. +1 −1 src/aleph/stomp.clj
  5. +3 −3 src/aleph/stomp/core.clj
  6. +1 −9 test/aleph/test/stomp.clj
View
@@ -1,4 +1,4 @@
-(defproject aleph "0.3.0-SNAPSHOT"
+(defproject aleph "0.3.0-beta13"
:description "a framework for asynchronous communication"
:repositories {"jboss" "http://repository.jboss.org/nexus/content/groups/public/"
"sonatype-oss-public" "https://oss.sonatype.org/content/groups/public/"}
@@ -8,7 +8,7 @@
:dependencies [[org.clojure/tools.logging "0.2.3"]
[org.clojure/data.xml "0.0.7"]
[io.netty/netty "3.6.2.Final"]
- [lamina "0.5.0-SNAPSHOT"]
+ [lamina "0.5.0-beta12"]
[gloss "0.2.2-beta5"]
[cheshire "5.0.1"]
[commons-codec/commons-codec "1.7"]
@@ -240,46 +240,69 @@
;;;
-(def-map-type RequestMap [^HttpRequest netty-request ^Channel netty-channel headers body]
+(def-map-type RequestMap
+ [^HttpRequest netty-request
+ ^Channel netty-channel
+ ext
+ headers
+ body]
(get [_ k default-value]
- (case k
- :scheme :http
- :keep-alive? (HttpHeaders/isKeepAlive netty-request)
- :remote-addr (netty/channel-remote-host-address netty-channel)
- :server-name (netty/channel-local-host-name netty-channel)
- :server-port (netty/channel-local-port netty-channel)
- :request-method (request-method netty-request)
- :headers @headers
- :content-type (http-content-type netty-request)
- :character-encoding (http-character-encoding netty-request)
- :uri (request-uri netty-request)
- :query-string (request-query-string netty-request)
- :content-length (http-content-length netty-request)
- :body body
- default-value))
+ (if (and ext
+ (contains? ext k))
+ (get ext k default-value)
+ (case k
+ :scheme :http
+ :keep-alive? (HttpHeaders/isKeepAlive netty-request)
+ :remote-addr (netty/channel-remote-host-address netty-channel)
+ :server-name (netty/channel-local-host-name netty-channel)
+ :server-port (netty/channel-local-port netty-channel)
+ :request-method (request-method netty-request)
+ :headers @headers
+ :content-type (http-content-type netty-request)
+ :character-encoding (http-character-encoding netty-request)
+ :uri (request-uri netty-request)
+ :query-string (request-query-string netty-request)
+ :content-length (http-content-length netty-request)
+ :body body
+ default-value)))
(assoc [this k v]
- (assoc (into {} this) k v))
+ (RequestMap.
+ netty-request
+ netty-channel
+ (assoc ext k v)
+ headers
+ body))
(dissoc [this k]
- (dissoc (into {} this) k))
+ (if (and ext
+ (contains? ext k)
+ (not (contains? (keys this) k)))
@tomjack
tomjack Feb 21, 2013 Contributor

Should this be (not (contains? this k))?

+ (RequestMap.
+ netty-request
+ netty-channel
+ (dissoc ext k)
+ headers
+ body)
+ (dissoc (into {} this) k)))
(keys [this]
- [:scheme
- :keep-alive?
- :remote-addr
- :server-name
- :server-port
- :request-method
- :headers
- :content-type
- :character-encoding
- :uri
- :query-string
- :content-length
- :body]))
+ #{:scheme
+ :keep-alive?
+ :remote-addr
+ :server-name
+ :server-port
+ :request-method
+ :headers
+ :content-type
+ :character-encoding
+ :uri
+ :query-string
+ :content-length
+ :body}))
(defn netty-request->ring-map [{netty-request :msg, chunks :chunks}]
(RequestMap.
netty-request
(netty/current-channel)
+ nil
(delay (http-headers netty-request))
(if chunks
(map* #(.getContent ^HttpChunk %) chunks)
@@ -202,45 +202,49 @@
(compare-and-set! open? false channel-open?))
(do
(.add channel-group (.getChannel ^ChannelEvent evt))
- (enqueue connection-probe
- {:name pipeline-name
- :connections (+ offset (count channel-group))}))
+ (when (probe-enabled? connection-probe)
+ (enqueue connection-probe
+ {:name pipeline-name
+ :connections (+ offset (count channel-group))})))
(and
(not channel-open?)
@open?
(compare-and-set! closed? true channel-open?))
- (enqueue connection-probe
- {:name pipeline-name
- :connections (+ offset (count channel-group))}))))
+ (when (probe-enabled? connection-probe)
+ (enqueue connection-probe
+ {:name pipeline-name
+ :connections (+ offset (count channel-group))})))))
(.sendUpstream ctx evt)))))
(defn socket-address->map [^InetSocketAddress socket-address]
(when (instance? InetSocketAddress socket-address)
{:host (.getHostName socket-address)
- :canonical-host (-> socket-address .getAddress .getCanonicalHostName)
+ ;:canonical-host (-> socket-address .getAddress .getCanonicalHostName)
@tomjack
tomjack Feb 21, 2013 Contributor

I was experiencing a problem where this line, and the .getHostName before it, would each take 5s (so without this change every request took 10s). The socket-address was my router, since I'm port-forwarding in. Adding an entry to /etc/hosts for my router IP seems to solve the problem. Is this expected behavior given a bad networking setup, or should Aleph avoid this?

@ztellman
ztellman Feb 21, 2013 Owner

I've made it so that this code is only executed when the traffic:in or traffic:out probes are consumed, so this should only come up when you're using that instrumentation. However, should you do that, there's no avoiding these sorts of issues.

@tomjack
tomjack Feb 21, 2013 Contributor

Excellent. Thanks again.

:port (.getPort socket-address)}))
(defn upstream-traffic-handler [pipeline-name]
(let [traffic-probe (probe-channel [pipeline-name :traffic :in])]
(reify ChannelUpstreamHandler
(handleUpstream [_ ctx evt]
(when-let [^ChannelBuffer msg (event-message evt)]
- (enqueue traffic-probe
- {:name pipeline-name
- :address (socket-address->map (event-remote-address evt))
- :bytes (.readableBytes msg)}))
+ (when (probe-enabled? traffic-probe)
+ (enqueue traffic-probe
+ {:name pipeline-name
+ :address (socket-address->map (event-remote-address evt))
+ :bytes (.readableBytes msg)})))
(.sendUpstream ctx evt)))))
(defn downstream-traffic-handler [pipeline-name]
(let [traffic-probe (probe-channel [pipeline-name :traffic :out])]
(reify ChannelDownstreamHandler
(handleDownstream [_ ctx evt]
(when-let [^ChannelBuffer msg (event-message evt)]
- (enqueue traffic-probe
- {:name pipeline-name
- :address (socket-address->map (event-remote-address evt))
- :bytes (.readableBytes msg)}))
+ (when (probe-enabled? traffic-probe)
+ (enqueue traffic-probe
+ {:name pipeline-name
+ :address (socket-address->map (event-remote-address evt))
+ :bytes (.readableBytes msg)})))
(.sendDownstream ctx evt)))))
View
@@ -9,7 +9,7 @@
(ns aleph.stomp
(:use
[potemkin]
- [lamina core connections trace])
+ [lamina core connections])
(:require
[clojure.string :as str]
[lamina.cache :as cache]
@@ -108,7 +108,7 @@
:subscribe
(siphon
- (c/subscribe outer-router (message-destination msg))
+ (c/subscribe outer-router (message-destination msg) {})
(c/get-or-create bridges (message-id msg) nil)
ch)
@@ -120,7 +120,7 @@
c/IRouter
(inner-cache [_]
(c/inner-cache inner-router))
- (subscribe- [_ topic args]
- (c/subscribe- outer-router topic args)))))
+ (subscribe [_ topic options]
+ (c/subscribe outer-router topic options)))))
;;;
@@ -11,7 +11,7 @@
[clojure test]
[aleph stomp tcp]
[aleph.test utils]
- [lamina core connections trace executor]))
+ [lamina core connections executor]))
;;;
@@ -46,14 +46,6 @@
(finally
(stop-server#)))))
-(defn probe-producer [id]
- (map*
- (fn [msg]
- {:command :send
- :headers {"destination" id}
- :body (pr-str msg)})
- (probe-channel id)))
-
;;;
(def messages

0 comments on commit fa631e3

Please sign in to comment.