Permalink
Browse files

everything seems to be in working order

  • Loading branch information...
1 parent a442301 commit 87c06a1266d2d080389ed8caa786b80970cb062e @ztellman committed Dec 28, 2011
View
Binary file not shown.
View
@@ -4,9 +4,10 @@
:license {:name "Eclipse Public License - v 1.0"
:url "http://www.eclipse.org/legal/epl-v10.html"
:distribution :repo}
+ :main aloha.core
:dependencies [[org.clojure/clojure "1.3.0"]
- [commons-logging "1.1.1"]
[org.clojure/tools.logging "0.2.3"]
[org.jboss.netty/netty "3.2.7.Final"]
+ [clj-http "0.2.6"]
[potemkin "0.1.1-SNAPSHOT"]]
- :jvm-opts ["-server" "-XX:+UseConcMarkSweepGC"])
+ :jvm-opts ["-server" "-XX:+UseConcMarkSweepGC"])
View
@@ -1,13 +1,61 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
(ns aloha.core
- (:use [aloha netty]))
-
-(defn echo-pipeline [channel-group]
- (create-netty-pipeline "echo-server" channel-group
- :main-handler (upstream-stage
- (fn [evt]
- (when-let [msg (message-event evt)]
- (.write (.getChannel evt) msg))
- nil))))
-
-(defn start-echo-server []
- (start-server echo-pipeline {:port 10000}))
+ (:use [aloha netty requests responses])
+ (:import
+ [org.jboss.netty.channel
+ Channel]
+ [org.jboss.netty.handler.codec.http
+ HttpHeaders
+ HttpMessage
+ HttpRequest
+ HttpRequestDecoder
+ HttpResponseEncoder
+ HttpContentCompressor]))
+
+(defn http-request-handler [handler]
+ (let [current-request (atom nil)]
+ (message-handler
+ (fn [^HttpMessage msg ^Channel channel]
+ (if (instance? HttpRequest msg)
+ (let [request (transform-netty-request channel msg)]
+ ;;(reset! current-request request)
+ (let [response (handler request)
+ keep-alive? (HttpHeaders/isKeepAlive msg)]
+ (respond channel
+ (transform-response response keep-alive?)
+ (:body response)
+ (when-not keep-alive?
+ #(.close channel)))))
+ (throw (Exception. "Chunked requests are not currently supported.")))))))
+
+(defn http-pipeline [handler]
+ (fn [channel-group]
+ (create-netty-pipeline "http-server" channel-group
+ :deocder (HttpRequestDecoder.)
+ :encoder (HttpResponseEncoder.)
+ :deflater (HttpContentCompressor.)
+ :handler (http-request-handler handler))))
+
+(defn start-http-server [handler options]
+ (start-server
+ (http-pipeline handler)
+ options))
+
+(defn start-hello-world-server []
+ (start-http-server
+ (fn [request]
+ {:status 200
+ :headers {"content-type" "text/plain"}
+ :body "Aloha!"})
+ {:port 8080}))
+
+(defn -main [& args]
+ (start-hello-world-server)
+ (println "Running server on port 8080."))
View
@@ -1,11 +1,19 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
(ns aloha.netty
(:require
[clojure.tools.logging :as log])
(:import
[org.jboss.netty.channel
Channels
Channel
- ChannelHandler
+ ChannelFutureListener
ChannelUpstreamHandler
ChannelDownstreamHandler
ChannelPipelineFactory
@@ -24,18 +32,18 @@
[java.net
InetSocketAddress]))
-(def upstream-error-handler
+(defn upstream-error-handler [pipeline-name]
(reify ChannelUpstreamHandler
(handleUpstream [_ ctx evt]
(if (instance? ExceptionEvent evt)
- (log/error (.getCause ^ExceptionEvent evt) "Error in Netty pipeline.")
+ (log/error (.getCause ^ExceptionEvent evt) (str "error in " pipeline-name))
(.sendUpstream ctx evt)))))
-(def downstream-error-handler
+(defn downstream-error-handler [pipeline-name]
(reify ChannelDownstreamHandler
(handleDownstream [_ ctx evt]
(if (instance? ExceptionEvent evt)
- (log/error (.getCause ^ExceptionEvent evt) "Error in Netty pipeline.")
+ (log/error (.getCause ^ExceptionEvent evt) (str "error in " pipeline-name))
(.sendDownstream ctx evt)))))
(defn connection-handler [^ChannelGroup channel-group]
@@ -58,8 +66,8 @@
`(.addLast ~pipeline-sym ~(name stage-name) ~stage))
(partition 2 stages))
(.addFirst ~pipeline-sym "channel-group-handler" (connection-handler channel-group#))
- (.addLast ~pipeline-sym "outgoing-error" downstream-error-handler)
- (.addFirst ~pipeline-sym "incoming-error" upstream-error-handler)
+ (.addLast ~pipeline-sym "outgoing-error" (downstream-error-handler ~pipeline-name))
+ (.addFirst ~pipeline-sym "incoming-error" (upstream-error-handler ~pipeline-name))
~pipeline-sym)))
(defn create-pipeline-factory [channel-group pipeline-generator]
@@ -93,6 +101,22 @@
(when (instance? MessageEvent evt)
(.getMessage ^MessageEvent evt)))
+(defn ^ChannelUpstreamHandler message-handler
+ [handler]
+ (reify ChannelUpstreamHandler
+ (handleUpstream [_ ctx evt]
+ (if-let [msg (message-event evt)]
+ (handler msg (.getChannel evt))
+ (.sendUpstream ctx evt)))))
+
+(defn write-to-channel [^Channel ch msg on-complete]
+ (let [channel-future (.write ch msg)]
+ (when on-complete
+ (.addListener channel-future
+ (reify ChannelFutureListener
+ (operationComplete [_ future]
+ (on-complete)))))))
+
;;;
(def default-server-options
@@ -0,0 +1,68 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns aloha.requests
+ (:use
+ [potemkin]
+ [aloha netty utils])
+ (:require
+ [clojure.string :as str]
+ [clojure.tools.logging :as log])
+ (:import
+ [org.jboss.netty.channel
+ Channel]
+ [org.jboss.netty.handler.codec.http
+ HttpRequest]
+ [java.nio.channels
+ Pipe
+ Channels]
+ [org.jboss.netty.buffer
+ ChannelBufferInputStream]))
+
+;;;
+
+(def-custom-map LazyMap
+ :get
+ (fn [_ data _ key default-value]
+ `(if-not (contains? ~data ~key)
+ ~default-value
+ (let [val# (get ~data ~key)]
+ (if (delay? val#)
+ @val#
+ val#)))))
+
+(defn lazy-map [& {:as m}]
+ (LazyMap. m))
+
+(defn assoc-request-body [request ^HttpRequest netty-request]
+ (if-not (.isChunked netty-request)
+ (let [body (.getContent netty-request)]
+ (assoc request
+ :body (when-not (= 0 (.readableBytes body))
+ (ChannelBufferInputStream. body))))
+ (let [pipe (Pipe/open)]
+ (with-meta
+ (assoc request
+ :body (Channels/newInputStream (.source pipe)))
+ {::output-stream (Channels/newOutputStream (.sink pipe))}))))
+
+(defn transform-netty-request [^Channel channel ^HttpRequest netty-request]
+ (let [request (lazy-map
+ :scheme :http
+ :remote-addr (delay (channel-remote-host-address channel))
+ :server-name (delay (channel-local-host-address channel))
+ :server-port (delay (channel-local-port channel))
+ :request-method (delay (request-method netty-request))
+ :headers (delay (http-headers netty-request))
+ :content-type (delay (http-content-type netty-request))
+ :character-encoding (delay (http-character-encoding netty-request))
+ :uri (delay (request-uri netty-request))
+ :query-string (delay (request-query-string netty-request))
+ :content-length (delay (http-content-length netty-request)))]
+ (assoc-request-body request netty-request)))
+
@@ -0,0 +1,137 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns aloha.responses
+ (:use
+ [aloha netty utils])
+ (:require
+ [clojure.string :as str])
+ (:import
+ [java.nio
+ ByteBuffer]
+ [java.io
+ InputStream
+ File
+ RandomAccessFile]
+ [org.jboss.netty.buffer
+ ChannelBuffers]
+ [org.jboss.netty.handler.codec.http
+ HttpChunk
+ DefaultHttpChunk
+ HttpHeaders
+ DefaultHttpResponse
+ HttpResponse
+ HttpResponseStatus
+ HttpVersion]
+ [java.net
+ URLConnection]
+ [java.nio.channels
+ FileChannel
+ FileChannel$MapMode]))
+
+;;;
+
+(defn write-chunk [channel ^ByteBuffer chunk callback]
+ (write-to-channel channel
+ (if chunk
+ (DefaultHttpChunk. (ChannelBuffers/wrappedBuffer chunk))
+ (HttpChunk/LAST_CHUNK))
+ callback))
+
+(defn send-input-stream
+ [channel ^InputStream stream chunk-size callback]
+ (let [buffer? (and chunk-size (pos? chunk-size))
+ chunk-size (if buffer? chunk-size 1024)
+ create-array (if buffer?
+ #(byte-array chunk-size)
+ #(byte-array
+ (if (pos? (.available stream))
+ (.available stream)
+ 1024)))]
+ (loop [ary ^bytes (create-array), offset 0]
+ (let [ary-len (count ary)]
+ (if (= ary-len offset)
+ (do
+ (write-chunk channel (ByteBuffer/wrap ary) nil)
+ (recur (create-array) 0))
+ (let [byte-count (.read stream ary offset (- ary-len offset))]
+ (if (neg? byte-count)
+ (do
+ (.close stream)
+ (if (zero? offset)
+ (write-chunk channel nil callback)
+ (do
+ (write-chunk channel (ByteBuffer/wrap ary 0 offset) nil)
+ (write-chunk channel nil callback))))
+ (recur ary (+ offset byte-count)))))))))
+
+(defn respond-with-input-stream [channel ^HttpResponse response body callback]
+ (.setHeader response "Transfer-Encoding" "chunked")
+ (write-to-channel channel response nil)
+ (doto
+ (Thread. (fn [] (send-input-stream channel body 8192 callback)))
+ (.setName "InputStream reader")
+ .start))
+
+(defn respond-with-file [channel ^HttpResponse response ^File body callback]
+ (let [content-type (or (http-content-type response)
+ (let [content-type (or (URLConnection/guessContentTypeFromName (.getName body))
+ "application/octet-stream")]
+ (.setHeader response "Content-Type" content-type)
+ content-type))
+ fc (.getChannel (RandomAccessFile. body "r"))]
+ (.setContent response
+ (ChannelBuffers/wrappedBuffer (.map fc FileChannel$MapMode/READ_ONLY 0 (.size fc))))
+ (HttpHeaders/setContentLength response (-> response .getContent .readableBytes))
+ (write-to-channel channel response #(do (.close fc) (when callback (callback))))))
+
+(defn respond-with-string [channel ^HttpResponse response ^String body callback]
+ (let [encoding (or (http-character-encoding response)
+ (do
+ (.setHeader response "Content-Type"
+ (str (.getHeader response "Content-Type") "; charset=utf-8"))
+ "utf-8"))]
+ (.setContent response (-> body (.getBytes encoding) ChannelBuffers/wrappedBuffer))
+ (HttpHeaders/setContentLength response (-> response .getContent .readableBytes))
+ (write-to-channel channel response callback)))
+
+(defn respond [channel response body callback]
+ (cond
+ (= nil body)
+ (write-to-channel channel response callback)
+
+ (instance? String body)
+ (respond-with-string channel response body callback)
+
+ (instance? File body)
+ (respond-with-file channel response body callback)
+
+ (sequential? body)
+ (respond-with-string channel response (apply str (map str body)) callback)
+
+ (instance? InputStream body)
+ (respond-with-input-stream channel response body callback)))
+
+;;;
+
+(defn format-header-key
+ "content-length -> Content-Length"
+ [s]
+ (->> (str/split (name s) #"-")
+ (map str/capitalize)
+ (str/join "-")))
+
+(defn transform-response [rsp keep-alive?]
+ (let [response (DefaultHttpResponse.
+ (HttpVersion/HTTP_1_1)
+ (HttpResponseStatus/valueOf (:status rsp)))]
+ (doseq [[k v] (:headers rsp)]
+ (.setHeader response (format-header-key k) v))
+ (.setHeader response "Content-Length" 0)
+ (.setHeader response "Connection" (if keep-alive? "keep-alive" "close"))
+ response))
Oops, something went wrong. Retry.

0 comments on commit 87c06a1

Please sign in to comment.