Permalink
Browse files

Fix test for progressive-copy; improve readme.

  • Loading branch information...
1 parent d51b2ff commit cb967db17a4c102258a4811c738ad3b69b168528 @technomancy committed Nov 4, 2011
Showing with 39 additions and 35 deletions.
  1. +7 −10 README.md
  2. +32 −25 test/die/test/roboter.clj
View
@@ -40,10 +40,13 @@ The Robots get your work done in an straightforward way.
Jobs will not ack to the server until they've completed successfully,
so workers that throw exceptions or disappear entirely will have their
-jobs automatically retried. By default failed jobs will simply log
-using `clojure.tools.logging/warn`, but you can rebind
-`*exception-handler*` to respond in your own way, including acking the
-message back to the server:
+jobs automatically retried.
+
+Exceptions occurring in `future` calls will be propagated back to the
+caller and thrown upon deref. When using `send-off` by default failed
+jobs will simply log using `clojure.tools.logging/warn`, but you can
+rebind `*exception-handler*` to respond in your own way, including
+acking the message back to the server:
```clj
(defn handle-tachyon [e msg]
@@ -75,12 +78,6 @@ use the `with-robots` macro to bind it dynamically.
(roboter/work))
```
-## Todo
-
-* fix progress-reporting-copy
-* Expose future exceptions to caller (issue #2)
-* Test timeout functionality
-
## License
Image and lyrics quoted above copyright © 1979 Kraftwerk.
View
@@ -11,10 +11,15 @@
(.setLevel (java.util.logging.Logger/getLogger "die.roboter")
java.util.logging.Level/ALL) ; TODO: no-op
+(def ^{:dynamic true} *timeout-expected* false)
+
(def state (atom {}))
(def bound :root)
+(defn ack-handler [e msg]
+ (com.mefesto.wabbitmq/ack (-> msg :envelope :delivery-tag)))
+
(defn clear-queues! [queue-name]
(with-robots {}
(wabbit/with-queue queue-name
@@ -28,12 +33,13 @@
(defmacro with-worker [& body]
`(let [worker# (clojure.core/future (work))]
(try ~@body
- (finally (.cancel worker# true)))))
+ (finally (future-cancel worker#)))))
(defn wait-for [blockers]
(try (.get (clojure.core/future (doseq [b blockers] @b)) 1 TimeUnit/SECONDS)
(catch TimeoutException _
- (is false "Timed out!"))))
+ (when (not *timeout-expected*)
+ (is false "Timed out!")))))
(defmacro with-block [n body]
`(let [blockers# (repeatedly ~n #(promise))
@@ -68,7 +74,7 @@
(work-on-broadcast)))]
(Thread/sleep 100)
(try (broadcast `(swap! state assoc bound true))
- (finally (.cancel worker true)))))
+ (finally (future-cancel worker)))))
(is (= {1 true} @state)))
(deftest test-multiple-broadcast
@@ -81,39 +87,40 @@
(work-on-broadcast {:queue "worker2"})))]
(Thread/sleep 100)
(try (broadcast `(swap! state assoc bound true))
- (finally (.cancel worker1 true)
- (.cancel worker2 true)))))
+ (finally (future-cancel worker1)
+ (future-cancel worker2)))))
(is (= {1 true 2 true} @state)))
-(defn ack-handler [e msg]
- (com.mefesto.wabbitmq/ack (-> msg :envelope :delivery-tag)))
-
(deftest test-timeout-normal-copy
(let [worker (clojure.core/future
(binding [*exception-handler* ack-handler]
(work {:timeout 100})))]
- (try
- (send-off `(do (io/copy (io/file "/etc/hosts") (io/file "/dev/null"))
- (swap! state assoc :ran true)))
- (finally (.cancel worker true)))
+ (try
+ (binding [*timeout-expected* true]
+ (with-block 1
+ (send-off `(do (io/copy (io/file "/etc/hosts") (io/file "/dev/null"))
+ (Thread/sleep 2000)
+ (swap! state assoc :ran true)))))
+ (finally (future-cancel worker)))
(is (not (:ran @state)))))
-;; (deftest test-progressive-copy
-;; (let [worker (clojure.core/future
-;; (binding [*exception-handler* ack-handler]
-;; (work {:timeout 100})))]
-;; (try
-;; (send-off `(do (copy (io/file "/etc/hosts") (io/file "/dev/null"))
-;; (swap! state assoc :ran true)))
-;; (finally (.cancel worker true)))
-;; (is (:ran @state))))
+(deftest test-progressive-copy
+ (let [worker (clojure.core/future
+ (binding [*exception-handler* ack-handler]
+ (work {:timeout 100})))]
+ (try
+ (with-block 1
+ (send-off `(do (copy (io/file "/etc/hosts") (io/file "/tmp/hosts"))
+ (swap! state assoc :ran true))))
+ (finally (future-cancel worker)))
+ (is (:ran @state))))
(deftest test-exception-over-future
(let [worker (clojure.core/future
(binding [*exception-handler* ack-handler]
(work {:timeout 100})))]
(try
- (is (thrown? IOException (try @(future (throw (java.io.IOException.)))
- (catch ExecutionException e
- (throw (-> e .getCause .getCause))))))
- (finally (.cancel worker true)))))
+ (is (instance? IOException (try @(future (throw (java.io.IOException.)))
+ (catch ExecutionException e
+ (-> e .getCause .getCause)))))
+ (finally (future-cancel worker)))))

0 comments on commit cb967db

Please sign in to comment.