Permalink
Browse files

Add -main function, Procfile, and start/stop workers defns.

  • Loading branch information...
1 parent a615a55 commit 700640c7142914db410abdbf542d569c723baf27 @technomancy committed Nov 4, 2011
Showing with 24 additions and 5 deletions.
  1. +1 −0 Procfile
  2. +1 −3 README.md
  3. +22 −2 src/die/roboter.clj
View
@@ -0,0 +1 @@
+worker: lein run -m die.roboter
View
@@ -57,7 +57,7 @@ message back to the server:
By default each job has five minutes to complete before it is
considered hung and are killed, returning its work to the queue. The
-timeout can be overridden by rebinding `die.roboter/\*timeout\*` or
+timeout can be overridden by rebinding `die.roboter/*timeout*` or
passing in a `:timeout` config key to the `work` function. You can
also avoid timing out by calling the `die.roboter/report-progress`
function, which resets the timer back to the start.
@@ -80,8 +80,6 @@ use the `with-robots` macro to bind it dynamically.
* fix progress-reporting-copy
* Expose future exceptions to caller (issue #2)
* Test timeout functionality
-* Control worker count via queue
-* Retry limit
## License
View
@@ -3,7 +3,8 @@
(:refer-clojure :exclude [future send-off])
(:require [com.mefesto.wabbitmq :as wabbit]
[clojure.tools.logging :as log]
- [clojure.java.io :as io])
+ [clojure.java.io :as io]
+ [clojure.walk :as walk])
(:import (java.util UUID)
(java.util.concurrent Executors TimeUnit TimeoutException)
(java.lang.management ManagementFactory)
@@ -147,4 +148,23 @@
"Copy between input and output using clojure.java.io/copy, but reporting
progress every so often. Use to prevent long IO operations from timing out."
[input output & opts]
- (apply io/copy (progressive-input input) output opts))
+ (apply io/copy (progressive-input input) output opts))
+
+(def workers (atom ()))
+
+(defn add-worker
+ "Spin up a worker with the given options."
+ [opts]
+ (swap! workers conj (clojure.core/future (work opts))))
+
+(defn stop-worker
+ "Cancel the most recently-created worker."
+ []
+ (swap! workers (fn [[worker & others]]
+ (future-cancel worker)
+ others)))
+
+(defn -main [& {:as opts}]
+ (let [opts (into {:workers (Integer. (or (System/getenv "WORKER_COUNT") 4))}
+ (walk/keywordize-keys opts))]
+ (dotimes [n (opts :workers)] (add-worker opts))))

0 comments on commit 700640c

Please sign in to comment.