Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

add aleph.redis/task-receiver-channel

  • Loading branch information...
commit 49116f864586fa7ce68837b9537eaa144e3393fe 1 parent 1681caa
Max Penet authored

Showing 2 changed files with 33 additions and 8 deletions. Show diff stats Hide diff stats

  1. +19 7 src/aleph/redis.clj
  2. +14 1 test/aleph/test/redis.clj
26 src/aleph/redis.clj
@@ -55,7 +55,7 @@
55 55 (run-pipeline result
56 56 {:error-handler (fn [_] )}
57 57 (fn [_] (reset! database (-> args first second)))))
58   - result)))))
  58 + result)))))
59 59
60 60 (defn enqueue-task
61 61 "Enqueues a task onto a Redis queue. 'task' must be a printable Clojure data structure."
@@ -72,6 +72,24 @@
72 72 (redis-client (concat ["brpop"] queue-names [0]))
73 73 #(hash-map :queue (first %) :task (read-string (second %)))))
74 74
  75 +(defn task-receiver-channel
  76 + "Returns a channel that will receive tasks on the specified queue(s).
  77 + If the channels is closed this will also close the client
  78 + connection to prevent an eventual pending command to cause the loss
  79 + of a message"
  80 + [redis-client & queue-names]
  81 + (let [ch (channel)]
  82 + (run-pipeline
  83 + nil
  84 + (fn [_]
  85 + (apply receive-task redis-client queue-names))
  86 + #(enqueue ch %)
  87 + (fn [_]
  88 + (when-not (closed? ch)
  89 + (restart nil))))
  90 + (on-closed ch #(close-connection redis-client))
  91 + ch))
  92 +
75 93 (defn- filter-messages [ch]
76 94 (->> ch
77 95 (filter*
@@ -150,9 +168,3 @@
150 168 to the PUNSUBSCRIBE command."
151 169 [redis-stream & stream-patterns]
152 170 (enqueue redis-stream (list* "punsubscribe" stream-patterns)))
153   -
154   -
155   -
156   -
157   -
158   -
15 test/aleph/test/redis.clj
@@ -13,10 +13,23 @@
13 13 [aleph redis]
14 14 [aleph.test utils]))
15 15
  16 +(def config {:host "localhost"})
  17 +
16 18 (deftest ^:benchmark test-redis-roundtrip
17   - (let [r (redis-client {:host "localhost"})]
  19 + (let [r (redis-client config)]
18 20 (bench "simple roundtrip"
19 21 @(r [:ping]))
20 22 (bench "1e3 roundtrips"
21 23 @(apply merge-results (repeatedly 1e3 #(r [:ping]))))
22 24 (close-connection r)))
  25 +
  26 +(deftest test-task-channels
  27 + (let [c1 (redis-client config)
  28 + c2 (redis-client config)
  29 + _ @(c1 [:del :q])
  30 + emitter-channel (sink (partial enqueue-task c1 :q))
  31 + receiver-channel (task-receiver-channel c2 :q)
  32 + task {:foo "bar"}]
  33 + (enqueue emitter-channel task)
  34 + (is (= {:queue "q" :task task}
  35 + @(read-channel receiver-channel)))))

0 comments on commit 49116f8

Please sign in to comment.
Something went wrong with that request. Please try again.