Permalink
Browse files

Send exceptions back to the caller in future. Fixes #2.

  • Loading branch information...
1 parent 700640c commit d51b2ff5648b571ef5d039f3ff107c56a7acfbba @technomancy committed Nov 4, 2011
Showing with 41 additions and 16 deletions.
  1. +6 −6 project.clj
  2. +17 −3 src/die/roboter.clj
  3. +18 −7 test/die/test/roboter.clj
View
@@ -1,9 +1,9 @@
(defproject die-roboter "1.0.0-SNAPSHOT"
:description "The robots get your work done in an straightforward way."
- :dependencies [[com.mefesto/wabbitmq "0.1.4"
- :exclusions [org.clojure/clojure org.clojure/clojure-contrib]]
- [org.clojure/tools.logging "0.2.0"
- :exclusions [org.clojure/clojure]]]
+ :dependencies [[org.clojure/clojure "[1.2.0,1.3.0]"]
+ [commons-codec "1.5"]
+ [com.mefesto/wabbitmq "0.1.4"
+ :exclusions [org.clojure/clojure-contrib]]
+ [org.clojure/tools.logging "0.2.0"]]
:checksum-deps true
- :local-repo-classpath true
- :dev-dependencies [[org.clojure/clojure "1.2.1"]])
+ :local-repo-classpath true)
View
@@ -8,7 +8,9 @@
(:import (java.util UUID)
(java.util.concurrent Executors TimeUnit TimeoutException)
(java.lang.management ManagementFactory)
- (java.io FilterInputStream)))
+ (java.io FilterInputStream ObjectInputStream ObjectOutputStream
+ ByteArrayInputStream ByteArrayOutputStream)
+ (org.apache.commons.codec.binary Base64)))
(def ^{:doc "Namespace in which robots work." :private true} context
(binding [*ns* (create-ns 'die.roboter.context)] (refer-clojure) *ns*))
@@ -73,6 +75,15 @@
:exchange-type "fanout"
:key "die.roboter.broadcast"} config))))
+(defn- serialize-64 [x]
+ (let [baos (ByteArrayOutputStream.)]
+ (.writeObject (ObjectOutputStream. baos) x)
+ (String. (.encode (Base64.) (.toByteArray baos)))))
+
+(defn throw-form [e]
+ `(throw (-> (.decode (Base64.) ~(serialize-64 e))
+ ByteArrayInputStream. ObjectInputStream. .readObject)))
+
(defmacro future
"Run body on a robot node and return a result upon deref."
[& body]
@@ -81,9 +92,12 @@
(with-robots (merge {:implict true} *config*)
(wabbit/queue-declare reply-queue# false true true)
(send-off (list `wabbit/publish reply-queue#
- '(.getBytes (pr-str (do ~@body)))))
+ '(.getBytes (pr-str (try ~@body
+ (catch Exception e#
+ (throw-form e#)))))))
(wabbit/with-queue reply-queue#
- (-> (wabbit/consuming-seq true) first :body String. read-string))))))
+ (-> (wabbit/consuming-seq true) first :body String.
+ read-string eval))))))
(defn- success? [f timeout]
(try (.get f timeout TimeUnit/MILLISECONDS) true
View
@@ -5,7 +5,8 @@
(:require [com.mefesto.wabbitmq :as wabbit]
[clojure.tools.logging :as log]
[clojure.java.io :as io])
- (:import (java.util.concurrent TimeUnit TimeoutException)))
+ (:import (java.util.concurrent TimeUnit TimeoutException ExecutionException)
+ (java.io IOException)))
(.setLevel (java.util.logging.Logger/getLogger "die.roboter")
java.util.logging.Level/ALL) ; TODO: no-op
@@ -97,12 +98,22 @@
(finally (.cancel worker true)))
(is (not (:ran @state)))))
-(deftest test-progressive-copy
+;; (deftest test-progressive-copy
+;; (let [worker (clojure.core/future
+;; (binding [*exception-handler* ack-handler]
+;; (work {:timeout 100})))]
+;; (try
+;; (send-off `(do (copy (io/file "/etc/hosts") (io/file "/dev/null"))
+;; (swap! state assoc :ran true)))
+;; (finally (.cancel worker true)))
+;; (is (:ran @state))))
+
+(deftest test-exception-over-future
(let [worker (clojure.core/future
(binding [*exception-handler* ack-handler]
(work {:timeout 100})))]
- (try
- (send-off `(do (copy (io/file "/etc/hosts") (io/file "/dev/null"))
- (swap! state assoc :ran true)))
- (finally (.cancel worker true)))
- (is (:ran @state))))
+ (try
+ (is (thrown? IOException (try @(future (throw (java.io.IOException.)))
+ (catch ExecutionException e
+ (throw (-> e .getCause .getCause))))))
+ (finally (.cancel worker true)))))

0 comments on commit d51b2ff

Please sign in to comment.