File tree Expand file tree Collapse file tree 3 files changed +82
-1
lines changed
Expand file tree Collapse file tree 3 files changed +82
-1
lines changed Original file line number Diff line number Diff line change @@ -40,6 +40,7 @@ Code examples are executed via `lein run`:
4040
4141[ Tutorial six: RPC] ( http://www.rabbitmq.com/tutorial-six-java.html )
4242
43- TBD
43+ lein run -m rabbitmq.tutorials.rpc-server
44+ lein run -m rabbitmq.tutorials.rpc-client
4445
4546To learn more, visit [ Langohr documentation] ( http://clojurerabbitmq.info ) site.
Original file line number Diff line number Diff line change 1+ ; ; note: this is example code and shouldn't be run in production as-is
2+
3+ (ns rabbitmq.tutorials.rpc-client
4+ (:require [langohr.core :as lc]
5+ [langohr.channel :as lch]
6+ [langohr.queue :as lq]
7+ [langohr.basic :as lb]
8+ [langohr.consumers :as lcons]))
9+
10+ (def ^{:const true } q " rpc_queue" )
11+
12+ (defn correlation-id-equals?
13+ [correlation-id d]
14+ (= (.getCorrelationId (.getProperties d)) correlation-id))
15+
16+ (defrecord FibonacciClient [conn ch cbq consumer]
17+ clojure.lang.IFn
18+ (invoke [this n]
19+ (let [correlation-id (str (java.util.UUID/randomUUID ))]
20+ (lb/publish ch " " q (str n) {:reply-to cbq
21+ :correlation-id correlation-id})
22+ (lb/consume ch cbq consumer)
23+ (-> (first (filter (partial correlation-id-equals? correlation-id)
24+ (lcons/deliveries-seq consumer)))
25+ .getBody
26+ (String. " UTF-8" )
27+ (read-string ))))
28+ java.io.Closeable
29+ (close [this]
30+ (.close conn)))
31+
32+ (defn make-fibonacci-rpc-client
33+ []
34+ (let [conn (lc/connect )
35+ ch (lch/open conn)
36+ cbq (lq/declare ch " " {:auto-delete false :exclusive true })
37+ consumer (lcons/create-queueing ch {})]
38+ (->FibonacciClient conn ch (:queue cbq) consumer)))
39+
40+ (defn -main
41+ [& args]
42+ (with-open [fibonacci-rpc (make-fibonacci-rpc-client )]
43+ (println " [x] Requesting fib(30)" )
44+ (let [response (fibonacci-rpc 30 )]
45+ (println (format " [.] Got %s" response)))))
Original file line number Diff line number Diff line change 1+ (ns rabbitmq.tutorials.rpc-server
2+ (:require [langohr.core :as lc]
3+ [langohr.channel :as lch]
4+ [langohr.queue :as lq]
5+ [langohr.basic :as lb]
6+ [langohr.consumers :as lcons]))
7+
8+ (def ^{:const true } q " rpc_queue" )
9+
10+ (defn fib
11+ [n]
12+ (if (zero? n)
13+ 0
14+ (if (= n 1 )
15+ 1
16+ (+ (fib (- n 1 ))
17+ (fib (- n 2 ))))))
18+
19+ (defn handle-delivery
20+ " Handles message delivery"
21+ [ch {:keys [delivery-tag reply-to correlation-id]} payload]
22+ (let [n (read-string (String. payload " UTF-8" ))]
23+ (println (format " [.] fib(%s)" n))
24+ (let [response (fib n)]
25+ (lb/publish ch " " reply-to (str response) {:correlation-id correlation-id})
26+ (lb/ack ch delivery-tag))))
27+
28+ (defn -main
29+ [& args]
30+ (with-open [conn (lc/connect )]
31+ (let [ch (lch/open conn)]
32+ (lq/declare ch q {:auto-delete false })
33+ (lb/qos ch 1 )
34+ (println " [x] Awaiting RPC requests" )
35+ (lcons/blocking-subscribe ch " rpc_queue" handle-delivery))))
You can’t perform that action at this time.
0 commit comments