File tree Expand file tree Collapse file tree 2 files changed +52
-0
lines changed
clojure/src/rabbitmq/tutorials Expand file tree Collapse file tree 2 files changed +52
-0
lines changed Original file line number Diff line number Diff line change 1+ (ns rabbitmq.tutorials.new-task
2+ (:require [langohr.core :as lc]
3+ [langohr.channel :as lch]
4+ [langohr.queue :as lq]
5+ [langohr.basic :as lb]
6+ [clojure.string :as s]))
7+
8+
9+ (defn -main
10+ [& args]
11+ (with-open [conn (lc/connect )]
12+ (let [ch (lch/open conn)
13+ payload (if (empty? args)
14+ " Hello, world!"
15+ (s/join " " args))]
16+ (lq/declare ch " task_queue" :durable true :auto-delete false )
17+ (lb/publish ch " " " task_queue" payload :persistent true )
18+ (println (format " [x] Sent %s" payload)))))
Original file line number Diff line number Diff line change 1+ (ns rabbitmq.tutorials.worker
2+ (:require [langohr.core :as lc]
3+ [langohr.channel :as lch]
4+ [langohr.queue :as lq]
5+ [langohr.basic :as lb]
6+ [langohr.consumers :as lcons]
7+ [clojure.string :as s]))
8+
9+ (def ^{:const true } q " task_queue" )
10+
11+ (defn ^:private occurences-of
12+ [^String s ^Character c]
13+ (let [chars (map identity s)]
14+ (count (filter (fn [x] (= x c)) chars))))
15+
16+ (defn handle-delivery
17+ " Handles message delivery"
18+ [ch {:keys [delivery-tag]} payload]
19+ (let [s (String. payload " UTF-8" )
20+ n (occurences-of s \.)]
21+ (println (format " [x] Received %s" s))
22+ (Thread/sleep ^double (* 1000 n))
23+ (println " [x] Done" )
24+ (lb/ack ch delivery-tag)))
25+
26+
27+ (defn -main
28+ [& args]
29+ (with-open [conn (lc/connect )]
30+ (let [ch (lch/open conn)]
31+ (lq/declare ch q :durable true :auto-delete false )
32+ (lb/qos ch 1 )
33+ (println " [*] Waiting for messages. To exit press CTRL+C" )
34+ (lcons/blocking-subscribe ch q handle-delivery))))
You can’t perform that action at this time.
0 commit comments