Permalink
Browse files

WIP: add support for timeouts.

  • Loading branch information...
1 parent acf3fd2 commit 093f15888ba89f14b9f7e5e4e509525e92ceec24 @technomancy committed Aug 3, 2011
Showing with 40 additions and 8 deletions.
  1. +10 −4 README.md
  2. +30 −4 src/die/roboter.clj
View
@@ -51,6 +51,13 @@ message back to the server:
(binding [roboter/*exception-handler* handle-tachyon]
(roboter/work))
+By default each job has five minutes to complete before it is
+considered hung and killed, returning its work to the queue. The
+timeout can be overridden by rebinding `die.roboter/*timeout*` or
+passing in a `:timeout` config key to the `work` function. You can
+also avoid timing out by calling the `die.roboter/report-progress`
+function, which resets the timer back to the start.
+
AMQP is used as the transport. [RabbitMQ](http://rabbitmq.com) is a
popular choice. Most functions take an optional `config` argument that
can be used to specify the AMQP connection settings, but you can also
@@ -64,11 +71,10 @@ use the `with-robots` macro to bind it dynamically.
## Todo
-* Timeouts
-* Thread pooling
+* Test timeout functionality
* Switch to tools.namespace once it's fixed
-* Fix broadcast (see failing test)
-* Control queue
+* Fix race condition in broadcast tests
+* Control worker count via queue
## License
View
@@ -18,6 +18,12 @@
[e msg]
(log/warn e "Robot ran into trouble:" (String. (:body msg))))
+;; Default job timeout; `work` falls back to this when config has no :timeout.
+(def ^{:dynamic true
+       :doc "How long before jobs that don't report progress are killed, in ms."}
+  *timeout*
+  (* 5 60 1000)) ; 5 minutes, expressed in milliseconds
+
+;; The root definition is a deliberate no-op returning nil. It is dynamic so a
+;; running job can be given a binding that actually records progress.
+(defn ^{:doc "Reset job timeout."
+        :dynamic true}
+  report-progress
+  []
+  nil)
+
(defn ^{:internal true :doc "Public for macro-expansion only!"} init [config]
(try (wabbit/exchange-declare (:exchange config "die.roboter")
(:exchange-type config "direct")
@@ -76,11 +82,31 @@
(wabbit/with-queue reply-queue#
(-> (wabbit/consuming-seq true) first :body String. read-string))))))
-(defn- consume [{:keys [body envelope] :as msg}]
+(defn- success?
+  "Wait up to `timeout` milliseconds for the future `f`.
+
+  Returns true once `f` is finished and needs no further supervision: it
+  completed normally, its body threw, or it was cancelled. Returns nil when
+  the wait timed out and `f` is still running."
+  [f timeout]
+  (try (.get f timeout TimeUnit/MILLISECONDS)
+       true
+       ;; Still running after `timeout` ms: fall through and return nil.
+       (catch TimeoutException _)
+       ;; The job itself threw. Log it here instead of letting the exception
+       ;; escape and kill the supervising thread; the future is done either way.
+       (catch java.util.concurrent.ExecutionException e
+         (log/warn e "Robot job threw inside its future.")
+         true)
+       ;; Already cancelled elsewhere: nothing left to wait for.
+       (catch java.util.concurrent.CancellationException _
+         true)))
+
+(defn- supervise
+  "Watch future `f`, waiting `timeout` ms at a time.
+
+  Stops (returning nil) as soon as `success?` reports the future is done.
+  When a wait expires and the `progress` atom holds a truthy value, the flag
+  is cleared and another full wait begins. When it expires with no progress
+  recorded, `f` is cancelled and the result of `future-cancel` is returned."
+  [f progress timeout]
+  (loop []
+    (when-not (success? f timeout)
+      (if-not (deref progress)
+        ;; No progress reported during the whole window: give up on the job.
+        (future-cancel f)
+        ;; Progress was reported: clear the flag and grant a fresh window.
+        (do (reset! progress false)
+            (recur))))))
+
+(defn- run-with-timeout
+  "Start `(apply f args)` on a future and return that future immediately.
+
+  The call is wrapped with `bound-fn`, so it runs with the caller's thread
+  bindings. While it runs, `report-progress` is bound to a function that sets
+  a private progress flag. A separate thread runs `supervise` over the future
+  with that flag and `timeout`."
+  [timeout f & args]
+  (let [made-progress (atom false)
+        job (bound-fn [] (apply f args))
+        ;; TODO: name thread
+        task (clojure.core/future
+               (binding [report-progress (fn [] (reset! made-progress true))]
+                 (job)))
+        watchdog (Thread. (fn [] (supervise task made-progress timeout)))]
+    (.start watchdog)
+    task))
+
+(defn- consume
+  "Evaluate the form carried in the body of `msg`, then ack the delivery.
+
+  The form is read from the message body and evaluated via `run-with-timeout`
+  with `timeout` (milliseconds). This function blocks until that future
+  finishes, so the ack is sent only after the job has completed. If the job
+  throws, or the future is cancelled, the exception propagates to the caller
+  and the ack is skipped."
+  [{:keys [body envelope] :as msg} timeout]
  (binding [*ns* context,*current-message* msg]
- ;; TODO: timeouts
    (log/trace "Robot received message:" (String. body))
- (eval (read-string (String. body))))
+    ;; Wait for the job rather than dropping the future; otherwise the ack
+    ;; below is sent while the job is still running. A job failure arrives
+    ;; wrapped in ExecutionException, so rethrow its cause to hand callers
+    ;; the original exception.
+    (try (deref (run-with-timeout timeout eval (read-string (String. body))))
+         (catch java.util.concurrent.ExecutionException e
+           (throw (or (.getCause e) e)))))
  (wabbit/ack (:delivery-tag envelope)))
(defn work
@@ -90,7 +116,7 @@
(wabbit/with-queue (:queue config "die.roboter.work")
(log/trace "Consuming on" (:queue config "die.roboter.work"))
(doseq [msg (wabbit/consuming-seq)]
- (try (consume msg)
+ (try (consume msg (:timeout config *timeout*))
(catch Exception e
(*exception-handler* e msg)))))))
([] (work {:implicit true})))

0 comments on commit 093f158

Please sign in to comment.