Permalink
Browse files

more tests, cleaned up some of the type conversion stuff

  • Loading branch information...
1 parent 89242e3 commit f1a2939855b84fcd19b2c19ff3c1b75d5ef2f1c1 @joshrotenberg committed Dec 27, 2011
View
@@ -141,6 +141,7 @@ See the tests. Most are written as useful examples:
* echo - the standard echo scenario served up a few different ways
* reverse - standard and DSL APIs to create some reverse worker examples, lots of comments too
* http - an http frontended parallel echo system
+ * rpc - some contrived RPC examples
## TODO
View

Large diffs are not rendered by default.

Oops, something went wrong.
View
@@ -6,7 +6,8 @@
[re-rand "0.1.0"]
[compojure "0.6.5"]
[ring/ring-jetty-adapter "0.3.9"]
- [clj-http "0.2.6"]]
+ [clj-http "0.2.6"]
+ [cheshire "2.0.4"]]
:source-path "src/clj"
:java-source-path "src/jvm"
:jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"])
View
@@ -1,8 +1,17 @@
(ns md-clj.broker
+ "Majordomo Broker"
(:import mdbroker))
+;; The broker sits between the client(s) and the worker(s), delegating tasks
+;; from the client to an available worker and returning the worker's response
+;; back to the client. Aside from the specified service requested by the client,
+;; the broker has little interest in the payload itself. Current, the only
+;; task necessary from the user's perspective is to start the broker on
+;; the specified endpoint:
+
(defn start-broker
- "Start a broker on the specified endpoint."
+ "Start a broker on the specified endpoint, i.e.:
+ (start-broker \"tcp://*:5555\" false)"
[endpoint verbose]
(let [broker (mdbroker. verbose)]
(doto broker
View
@@ -33,13 +33,21 @@
(.destroy (:client this)))
(defmacro as-client
+ "Takes a service name keyword, an endpoint to connect to, and the body of
+ a client method. The return value of the body code will be sent as the
+ request, and the whole call will return the response."
[service endpoint & body]
`(let [client# (new-client-memoize ~endpoint false)
request# (do ~@body)
response# (send! client# (name ~service) request#)]
(from-zmsg response# *return-type*)))
(defmacro as-client-async
+ "Takes a service name keyword, an endpoint to connect to, and the body of
+ a client method. The body should return a sequence of values, each of
+ which will be a single asynchronous request sent to the worker. When the
+ sequence has been exhausted, the results will then be fetched and returned as
+ as a sequence in the same order as the request."
[service endpoint & body]
`(let [client# (new-client-memoize ~endpoint true)
request# (do ~@body)]
@@ -50,9 +58,7 @@
col#
(do
(let [item# (recv client#)
- res# (if (> (.size item#) 1)
- (map #(.getData %) (.toArray item#))
- (.getData (.getFirst item#)))]
+ res# (from-zmsg item#)]
(recur (conj col# res#) (+ n# 1))))))))
View
@@ -59,9 +59,10 @@
(defmethod to-zmsg :default [a]
(throw (Exception. (format "to-zmsg doesn't handle %s" (class a)))))
-(defmacro with-return-type
- "Specify the type for the elements returned in the response if the default
- of byte array doesn't do it for you. Options are passed directly to from-zmsg:
+(defmacro with-message-type
+ "Specify the type for the elements in a request to or response from
+ a worker function if the default of byte array doesn't do it for
+ you. Options are passed directly to from-zmsg:
:as-string - all response items will be strings
:as-number - all response items will be parsed as numbers
:as-bytes - the default, all items will be byte arrays
View
@@ -1,4 +1,5 @@
(ns md-clj.worker
+ "Majordomo Worker"
(:use md-clj.core)
(:import mdwrkapi
[org.zeromq ZMsg ZFrame]))
@@ -7,6 +8,10 @@
(defrecord Worker [^String service ^String endpoint ^Boolean verbose function])
(defn new-worker
+ "Create a new worker. Takes a service name keyword, an endpoint (broker)
+ to connect to, and a function that accepts two arguments: the request and
+ response, both being ZMsg objects. The function should populate the response
+ manually, but any return value will be ignored."
[service endpoint function]
(Worker. (name service) endpoint *worker-debug* function))
@@ -31,9 +36,10 @@
reply# (ZMsg.)]
(while (not (.isInterrupted (Thread/currentThread)))
(let [requezt# (.receive worker# reply#)
- ~'request (if (> (.size requezt#) 1)
- (map #(.getData %) (.toArray requezt#))
- (.getData (.getFirst requezt#)))
+ ;;~'request (if (> (.size requezt#) 1)
+ ;;(map #(.getData %) (.toArray requezt#))
+ ;;(.getData (.getFirst requezt#)))
+ ~'request (from-zmsg requezt# *return-type*)
repli# (do ~@body)]
(if (seq? repli#)
(doall (map #(.add reply# (ZFrame. %)) repli#))
View
@@ -3,7 +3,6 @@
clojure.test)
(:import [org.zeromq ZMsg ZFrame]))
-
(deftest to-zmsg-test
(let [zmsg (to-zmsg (doto (ZMsg.) (.add (ZFrame. "something"))))
zframe (to-zmsg (ZFrame. "something else"))
@@ -61,26 +60,26 @@
(is (= '("foo" "baz") (map #(String. %) (from-zmsg (to-zmsg ["foo" "baz"]) :as-bytes))))
)
-(deftest with-return-type-test
- (with-return-type :as-bytes
+(deftest with-message-type-test
+ (with-message-type :as-bytes
(is (= (Class/forName "[B") (class (from-zmsg (to-zmsg "2"))))))
- (with-return-type :as-string
+ (with-message-type :as-string
(is (= java.lang.String (class (from-zmsg (to-zmsg "2"))))))
- (with-return-type :as-number
+ (with-message-type :as-number
(is (= java.lang.Long (class (from-zmsg (to-zmsg "2"))))))
- (with-return-type :as-zmsg
+ (with-message-type :as-zmsg
(is (= org.zeromq.ZMsg (class (from-zmsg (to-zmsg "2"))))))
- (with-return-type :as-bytes
+ (with-message-type :as-bytes
(is (= (Class/forName "[B")
(class (first (from-zmsg (to-zmsg ["2" "3"])))))))
- (with-return-type :as-string
+ (with-message-type :as-string
(is (= java.lang.String
(class (first (from-zmsg (to-zmsg ["2" "3"])))))))
- (with-return-type :as-number
+ (with-message-type :as-number
(is (= java.lang.Long
(class (first (from-zmsg (to-zmsg ["2" "3"])))))))
- (with-return-type :as-zmsg
+ (with-message-type :as-zmsg
(is (= org.zeromq.ZMsg
(class (from-zmsg (to-zmsg ["2" "3"]))))))
)
@@ -86,10 +86,11 @@
;; batch of requests that may contain multiple items.
;; see the note above regarding workers that
;; may need to handle requests with one or more items.
- reply-more-async (mdc/as-client-async :reverse-more ep
- ["duh"
- ["boof"]
- ["what" "now"]])
+ reply-more-async (with-message-type :as-string
+ (mdc/as-client-async :reverse-more ep
+ ["duh"
+ ["boof"]
+ ["what" "now"]]))
;; you can really get your async on by firing off a bunch of async
;; requests in a future and come back later for the results.
ft (future (mdc/as-client-async :reverse-one ep '("one" "two" "three"
@@ -109,9 +110,9 @@
(is (= (map #(String. %) reply-more-vec)
(map #(String. %) reply-more-vec-mix)))
(is (= '("foob" "huhc") (map #(String. %) reply-one-async)))
- (is (= "hud" (String. (first reply-more-async))))
- (is (= "foob" (String. (second reply-more-async))))
- (is (= '("tahw" "won") (map #(String. %) (last reply-more-async))))
+ (is (= "hud" (first reply-more-async)))
+ (is (= "foob" (second reply-more-async)))
+ (is (= '("tahw" "won") (last reply-more-async)))
(is true (future-done? ft))
(is (= '("eno" "owt" "eerht" "ruof" "evif" "xis" "neves" "thgie")
(map #(String. %) @ft))))))
View
@@ -1,21 +1,53 @@
(ns md-clj.test.rpc
(:use clojure.test
md-clj.core
- re-rand)
+ re-rand
+ cheshire.core)
(:require [md-clj.broker :only [start-broker] :as mdb]
[md-clj.worker :only [new-worker run as-worker] :as mdw]
[md-clj.client :only [new-client send!] :as mdc])
(:import org.zeromq.ZMsg))
-
+;; send some clojure to a worker and have the worker run it.
+;; you probably wouldn't do this in real life.
(deftest clj-rpc
(let [ep "tcp://localhost:5555"
broker (future (mdb/start-broker "tcp://*:5555" false))]
(future (mdw/as-worker :clj-rpc ep
(eval (read-string (String. request)))))
- (let [f (with-return-type :as-number
+ (let [f (with-message-type :as-number
(mdc/as-client :clj-rpc ep (str '((comp str +) 8 8 8))))]
(is (= 24 f)))))
+;; silly json-rpc example. you might do something like this in a situation
+;; where you want a single worker to handle multiple types of requests.
+(deftest json-rpc
+ (let [ep "tcp://localhost:5555"
+ broker (future (mdb/start-broker "tcp://*:5555" false))
+ add (fn [a b] (+ a b))
+ multiply (fn [a b] (* a b))]
+
+ (future (with-message-type :as-string
+ (mdw/as-worker :json-rpc ep
+ (let [r (parse-string request true)
+ res (condp = (:fn r)
+ "add" (add (:a r) (:b r))
+ "multiply" (multiply (:a r) (:b r)))]
+ (generate-string {:res res})))))
+
+ (let [a-response (with-message-type :as-string
+ (mdc/as-client :json-rpc ep
+ (generate-string {:fn "add"
+ :a 2
+ :b 4})))
+ m-response (with-message-type :as-string
+ (mdc/as-client :json-rpc ep
+ (generate-string {:fn "multiply"
+ :a 2
+ :b 4})))]
+ (is (= 6 (:res (parse-string a-response true))))
+ (is (= 8 (:res (parse-string m-response true)))))))
+
+

0 comments on commit f1a2939

Please sign in to comment.