Skip to content

Commit

Permalink
make bandwidth tracking opt-in, mark alpha1
Browse files Browse the repository at this point in the history
  • Loading branch information
ztellman committed Jun 18, 2015
1 parent ed682a8 commit c5d61c8
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 64 deletions.
7 changes: 3 additions & 4 deletions project.clj
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
(defproject aleph "0.4.1-SNAPSHOT"
(defproject aleph "0.4.1-alpha1"
:description "a framework for asynchronous communication"
:repositories {"jboss" "http://repository.jboss.org/nexus/content/groups/public/"
"sonatype-oss-public" "https://oss.sonatype.org/content/groups/public/"}
:license {:name "MIT License"}
:dependencies [[org.clojure/tools.logging "0.3.1" :exclusions [org.clojure/clojure]]
[io.netty/netty-all "4.1.0.Beta4"]
[io.aleph/dirigiste "0.1.0"]
[manifold "0.1.0"]
[io.netty/netty-all "4.1.0.Beta5"]
[manifold "0.1.1-alpha2"]
[byte-streams "0.2.0"]
[potemkin "0.3.13"]]
:profiles {:dev {:dependencies [[org.clojure/clojure "1.7.0-alpha5"]
Expand Down
5 changes: 4 additions & 1 deletion src/aleph/http/client.clj
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,10 @@
(TextWebSocketFrame. (bs/to-string %))
(BinaryWebSocketFrame. (netty/to-byte-buf ctx %))))]

(d/success! d (s/splice out @in))
(d/success! d
(doto
(s/splice out @in)
(reset-meta! {:aleph/channel ch})))

(s/on-drained @in
#(d/chain (.writeAndFlush ch (CloseWebSocketFrame.))
Expand Down
9 changes: 7 additions & 2 deletions src/aleph/http/server.clj
Original file line number Diff line number Diff line change
Expand Up @@ -450,7 +450,10 @@

;;;

(defn websocket-server-handler [raw-stream? ^Channel ch ^WebSocketServerHandshaker handshaker]
(defn websocket-server-handler
[raw-stream?
^Channel ch
^WebSocketServerHandshaker handshaker]
(let [d (d/deferred)
out (netty/sink ch false
#(if (instance? CharSequence %)
Expand All @@ -463,7 +466,9 @@
netty/wrap-future
(fn [_] (.close ch))))

[(s/splice out in)
[(doto
(s/splice out in)
(reset-meta! {:aleph/channel ch}))

(netty/channel-handler

Expand Down
114 changes: 64 additions & 50 deletions src/aleph/netty.clj
Original file line number Diff line number Diff line change
Expand Up @@ -269,18 +269,23 @@
(def ^ConcurrentHashMap channel-inbound-throughput (ConcurrentHashMap.))
(def ^ConcurrentHashMap channel-outbound-throughput (ConcurrentHashMap.))

(defn- connection-stats [^Channel ch]
{:local-address (str (.localAddress ch))
:remote-address (str (.remoteAddress ch))
:writable? (.isWritable ch)
:readable? (-> ch .config .isAutoRead)
:closed? (not (.isOpen ch))})
(defn- connection-stats [^Channel ch inbound?]
(merge
{:local-address (str (.localAddress ch))
:remote-address (str (.remoteAddress ch))
:writable? (.isWritable ch)
:readable? (-> ch .config .isAutoRead)
:closed? (not (.isActive ch))}
(let [^ConcurrentHashMap throughput (if inbound?
channel-inbound-throughput
channel-outbound-throughput)]
(when-let [^AtomicLong throughput (.get throughput ch)]
{:throughput (.get throughput)}))))

(manifold/def-sink ChannelSink
[coerce-fn
^AtomicLong throughput
downstream?
ch]
^Channel ch]
(close [this]
(when downstream?
(close ch))
Expand All @@ -289,30 +294,32 @@
(description [_]
(let [ch (channel ch)]
{:type "netty"
:closed? (not (.isOpen ch))
:closed? (not (.isActive ch))
:sink? true
:connection (assoc (connection-stats ch)
:direction :outbound
:througput (.get throughput))}))
:connection (assoc (connection-stats ch false)
:direction :outbound)}))
(isSynchronous [_]
false)
(put [this msg blocking?]
(let [msg (try
(coerce-fn msg)
(catch Exception e
(log/error e
(str "cannot coerce "
(.getName (class msg))
" into binary representation"))
(close ch)))
^ChannelFuture f (write-and-flush ch msg)
d (or (wrap-future f) (d/success-deferred true))]
(if blocking?
@d
d)))
(when-not (s/closed? this)
(let [msg (try
(coerce-fn msg)
(catch Exception e
(log/error e
(str "cannot coerce "
(.getName (class msg))
" into binary representation"))
(close ch)))
^ChannelFuture f (write-and-flush ch msg)
d (or (wrap-future f) (d/success-deferred true))]
(if blocking?
@d
d))))
(put [this msg blocking? timeout timeout-value]
(.put this msg blocking?)))



(defn sink
([ch]
(sink ch true identity))
Expand All @@ -321,43 +328,38 @@
last-count (AtomicLong. 0)
sink (->ChannelSink
coerce-fn
(.get channel-outbound-throughput ch)
downstream?
ch)]

(d/chain' (.closeFuture (channel ch))
wrap-future
(fn [_] (s/close! sink)))

sink)))
(doto sink (reset-meta! {:aleph/channel ch})))))

(defn source
[^Channel ch]
(let [^AtomicLong throughput (.get channel-inbound-throughput ch)]
(let [src (s/stream*
{:description
(fn [m]
(assoc m
:type "netty"
:direction :inbound
:connection (assoc (connection-stats ch)
:direction :inbound
:throughput (.get throughput))))})]
src)))
(let [src (s/stream*
{:description
(fn [m]
(assoc m
:type "netty"
:direction :inbound
:connection (assoc (connection-stats ch true)
:direction :inbound)))})]
(doto src (reset-meta! {:aleph/channel ch}))))

(defn buffered-source
[^Channel ch metric capacity]
(let [^AtomicLong throughput (.get channel-inbound-throughput ch)]
(let [src (s/buffered-stream
metric
capacity
(fn [m]
(assoc m
:type "netty"
:connection (assoc (connection-stats ch)
:direction :inbound
:throughput (.get throughput)))))]
src)))
(let [src (s/buffered-stream
metric
capacity
(fn [m]
(assoc m
:type "netty"
:connection (assoc (connection-stats ch true)
:direction :inbound))))]
(doto src (reset-meta! {:aleph/channel ch}))))

;;;

Expand Down Expand Up @@ -493,12 +495,24 @@
(try
(.remove pipeline this)
(pipeline-builder pipeline)
(.addFirst pipeline "bandwidth-tracker" (bandwidth-tracker (channel ctx)))
(.fireChannelRegistered ctx)
(catch Throwable e
(log/warn e "Failed to initialize channel")
(.close ctx)))))))

(defn instrument!
[stream]
(if-let [^Channel ch (->> stream meta :aleph/channel)]
(do
(safe-execute ch
(let [pipeline (.pipeline ch)]
(when (and
(.isActive ch)
(nil? (.get pipeline "bandwidth-tracker")))
(.addFirst pipeline "bandwidth-tracker" (bandwidth-tracker ch)))))
true)
false))

;;;

(defn self-signed-ssl-context
Expand Down
16 changes: 10 additions & 6 deletions src/aleph/tcp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
([_ ctx]
(let [ch (.channel ctx)]
(handler
(s/splice
(netty/sink ch true netty/to-byte-buf)
(reset! in (netty/source ch)))
(doto
(s/splice
(netty/sink ch true netty/to-byte-buf)
(reset! in (netty/source ch)))
(reset-meta! {:aleph/channel ch}))
(->TcpConnection ch))))

:channel-read
Expand Down Expand Up @@ -101,9 +103,11 @@
([_ ctx]
(let [ch (.channel ctx)]
(d/success! d
(s/splice
(netty/sink ch true netty/to-byte-buf)
(reset! in (netty/source ch))))))
(doto
(s/splice
(netty/sink ch true netty/to-byte-buf)
(reset! in (netty/source ch)))
(reset-meta! {:aleph/channel ch})))))

:channel-read
([_ ctx msg]
Expand Down
4 changes: 3 additions & 1 deletion src/aleph/udp.clj
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,9 @@
(netty/release-buf->array (.content packet)))))
in)]
(d/success! d
(s/splice out in))))
(doto
(s/splice out in)
(reset-meta! {:aleph/channel ch})))))

:channel-read
([_ ctx msg]
Expand Down

0 comments on commit c5d61c8

Please sign in to comment.