Permalink
Browse files

Send exceptions over the wire, just don't re-throw them.

  • Loading branch information...
1 parent dc26bc6 commit 095e92c64b04a394ab401058f5110bd16c3883ac @technomancy committed May 25, 2012
Showing with 90 additions and 59 deletions.
  1. +2 −2 README.md
  2. +71 −47 src/die/roboter.clj
  3. +17 −10 test/die/test/roboter.clj
View
@@ -32,15 +32,15 @@ The Robots get your work done in an straightforward way.
(roboter/send-off `(println "Boing, Boom Tschak.")) ; returns immediately
(let [f (roboter/send-back `(slurp "/etc/hosts"))]
- (println @f)) ; when you need a return value back
+ (println @f)) ; when you need a return value back, returns a promise
```
Jobs will not ack to the server until they've completed successfully,
so workers that throw exceptions or disappear entirely will have their
jobs automatically retried.
Exceptions occurring in `send-back` calls will be propagated back to the
-caller and thrown upon deref. When using `send-off` by default failed
+caller and returned upon deref. When using `send-off` by default failed
jobs will simply log using `clojure.tools.logging/warn`, but you can
rebind `*exception-handler*` to respond in your own way, including
acking the message back to the server:
View
@@ -59,52 +59,6 @@
(wabbit/with-exchange (:exchange ~config)
~@body))))))
-(defn send-off
- "Execute a form on a robot node."
- ([form] (send-off form {}))
- ([form config]
- (with-robots (merge {:implicit true} config)
- (log/trace "Published" (pr-str form) (:key config "die.roboter.work"))
- (wabbit/publish (:key config "die.roboter.work")
- (.getBytes (pr-str form))))))
-
-(defn broadcast
- "Like send-off, but the form runs on all robot nodes."
- ([form] (broadcast form {}))
- ([form config]
- (send-off form (merge {:exchange "die.roboter.broadcast"
- :exchange-type "fanout"
- :key "die.roboter.broadcast"} config))))
-
-(defn- serialize-64 [x]
- (let [baos (ByteArrayOutputStream.)]
- (.writeObject (ObjectOutputStream. baos) x)
- (String. (.encode (Base64.) (.toByteArray baos)))))
-
-(defn throw-form [e]
- `(do ::eval (-> (.decode (Base64.) ~(serialize-64 e))
- ByteArrayInputStream. ObjectInputStream. .readObject throw)))
-
-(defn read-or-eval [{:keys [body props]}]
- (let [value (-> body String. read-string)]
- (if (and (coll? value) (= (second value) ::eval))
- (eval value)
- value)))
-
-(defn send-back
- ([form] (send-back form {}))
- ([form config]
- (let [reply-queue (format "die.roboter.reply.%s" (UUID/randomUUID))]
- (clojure.core/future
- (with-robots (merge {:implict true} config)
- (wabbit/queue-declare reply-queue false true true)
- (send-off (list `wabbit/publish reply-queue
- `(.getBytes (pr-str (try ~form
- (catch Exception e#
- (throw-form e#)))))))
- (wabbit/with-queue reply-queue
- (-> (wabbit/consuming-seq true) first read-or-eval)))))))
-
(defn- success? [f timeout]
(try (.get f timeout TimeUnit/MILLISECONDS) true
(catch TimeoutException _)))
@@ -152,6 +106,72 @@
config)))
([] (work-on-broadcast {:implicit true})))
+(defn send-off
+ "Execute a form on a robot node."
+ ([form] (send-off form {}))
+ ([form config]
+ (with-robots (merge {:implicit true} config)
+ (log/trace "Published" (pr-str form) (:key config "die.roboter.work"))
+ (wabbit/publish (:key config "die.roboter.work")
+ (.getBytes (pr-str form))))))
+
+(defn broadcast
+ "Like send-off, but the form runs on all robot nodes."
+ ([form] (broadcast form {}))
+ ([form config]
+ (send-off form (merge {:exchange "die.roboter.broadcast"
+ :exchange-type "fanout"
+ :key "die.roboter.broadcast"} config))))
+
+;; Each send-back call places a promise in the responses map and
+;; queues up a form that sends back a form that delivers the evaluated
+;; response to the given promise.
+(defonce ^{:internal true} responses (atom {}))
+
+(defn- deserialize-if-needed [value]
+ (if (and (coll? value) (= ::base64 (first value)))
+ (-> (.decode (Base64.) (second value))
+ ByteArrayInputStream. ObjectInputStream. .readObject)
+ value))
+
+(defn deliver-response [responses id value]
+ (when-let [p (responses id)]
+ (deliver p (deserialize-if-needed value)))
+ (dissoc responses id))
+
+(defonce response-queue (str "die.roboter.response." (UUID/randomUUID)))
+
+(def deliverer
+ (Thread. #(work {:queue response-queue})))
+
+;; This is kinda lame; probably better to use j.io.Serializable
+;; outright and ditch the reader?
+(defn serialize-64 [x]
+ (let [baos (ByteArrayOutputStream.)]
+ (.writeObject (ObjectOutputStream. baos) x)
+ [::base64 (String. (.encode (Base64.) (.toByteArray baos)))]))
+
+(defn- send-back-form [id form]
+ (list `wabbit/publish response-queue
+ (list '.getBytes
+ (list 'pr-str
+ `(list 'swap! `responses `deliver-response
+ ~id (try ~form
+ (catch Exception e#
+ (serialize-64 e#))))))))
+
+(defn send-back
+ ([form] (send-back form {}))
+ ([form config]
+ (let [id (str (UUID/randomUUID)), response (promise)]
+ (swap! responses assoc id response)
+ (with-robots (merge {:implict true} config)
+ ;; (wabbit/queue-declare response-queue)
+ (send-off (send-back-form id form))
+ (when-not (.isAlive deliverer)
+ (.start deliverer))
+ response))))
+
(defn- progressive-input [input]
;; TODO: this fails without the erronous hint
(let [ins (io/input-stream ^java.io.File input)]
@@ -185,7 +205,11 @@
(defn -main [& {:as opts}]
(let [opts (into {:workers (or (System/getenv "WORKER_COUNT") 4)
- :log-level (or (System/getenv "LOG_LEVEL") "info")}
+ :log-level (or (System/getenv "LOG_LEVEL") "info")
+ ;; workaround for https://github.com/mefesto/wabbitmq/pull/7
+ :virtual-host (if-let [uri (System/getenv "RABBITMQ_URL")]
+ (->> (.getPath (java.net.URI. uri))
+ (re-find #"/?(.*)") (second)))}
(walk/keywordize-keys opts))]
(println "Starting" (:workers opts) "workers.")
(.setLevel (LogManager/getLogger "die.roboter")
View
@@ -6,7 +6,12 @@
[clojure.tools.logging :as log]
[clojure.java.io :as io])
(:import (java.util.concurrent TimeUnit TimeoutException ExecutionException)
- (java.io IOException)))
+ (java.io IOException)
+ (org.apache.log4j Level LogManager)))
+
+(when-let [level (System/getenv "LOG_LEVEL")]
+ (.setLevel (LogManager/getLogger "die.roboter")
+ (Level/toLevel (.toUpperCase level))))
(def ^{:dynamic true} *timeout-expected* false)
@@ -17,13 +22,14 @@
(defn ack-handler [e msg]
(com.mefesto.wabbitmq/ack (-> msg :envelope :delivery-tag)))
-(defn clear-queues! [queue-name]
+(defn clear-queues! [& queues]
(with-robots {}
- (wabbit/with-queue queue-name
- (doall (take 100 (wabbit/consuming-seq true 1))))))
+ (doseq [queue queues]
+ (wabbit/with-queue queue
+ (doall (take 100 (wabbit/consuming-seq true 1)))))))
(defn work-fixture [f]
- (clear-queues! "die.roboter.work")
+ (clear-queues! "die.roboter.work" "die.roboter.response")
(reset! state {})
(f))
@@ -62,8 +68,9 @@
(deftest test-send-back
(with-worker
- (is (= 1 (.get (send-back 1) 100 TimeUnit/MILLISECONDS)))
- (is (= 2 (.get (send-back `(+ 1 1)) 100 TimeUnit/MILLISECONDS)))))
+ (Thread/sleep 50) ; allow the workers to spin up
+ (is (= 1 (deref (send-back 1) 100 :timeout)))
+ (is (= 2 (deref (send-back `(+ 1 1)) 100 :timeout)))))
;; TODO: still too much nondeterminism here.
;; (deftest ^:broadcast test-simple-broadcast
@@ -121,7 +128,7 @@
(binding [*exception-handler* ack-handler]
(work {:timeout 100})))]
(try
- (is (instance? IOException (try @(send-back '(throw (java.io.IOException.)))
- (catch ExecutionException e
- (-> e .getCause)))))
+ (is (instance? IOException
+ (deref (send-back '(throw (java.io.IOException.)))
+ 100 :timeout)))
(finally (future-cancel worker)))))

0 comments on commit 095e92c

Please sign in to comment.