Permalink
Browse files

clean up streaming requests in wrap-ring-handler, and mark beta1

  • Loading branch information...
1 parent e576247 commit ab97dd81409db98cc28e3f4544b39f74a1ac7574 @ztellman committed Aug 22, 2012
Showing with 115 additions and 92 deletions.
  1. +8 −8 project.clj
  2. +83 −48 src/aleph/formats.clj
  3. +10 −23 src/aleph/http.clj
  4. +3 −3 src/aleph/http/options.clj
  5. +1 −1 src/aleph/trace/parse.clj
  6. +10 −9 test/aleph/test/http.clj
View
@@ -1,4 +1,4 @@
-(defproject aleph "0.3.0-SNAPSHOT"
+(defproject aleph "0.3.0-beta1"
:description "a framework for asynchronous communication"
:repositories {"jboss" "http://repository.jboss.org/nexus/content/groups/public/"
"sonatype-oss-public" "https://oss.sonatype.org/content/groups/public/"}
@@ -7,21 +7,21 @@
:distribution :repo}
:dependencies [[org.clojure/clojure "1.4.0"]
[org.clojure/tools.logging "0.2.3"]
- [io.netty/netty "3.5.2.Final"]
- [lamina "0.5.0-SNAPSHOT"]
- [gloss "0.2.2-alpha3"]
+ [io.netty/netty "3.5.4.Final"]
+ [lamina "0.5.0-beta1"]
+ [gloss "0.2.2-beta1"]
[cheshire "4.0.1"]
[prxml "1.3.1"]]
:multi-deps {:all [[org.clojure/tools.logging "0.2.3"]
- [io.netty/netty "3.5.2.Final"]
- [lamina "0.5.0-SNAPSHOT"]
- [gloss "0.2.2-alpha3"]
+ [io.netty/netty "3.5.4.Final"]
+ [lamina "0.5.0-beta1"]
+ [gloss "0.2.2-beta1"]
[cheshire "4.0.1"]
[prxml "1.3.1"]]
"master" [[org.clojure/clojure "1.5.0-master-SNAPSHOT"]]
"1.2" [[org.clojure/clojure "1.2.0"]]
"1.3" [[org.clojure/clojure "1.3.0"]]}
- :dev-dependencies [[criterium "0.3.0-SNAPSHOT"]
+ :dev-dependencies [[criterium "0.2.1-SNAPSHOT"]
[codox "0.4.1"]]
:test-selectors {:default #(not (some #{:benchmark :redis} (cons (:tag %) (keys %))))
:integration :redis
View
@@ -19,6 +19,9 @@
[gloss.core :as gloss])
(:import
[java.io
+ PipedInputStream
+ PipedOutputStream
+ IOException
InputStream
InputStreamReader
PrintWriter
@@ -100,48 +103,6 @@
(byte-buffers->channel-buffer (conj bufs (ByteBuffer/wrap ary 0 offset)))
(recur ary (+ offset byte-count) bufs)))))))))
-(defn- input-stream->channel-
- [ch ^InputStream stream chunk-size]
- (let [buffer? (and chunk-size (pos? chunk-size))
- chunk-size (if buffer? chunk-size 1024)
- create-array (if buffer?
- #(byte-array chunk-size)
- #(byte-array
- (if (pos? (.available stream))
- (.available stream)
- 1024)))]
- (loop [ary ^bytes (create-array), offset 0]
- (let [ary-len (count ary)]
- (if (= ary-len offset)
- (do
- (enqueue ch (ByteBuffer/wrap ary))
- (recur (create-array) 0))
- (let [byte-count (.read stream ary offset (- ary-len offset))]
- (if (neg? byte-count)
- (if (zero? offset)
- (close ch)
- (enqueue-and-close ch (ByteBuffer/wrap ary 0 offset)))
- (recur ary (+ offset byte-count)))))))))
-
-(defn input-stream->channel
- "Converts an InputStream to a channel that emits bytes. Spawns a separate thread to read
- from the stream.
-
- If 'chunk-size' is specified, the channel will only emit byte sequences of the specified
- length, unless the stream is closed before a full chunk can be accumulated. If it is nil
- or non-positive, bytes will be sent through the channel as soon as they enter the InputStream."
- ([stream]
- (input-stream->channel stream nil))
- ([stream chunk-size]
- (if-not stream
- (closed-channel)
- (let [ch (channel)]
- (doto
- (Thread. (fn [] (input-stream->channel- ch stream chunk-size)))
- (.setName "InputStream reader")
- .start)
- ch))))
-
(defn- string->channel-buffer
([s]
(string->channel-buffer s "UTF-8"))
@@ -363,13 +324,14 @@
reverse))
(defn url-decode
- "Takes a URL-encoded string, and returns a standard representation of the string. By default, 'charset' is UTF-8.
+ "Takes a URL-encoded string, and returns a standard representation of the string.
+ By default, 'charset' is UTF-8.
- 'options' may contain a :url-decodings hash, which maps encoded strings onto how they should be decoded. For instance,
- the Java decoder doesn't recognize %99 as a trademark symbol, even though it's sometimes encoded that way. To allow
- for this encoding, you can simply use:
+ 'options' may contain a :url-decodings hash, which maps encoded strings onto how they should
+ be decoded. For instance, the Java decoder doesn't recognize %99 as a trademark symbol, even
+ though it's sometimes encoded that way. To allow for this encoding, you can simply use:
- (url-decode s \"utf-8\" {:url-decodings {\"%99\" \"\\u2122\"}})"
+ (url-decode s \"utf-8\" {:url-decodings {\"%99\" \"\\u2122\"}})"
([s]
(url-decode s "utf-8"))
([s charset]
@@ -384,7 +346,8 @@
(defn url-encode
"URL-encodes a string. By default 'charset' is UTF-8.
- 'options' may contain a :url-encodings hash, which can specify custom encodings for certain characters or substrings."
+ 'options' may contain a :url-encodings hash, which can specify custom encodings for certain characters
+ or substrings."
([s]
(url-encode s "utf-8"))
([s charset]
@@ -398,6 +361,78 @@
;;;
+(defn- input-stream->channel-
+ [ch ^InputStream stream chunk-size]
+ (let [buffer? (and chunk-size (pos? chunk-size))
+ chunk-size (if buffer? chunk-size 1024)
+ create-array (if buffer?
+ #(byte-array chunk-size)
+ #(byte-array
+ (if (pos? (.available stream))
+ (.available stream)
+ 1024)))]
+ (loop [ary ^bytes (create-array), offset 0]
+ (let [ary-len (count ary)]
+
+ (if (= ary-len offset)
+
+ ;; if we've filled the array, enqueue it and restart
+ (do
+ (enqueue ch (ByteBuffer/wrap ary))
+ (recur (create-array) 0))
+
+ ;; read in as many bytes as we can
+ (let [byte-count (try
+ (.read stream ary offset (- ary-len offset))
+ (catch IOException e
+ -1))]
+
+ (if (neg? byte-count)
+
+ ;; if we've reached the end, enqueue the last bytes and close
+ (if (zero? offset)
+ (close ch)
+ (enqueue-and-close ch (ByteBuffer/wrap ary 0 offset)))
+
+ (recur ary (+ offset byte-count)))))))))
+
+(defn input-stream->channel
+ "Converts an InputStream to a channel that emits bytes. Spawns a separate thread to read
+ from the stream.
+
+ If 'chunk-size' is specified, the channel will only emit byte sequences of the specified
+ length, unless the stream is closed before a full chunk can be accumulated. If it is nil
+ or non-positive, bytes will be sent through the channel as soon as they enter the InputStream."
+ ([stream]
+ (input-stream->channel stream nil))
+ ([stream chunk-size]
+ (if-not stream
+ (closed-channel)
+ (let [ch (channel)]
+ (doto
+ (Thread. (fn [] (input-stream->channel- ch stream chunk-size)))
+ (.setName "InputStream reader")
+ .start)
+ ch))))
+
+(defn channel->input-stream
+ "Consumes messages from a channel that emits bytes, and feeds them into an InputStream.
+
+ This does not create any threads. If a blocking read on the InputStream is performed on
+ a thread which is responsible for feeding messages into the channel, this will cause a
+ deadlock."
+ ([ch]
+ (channel->input-stream ch "utf-8"))
+ ([ch charset]
+ (let [in (PipedInputStream.)
+ out (PipedOutputStream. in)
+ bytes (map* #(bytes->byte-array % charset) ch)]
+ (receive-all bytes #(.write out %))
+ (on-drained bytes #(.close out))
+ in)))
+
+;;;
+
(defn options->frame [{:keys [frame delimiters strip-delimiters?]
:or {strip-delimiters? true}}]
(cond
View
@@ -13,7 +13,7 @@
[aleph.formats :as formats]
[aleph.http.options :as options])
(:use
- [lamina.core]
+ [lamina core executor]
[potemkin]))
(import-fn ws/websocket-client)
@@ -36,30 +36,17 @@
(run-pipeline request
{:error-handler (fn [ex] (error ch ex))}
- ;; transform body
- (if (options/streaming-ring-requests?)
- identity
- (fn [{:keys [body character-encoding] :as request}]
-
- ;; if it's a streaming request, wait for it to complete
- (if (channel? body)
- (run-pipeline nil
- {:error-handler (fn [_])}
- (fn [_]
- (if (channel? body)
- (reduce* conj [] body)
- body))
- #(assoc request
- :body
- (formats/bytes->input-stream % character-encoding)))
+ ;; call into handler
+ (fn [{:keys [body character-encoding] :as request}]
+ (if (and (channel? body) (not (options/channel-ring-requests?)))
- ;; if it's not a streaming request, just turn it into an input stream
- (update-in request [:body]
- formats/bytes->input-stream character-encoding))))
+ ;; spawn off a new thread, since there will be blocking reads
+ (task "input-stream-reader"
+ (f (assoc request
+ ::channel ch
+ :body (formats/channel->input-stream body character-encoding))))
- ;; call into handler
- (fn [request]
- (f (assoc request ::channel ch)))
+ (f (assoc request ::channel ch))))
;; send response
(fn [response]
@@ -27,11 +27,11 @@
([options]
(or (:charset options) "utf-8")))
-(defn streaming-ring-requests?
+(defn channel-ring-requests?
([]
- (streaming-ring-requests? (current-options)))
+ (channel-ring-requests? (current-options)))
([options]
- (boolean (:streaming-ring-requests? options))))
+ (boolean (:channel-ring-requests? options))))
(defn websocket?
([]
@@ -145,7 +145,7 @@
(deftoken string #"[a-zA-Z0-9\-\*_]")
(deftoken whitespace #"[ \t,]*")
(deftoken empty-token #"")
-(def colon (token #"[ \t]*:[ \t]*"))
+(deftoken colon #"[ \t]*:[ \t]*")
(def relationship
(chain
View
@@ -70,10 +70,6 @@
(catch Exception e
)))})
-(defn print-vals [& args]
- (apply prn args)
- (last args))
-
(defn basic-handler [ch request]
(when-let [handler (route-map (:uri request))]
(enqueue ch (handler request))))
@@ -96,6 +92,11 @@
:content-type (:content-type request)
:body (:body request)}))
+(defn ring-streaming-request-handler [request]
+ {:status 200
+ :content-type (:content-type request)
+ :body (:body request)})
+
(defn json-response-handler [ch request]
(enqueue ch
{:status 200
@@ -152,11 +153,11 @@
:probes {:error (sink (fn [& _#]))}})
~@body))
-(defmacro with-handlers [aleph-handler ring-handler & body]
+(defmacro with-handlers [[aleph-handler ring-handler] & body]
`(do
(with-handler ~aleph-handler
~@body)
- #_(with-handler (wrap-ring-handler ~ring-handler)
+ (with-handler (wrap-ring-handler ~ring-handler)
~@body)))
(defn is-closed? [handler & requests]
@@ -169,7 +170,7 @@
(is (closed? connection)))))
(defn test-handler-response [expected aleph-handler ring-handler]
- (with-handlers aleph-handler ring-handler
+ (with-handlers [aleph-handler ring-handler]
(is (= expected (:status (sync-http-request {:method :get, :url "http://localhost:8080"} 1000))))
(is (= expected (:status (sync-http-request {:method :get, :url "http://localhost:8080", :keep-alive? true} 1000))))))
@@ -203,7 +204,7 @@
(close-connection client))))
(deftest test-streaming-response
- (with-handler streaming-request-handler
+ (with-handlers [streaming-request-handler ring-streaming-request-handler]
(let [content "abcdefghi"
client (default-http-client)]
(try
@@ -214,7 +215,7 @@
:auto-transform true
:headers {"content-type" "text/plain"}
:body (apply closed-channel (map str content))})
- 1000)]
+ 2000)]
(is (= content (:body response)))))
(finally
(close-connection client))))))

0 comments on commit ab97dd8

Please sign in to comment.