Skip to content
Browse files

Replace futures with threads because they're easier to manage and I d…

…on't care about it ever returning a value.
  • Loading branch information...
1 parent f63ac74 commit 0c280874ab3cb07a6794029418bee15f77f54241 @jeremyheiler committed Apr 17, 2012
Showing with 17 additions and 11 deletions.
  1. +17 −11 src/trabajo/core.clj
View
28 src/trabajo/core.clj
@@ -1,6 +1,11 @@
(ns trabajo.core
(:require [redis.core :as redis]))
+; todo
+; - fn to kill and remove workers
+; - fn to replace dead workers
+; - job timeouts
+
(def redis-conf (atom {:host "localhost" :port 6379}))
(defmacro with-redis
@@ -31,26 +36,27 @@
(def workers (ref {}))
-(defn ^:private init-future [queue]
- (future
- (loop []
- (when-not (Thread/interrupted)
- (if-let [job (dequeue queue)]
- (apply-job job)
- (Thread/sleep 5000))
- (recur)))))
+(defn ^:private process [queue]
+ (loop []
+ (when-not (Thread/interrupted)
+ (if-let [job (dequeue queue)]
+ (apply-job job)
+ (Thread/sleep 5000))
+ (recur))))
(defn work-on
- "Returns a future that polls the given queue until cancelled."
+ "Returns a thread that polls the given queue until interrupted."
[queue]
(dosync
- (let [f (init-future queue)]
- (alter workers update-in [queue] #(conj (if (nil? %) [] %) f)))))
+ (let [t (Thread. #(process queue))]
+ (alter workers update-in [queue] #(conj (if (nil? %) [] %) t))
+ (.start t))))
(defn test-work [x]
(with-open [out (java.io.FileWriter. "/home/jeremy/job.out" true)]
(.write out (str x "\n"))))
+
;(defn inc-workers [queue-key incr]
; (dosync
; (alter queues update-in [queue-key :workers] (fn [old]

0 comments on commit 0c28087

Please sign in to comment.
Something went wrong with that request. Please try again.