Skip to content

Commit

Permalink
Some restructuring
Browse files Browse the repository at this point in the history
  • Loading branch information
ragnard committed Jun 22, 2010
1 parent 4b07440 commit 66039be
Show file tree
Hide file tree
Showing 8 changed files with 179 additions and 150 deletions.
121 changes: 31 additions & 90 deletions src/main/clojure/redis.clj
@@ -1,107 +1,49 @@
(ns redis
(:refer-clojure :exclude [get keys set send sort read read-line type])
(:use [redis channels connection protocol]))
(:use [redis defcommand channel connection protocol pipeline]))

;;;; Global vars

(def *pool* (make-non-pooled-connection-pool))
(def *channel* (make-debug-channel))
(def #^{:doc "Bound to an implementation of RedisConnectionPool"}
*pool*
(make-non-pooled-connection-pool))

(defmacro with-server [server-spec & body]
`(let [connection# (get-connection *pool* ~server-spec)]
(def #^{:doc "Bound to an implementation of RedisChannel"}
*channel* nil)

;;;; Macros

(defmacro with-server
"Evaluates body in the context of a connection to Redis server
specified by server-spec.
server-spec is a map with any of the following keys:
:host (\"127.0.0.1\")
:port (6379)
:db (0)
:timeout (5000)
:password (nil)"
[server-spec & body]
`(let [connection# (get-connection *pool* ~server-spec)]
(try
(binding [*channel* (make-direct-channel connection#)]
~@body)
(catch Exception e#
(release-connection *pool* connection# e#))
(finally
(release-connection *pool* connection#)))))

(defmacro pipeline [& body]
(defmacro pipeline
"Evaluate body, pipelining any Redis commands. Commands in body will
return nil, and pipeline will return a vector of replies."
[& body]
`(binding [*channel* (make-pipelined-channel *channel*)]
~@body
(send-pipelined-commands *channel*)))

(defn- upcase [#^String s] (.toUpperCase s))

(defn default-key-fn [args]
"Return a vector of keys in args.
This default implementation detects keys in three ways:
- The argument is named key
- The argument is a vector named keys
- The argument is a vector named keyvals"
(loop [args args
keys []]
(let [[first & rest] args]
(if (nil? first)
keys
(if (= 'key first)
(recur rest (conj keys identity))
(recur rest (conj keys nil)))))))

(defn get-key-values [key-fns args]
(vec (filter #(not (nil? %))
(map #(when (not (nil? %1))
(%1 %2)) key-fns args))))

(def *default-opts* {:type :multi-bulk
:reply-fn identity
:key-fn default-key-fn})

(def *command-types* {:inline make-inline-command
:multi-bulk make-multi-bulk-command})

;;;
;;;
;;;

(defn parse-opts+body [opts+body]
(loop [opts *default-opts*
args opts+body]
(let [[v & rest] args]
(cond
(nil? v) [opts nil]
(list? v) [opts v]
(or (var? v)
(symbol? v)
(fn? v)) (recur (assoc opts :reply-fn v) rest)
(keyword? v) (condp = v
:inline (recur (assoc opts :type v) rest)
:multi-bulk (recur (assoc opts :type v) rest))))))

(defn flatten-args [args]
(let [[args rest] (split-with #(not= % '&) args)]
[args (last rest)]))



(defmacro defcommand
([name args & opts+body]
(let [[opts body] (parse-opts+body opts+body)
{:keys [type reply-fn key-fn]} opts
command-name (upcase (str name))
command-fn (type *command-types*)
[command-args command-rest-args] (flatten-args args)
args-without-and (vec (filter #(not= '& %) args))
key-fns (key-fn args-without-and)]
(if body
`(defn ~name ~args
(let [command# ~body]
(send *channel* command#)))
`(defn ~name ~args
(let [command# (apply ~command-fn
~command-name
~@command-args
~command-rest-args)
keys# (get-key-values ~key-fns ~args-without-and)]
(~reply-fn (send *channel*
(with-meta command#
{:redis-keys (vec keys#)})))))))))

(defmacro defcommands [& command-defs]
`(do ~@(map (fn [command-def]
`(defcommand ~@command-def)) command-defs)))


;;;; Command definitions

;;; Utility conversion functions
(defn int-to-bool [n] (< 0 n))
(defn string-to-keyword [s] (keyword s))
(defn string-to-double [s] (when s (Double/parseDouble s)))
Expand Down Expand Up @@ -204,6 +146,7 @@
(hgetall [key] seq-to-map)
)

;;; Sort command

(defn- parse-sort-args [args]
(loop [bulks []
Expand All @@ -226,8 +169,6 @@
:desc (recur (conj bulks "DESC") args)
(throw (Exception. (str "Error parsing arguments to SORT command: Unknown argument: " type))))))))



(defcommand sort [key & args]
(with-meta
(apply make-multi-bulk-command "SORT" key (parse-sort-args args))
Expand Down
@@ -1,4 +1,4 @@
(ns redis.channels
(ns redis.channel
(:refer-clojure :exclude [send read read-line])
(:use [redis protocol connection])
(:import [java.io ByteArrayOutputStream]))
Expand All @@ -8,63 +8,30 @@
"A RedisChannel supports sending commands"
(send [channel #^redis.protocol.RedisCommand command]))

;;; Debug channel
(defrecord DebugChannel []
RedisChannel
(send [this command]
(let [buf (ByteArrayOutputStream.)]
(write-to-buffer command buf)
(println "Sending command")
(println (meta command))
(println (String. (.toByteArray buf) "ASCII"))
nil)))

(defn make-debug-channel []
(DebugChannel.))

;;; Direct channel
(defrecord DirectChannel [#^redis.connection.RedisConnection connection]
RedisChannel
(send [this command]
(let [buf (ByteArrayOutputStream.)
out (output-stream connection)
in (input-stream connection)
keys (:redis-keys (meta command))]
in (input-stream connection)]
(write-to-buffer command buf)
(write-to-stream buf out)
(read-reply in))))

(defn make-direct-channel [connection]
(DirectChannel. connection))

;;; Pipelining
(defrecord PipelinedChannel [channel commands]
;;; Debug channel
(defrecord DebugChannel []
RedisChannel
(send [this command]
(swap! commands conj command)
nil))

(defn make-pipelined-channel [channel]
(PipelinedChannel. channel (atom [])))

(defn send-pipelined-commands [pipeline]
(do
(let [buf (ByteArrayOutputStream.)
channel (:channel pipeline)
connection (:connection channel)
commands @(:commands pipeline)
ncommands (count commands)
out (output-stream connection)
in (input-stream connection)]
(dorun
(map #(write-to-buffer % buf) commands))
(write-to-stream buf out)
(let [reply (transient [])]
(dotimes [i ncommands]
(try
(conj! reply (read-reply in))
(catch Exception e
(conj! reply e))))
(persistent! reply)))))

(let [buf (ByteArrayOutputStream.)]
(write-to-buffer command buf)
(println "Sending command")
(println (meta command))
(println (String. (.toByteArray buf) "ASCII"))
nil)))

(defn make-debug-channel []
(DebugChannel.))
11 changes: 6 additions & 5 deletions src/main/clojure/redis/connection.clj
Expand Up @@ -3,24 +3,23 @@
(:import [java.net Socket]
[java.io BufferedInputStream]))

;;; Protocols
;;;; Protocols
(defprotocol RedisConnection
(close [connection])
(input-stream [connection])
(output-stream [connection]))

(defprotocol RedisConnectionPool
(get-connection [pool connection-spec])
(release-connection [pool connection]))

(release-connection [pool connection] [pool connection exception]))

;;;; Implementations
(extend-type Socket
RedisConnection
(close [this] (.close this))
(input-stream [this] (BufferedInputStream. (.getInputStream this)))
(output-stream [this] (.getOutputStream this)))


(def default-connection-spec {:host "127.0.0.1"
:port 6379
:timeout 5000
Expand All @@ -41,7 +40,9 @@
(get-connection [this connection-spec]
(make-connection connection-spec))
(release-connection [this connection]
(close connection)))
(close connection))
(release-connection [this connection exception]
(close connection)))

(defn make-non-pooled-connection-pool []
(NonPooledConnectionPool.))
Expand Down
82 changes: 82 additions & 0 deletions src/main/clojure/redis/defcommand.clj
@@ -0,0 +1,82 @@
(ns redis.defcommand
(:require [redis protocol channel]))

;;;; Command definition macros

(defn get-key-fns [args]
"Given a params vector, return a list of functions that knows how
to extract keys from the given args.
This implementation detects keys in three ways:
- The argument is named key
- The argument is a vector named keys
- The argument is a vector named keyvals"
(loop [key-fns []
[first & rest] args]
(if (nil? first)
key-fns
(condp = first
'key (recur (conj key-fns identity) rest)
'keys (recur (conj key-fns identity) rest)
'keyvals (recur (conj key-fns #(take-nth 2 %)) rest)
(recur (conj key-fns nil) rest)))))

(defn extract-keys [key-fns args]
(vec (flatten (filter #(not (nil? %))
(map #(when (not (nil? %1))
(%1 %2)) key-fns args)))))

(def *default-opts* {:type :multi-bulk
:reply-fn identity
:key-fn get-key-fns})

(def *command-types* {:inline redis.protocol/make-inline-command
:multi-bulk redis.protocol/make-multi-bulk-command})


(defn parse-opts+body [opts+body]
(loop [opts *default-opts*
args opts+body]
(let [[v & rest] args]
(cond
(nil? v) [opts nil]
(list? v) [opts v]
(or (var? v)
(symbol? v)
(fn? v)) (recur (assoc opts :reply-fn v) rest)
(keyword? v) (condp = v
:inline (recur (assoc opts :type v) rest)
:multi-bulk (recur (assoc opts :type v) rest))))))

(defn flatten-args [args]
(let [[args rest] (split-with #(not= % '&) args)]
[args (last rest)]))

(defn- upcase [#^String s] (.toUpperCase s))

(defmacro defcommand
([name args & opts+body]
(let [[opts body] (parse-opts+body opts+body)
{:keys [type reply-fn key-fn]} opts
command-name (upcase (str name))
command-fn (type *command-types*)
[command-args command-rest-args] (flatten-args args)
args-without-and (vec (filter #(not= '& %) args))
key-fns (key-fn args-without-and)]
(if body
`(defn ~name ~args
(let [command# ~body]
(redis.channel/send redis/*channel* command#)))
`(defn ~name ~args
(let [command# (apply ~command-fn
~command-name
~@command-args
~command-rest-args)
keys# (extract-keys ~key-fns ~args-without-and)]
(~reply-fn (redis.channel/send redis/*channel*
(with-meta command#
{:redis-keys (vec keys#)})))))))))

(defmacro defcommands [& command-defs]
`(do ~@(map (fn [command-def]
`(defcommand ~@command-def)) command-defs)))
38 changes: 38 additions & 0 deletions src/main/clojure/redis/pipeline.clj
@@ -0,0 +1,38 @@
(ns redis.pipeline
(:refer-clojure :exclude [send read read-line])
(:use [redis channel connection protocol])
(:import [java.io ByteArrayOutputStream]
[java.net SocketTimeoutException]))


(defrecord PipelinedChannel [channel commands]
RedisChannel
(send [this command]
(swap! commands conj command)
nil))

(defn make-pipelined-channel [channel]
(PipelinedChannel. channel (atom [])))

(defn send-pipelined-commands [pipeline]
(let [buf (ByteArrayOutputStream.)
channel (:channel pipeline)
connection (:connection channel)
commands @(:commands pipeline)
ncommands (count commands)
out (output-stream connection)
in (input-stream connection)]
(dorun
(map #(write-to-buffer % buf) commands))
(write-to-stream buf out)
(let [reply (transient [])]
(dotimes [i ncommands]
(try
(conj! reply (read-reply in))
(catch SocketTimeoutException e
(throw e))
(catch Exception e
(conj! reply e))))
(persistent! reply))))


0 comments on commit 66039be

Please sign in to comment.