Skip to content

Task channels #62

Merged
merged 1 commit into from Apr 29, 2012
View
26 src/aleph/redis.clj
@@ -55,7 +55,7 @@
(run-pipeline result
{:error-handler (fn [_] )}
(fn [_] (reset! database (-> args first second)))))
- result)))))
+ result)))))
(defn enqueue-task
"Enqueues a task onto a Redis queue. 'task' must be a printable Clojure data structure."
@@ -72,6 +72,24 @@
(redis-client (concat ["brpop"] queue-names [0]))
#(hash-map :queue (first %) :task (read-string (second %)))))
+(defn task-receiver-channel
+ "Returns a channel that will receive tasks on the specified queue(s).
+ If the channels is closed this will also close the client
+ connection to prevent an eventual pending command to cause the loss
+ of a message"
+ [redis-client & queue-names]
+ (let [ch (channel)]
+ (run-pipeline
+ nil
+ (fn [_]
+ (apply receive-task redis-client queue-names))
+ #(enqueue ch %)
+ (fn [_]
+ (when-not (closed? ch)
+ (restart nil))))
+ (on-closed ch #(close-connection redis-client))
+ ch))
+
(defn- filter-messages [ch]
(->> ch
(filter*
@@ -150,9 +168,3 @@
to the PUNSUBSCRIBE command."
[redis-stream & stream-patterns]
(enqueue redis-stream (list* "punsubscribe" stream-patterns)))
-
-
-
-
-
-
View
15 test/aleph/test/redis.clj
@@ -13,10 +13,23 @@
[aleph redis]
[aleph.test utils]))
+(def config {:host "localhost"})
+
(deftest ^:benchmark test-redis-roundtrip
- (let [r (redis-client {:host "localhost"})]
+ (let [r (redis-client config)]
(bench "simple roundtrip"
@(r [:ping]))
(bench "1e3 roundtrips"
@(apply merge-results (repeatedly 1e3 #(r [:ping]))))
(close-connection r)))
+
+(deftest test-task-channels
+ (let [c1 (redis-client config)
+ c2 (redis-client config)
+ _ @(c1 [:del :q])
+ emitter-channel (sink (partial enqueue-task c1 :q))
+ receiver-channel (task-receiver-channel c2 :q)
+ task {:foo "bar"}]
+ (enqueue emitter-channel task)
+ (is (= {:queue "q" :task task}
+ @(read-channel receiver-channel)))))
Something went wrong with that request. Please try again.