Permalink
Browse files

first commit

  • Loading branch information...
0 parents commit 30f04aa94e279c1eb4672fc1cb55b2767f975353 @ztellman ztellman committed Jul 7, 2010
Showing with 316 additions and 0 deletions.
  1. +4 −0 .gitignore
  2. +16 −0 README.textile
  3. +8 −0 project.clj
  4. +19 −0 src/aleph.clj
  5. BIN src/aleph/.DS_Store
  6. +65 −0 src/aleph/buffer.clj
  7. +61 −0 src/aleph/core.clj
  8. +137 −0 src/aleph/http.clj
  9. +6 −0 test/aleph/core_test.clj
@@ -0,0 +1,4 @@
+pom.xml
+*jar
+lib
+classes
@@ -0,0 +1,16 @@
+Aleph is an asynchronous web server, built on top of "Netty":http://www.jboss.org/netty.
+
+It conforms to the interface described by "Ring":http://github.com/mmcgrana/ring, with one small difference: the request and response are decoupled.
+
+<pre><code>(use 'aleph)
+
+(defn hello-world [request]
+ (respond! request
+ {:status 200
+ :headers {"Content-Type" "text/html"}
+ :body "Hello World!"}))
+
+(run-aleph hello-world {:port 8080})
+</code></pre>
+
+Notice that the response is an explicit step, using @(respond! request response)@. This gives a much greater degree of flexibility than the servlet model, and allows more straightforward use of Clojure's concurrency primitives.
@@ -0,0 +1,8 @@
+(defproject aleph "0.5.0"
+ :description ""
+ :repositories [["JBoss" "http://repository.jboss.org/maven2"]]
+ :dev-dependencies [[swank-clojure "1.2.1"]
+ [autodoc "0.7.1"]]
+ :dependencies [[org.clojure/clojure "1.2.0-master-SNAPSHOT"]
+ [org.clojure/clojure-contrib "1.2.0-SNAPSHOT"]
+ [org.jboss.netty/netty "3.2.0.BETA1"]])
@@ -0,0 +1,19 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns aleph
+ (:require
+ [aleph.http :as http]
+ [aleph.core :as core]))
+
+(defn run-aleph [handler options]
+ (let [port (:port options)]
+ (core/start-server port #(http/http-pipeline port handler))))
+
+(defn respond! [request msg]
+ ((:respond request) request (http/transform-response request msg)))
Binary file not shown.
@@ -0,0 +1,65 @@
+(ns aleph.buffer
+ (:use [clojure.contrib.def :only (defvar-)])
+ (:import [org.jboss.netty.buffer ChannelBuffer DirectChannelBufferFactory]))
+
+(defn- to-byte [b]
+ (byte
+ (if (instance? Boolean b)
+ (if b 1 0)
+ b)))
+
+(defvar- write-fns
+ {:float #(.writeFloat ^ChannelBuffer %1 %2)
+ :double #(.writeDouble ^ChannelBuffer %1 %2)
+ :int #(.writeInt ^ChannelBuffer %1 %2)
+ :long #(.writeLong ^ChannelBuffer %1 %2)
+ :short #(.writeShort ^ChannelBuffer %1 %2)
+ :char #(.writeChar ^ChannelBuffer %1 %2)
+ :byte #(.writeByte ^ChannelBuffer %1 (to-byte %2))})
+
+(defvar- read-fns
+ {:float #(.readFloat ^ChannelBuffer %)
+ :double #(.readDouble ^ChannelBuffer %)
+ :int #(.readInt ^ChannelBuffer %)
+ :long #(.readLong ^ChannelBuffer %)
+ :short #(.readShort ^ChannelBuffer %)
+ :char #(.readChar ^ChannelBuffer %)
+ :byte #(.readByte ^ChannelBuffer %)})
+
+(defvar- type-size
+ {:float 4
+ :double 8
+ :int 4
+ :long 8
+ :short 2
+ :char 2
+ :byte 1})
+
+(defn- wrap-sig [sig]
+ (if (sequential? sig) sig [sig]))
+
+(defn- num-bytes [sig]
+ (reduce + (map type-size sig)))
+
+(defn create-buffer [dim]
+ (let [buf (.getBuffer (DirectChannelBufferFactory/getInstance) dim)]
+ (.writerIndex buf dim)
+ buf))
+
+(defn- shift-read [^ChannelBuffer buf offset]
+ (let [buf (.duplicate buf)]
+ (.readerIndex buf (+ offset (.readerIndex buf)))
+ buf))
+
+(defn as-types [buf sig]
+ (let [sig (wrap-sig sig)
+ num-bytes (num-bytes sig)]
+ (when (>= (.readableBytes buf) num-bytes)
+ (lazy-seq
+ (cons
+ (let [buf (.duplicate buf)
+ res (map #((read-fns %1) %2) sig (repeat buf))]
+ (if (= 1 (count sig))
+ (first res)
+ res))
+ (as-types (shift-read buf num-bytes) sig))))))
@@ -0,0 +1,61 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns aleph.core
+ (:use [clojure.contrib.def :only (defvar-)])
+ (:import
+ [org.jboss.netty.channel
+ ChannelHandler ChannelUpstreamHandler ChannelDownstreamHandler
+ ChannelHandlerContext MessageEvent ChannelPipelineFactory Channels
+ ChannelPipeline]
+ [org.jboss.netty.channel.socket.nio
+ NioServerSocketChannelFactory]
+ [org.jboss.netty.bootstrap
+ ServerBootstrap]
+ [java.util.concurrent Executors]
+ [java.net InetSocketAddress]))
+
+(defn event-message [evt]
+ (when (instance? MessageEvent evt)
+ (.getMessage ^MessageEvent evt)))
+
+(defn event-origin [evt]
+ (when (instance? MessageEvent evt)
+ (.getRemoteAddress ^MessageEvent evt)))
+
+(defn upstream-stage [handler]
+ (reify ChannelUpstreamHandler
+ (handleUpstream [_ ctx evt]
+ (when-let [upstream-evt (handler evt)]
+ (.sendUpstream ctx upstream-evt)))))
+
+(defn downstream-stage [handler]
+ (reify ChannelDownstreamHandler
+ (handleDownstream [_ ctx evt]
+ (when-let [downstream-evt (handler evt)]
+ (.sendDownstream ctx downstream-evt)))))
+
+(defn create-pipeline [& stages]
+ (let [stages (partition 2 stages)
+ pipeline (Channels/pipeline)]
+ (doseq [[id stage] stages]
+ (.addLast pipeline (name id) stage))
+ pipeline))
+
+(defn start-server [port pipeline]
+ (let [server (ServerBootstrap.
+ (NioServerSocketChannelFactory.
+ (Executors/newCachedThreadPool)
+ (Executors/newCachedThreadPool)))]
+ (.setPipelineFactory server
+ (reify ChannelPipelineFactory
+ (getPipeline [_]
+ (if (fn? pipeline)
+ (pipeline)
+ pipeline))))
+ (.bind server (InetSocketAddress. port))))
@@ -0,0 +1,137 @@
+;; Copyright (c) Zachary Tellman. All rights reserved.
+;; The use and distribution terms for this software are covered by the
+;; Eclipse Public License 1.0 (http://opensource.org/licenses/eclipse-1.0.php)
+;; which can be found in the file epl-v10.html at the root of this distribution.
+;; By using this software in any fashion, you are agreeing to be bound by
+;; the terms of this license.
+;; You must not remove this notice, or any other, from this software.
+
+(ns aleph.http
+ (:use [aleph.core]
+ [clojure.pprint])
+ (:import
+ [org.jboss.netty.handler.codec.http
+ HttpRequest HttpMessage HttpMethod HttpHeaders HttpHeaders$Names
+ HttpChunk DefaultHttpChunk
+ DefaultHttpResponse HttpVersion HttpResponseStatus
+ HttpRequestDecoder HttpResponseEncoder HttpContentCompressor]
+ [org.jboss.netty.channel
+ Channel ChannelPipeline MessageEvent ChannelFutureListener Channels]
+ [org.jboss.netty.buffer
+ ChannelBuffer ChannelBuffers]
+ [java.util
+ Map$Entry]
+ [java.io
+ ByteArrayInputStream InputStream File FileInputStream]))
+
+(defn transform-headers [headers]
+ (->> headers
+ (map #(list (.getKey ^Map$Entry %) (.getValue ^Map$Entry %)))
+ flatten
+ (apply hash-map)))
+
+(defn request-method [^HttpRequest msg]
+ (->> msg .getMethod .getName .toLowerCase keyword))
+
+(defn channel-buffer->input-stream [^ChannelBuffer buf]
+ (let [buf (.duplicate buf)]
+ (proxy
+ [InputStream]
+ []
+ (read
+ ([_]
+ (if (pos? (.readableBytes buf))
+ (.readByte buf)
+ -1))
+ ([_ ary off len]
+ (.readBytes buf off len))))))
+
+(defn request-headers [^HttpMessage req]
+ {:headers (transform-headers (.getHeaders req))
+ :chunked? (.isChunked req)
+ :keep-alive? (HttpHeaders/isKeepAlive req)
+ :last-chunk (and (.isChunked req) (.isLast ^HttpChunk req))})
+
+(defn request-body [^HttpMessage req]
+ (let [content-length (HttpHeaders/getContentLength req)
+ has-content? (pos? content-length)]
+ (when has-content?
+ {:content-length content-length
+ :body (channel-buffer->input-stream (.getContent req))
+ })))
+
+(defn request-uri [^HttpMessage req]
+ {:uri (.getUri req)})
+
+(defn transform-request [^HttpRequest req]
+ (merge
+ (request-body req)
+ (request-headers req)
+ (request-uri req)))
+
+;;;
+
+(defn input-stream->channel-buffer [^InputStream stream]
+ (let [ary (make-array Byte/TYPE (.available stream))]
+ (.read stream ary)
+ (ChannelBuffers/wrappedBuffer ary)))
+
+(defn to-channel-buffer
+ ([body]
+ (to-channel-buffer body "UTF-8"))
+ ([body charset]
+ (cond
+ (string? body) (ChannelBuffers/copiedBuffer ^String body charset)
+ (sequential? body) (ChannelBuffers/copiedBuffer ^String (apply str body) charset)
+ (instance? File body) (input-stream->channel-buffer (FileInputStream. body))
+ (instance? InputStream body) (input-stream->channel-buffer body)
+ :else body)))
+
+(defn transform-response
+ [request response]
+ (let [msg (DefaultHttpResponse.
+ HttpVersion/HTTP_1_1
+ (HttpResponseStatus/valueOf (:status response)))
+ body (:body response)]
+ (doseq [[k v] (:headers response)]
+ (.addHeader msg k v))
+ (.setContent msg (to-channel-buffer body))
+ (when (:keep-alive? request)
+ (.addHeader msg HttpHeaders$Names/CONTENT_LENGTH (-> msg .getContent .readableBytes)))
+ msg))
+
+;;;
+
+(defn response-listener [req]
+ (if (HttpHeaders/isKeepAlive req)
+ (reify ChannelFutureListener
+ (operationComplete [_ future]
+ ))
+ ChannelFutureListener/CLOSE))
+
+(defn request-handler [handler]
+ (upstream-stage
+ (fn [evt]
+ (when-let [req ^HttpRequest (event-message evt)]
+ (handler
+ (merge
+ (transform-request req)
+ {:channel (.getChannel ^MessageEvent evt)
+ :respond (fn [this msg]
+ (.getPipeline (:channel this))
+ (-> (:channel this)
+ (Channels/write msg)
+ (.addListener (response-listener req))))}))))))
+
+(defn http-pipeline [port handler]
+ (create-pipeline
+ :decoder (HttpRequestDecoder.)
+ :encoder (HttpResponseEncoder.)
+ :deflater (HttpContentCompressor.)
+ :request (request-handler
+ #(handler (assoc %
+ :scheme :http
+ :port port)))))
+
+
+
@@ -0,0 +1,6 @@
+(ns aleph.core-test
+ (:use [aleph.core] :reload-all)
+ (:use [clojure.test]))
+
+(deftest replace-me ;; FIXME: write
+ (is false "No tests have been written."))

0 comments on commit 30f04aa

Please sign in to comment.