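; [repository web-page residue, not part of the program] Find file
; [repository web-page residue, not part of the program] Fetching contributors…
; [repository web-page residue, not part of the program] Cannot retrieve contributors at this time
; [repository web-page residue, not part of the program] 143 lines (120 sloc) 3.96 KB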
(ns trabajo.core
(:require [redis.core :as redis]))
; todo
; - namespace redis keys with "trabajo"
; - job timeouts
(def redis-conf
  "Atom holding the settings of the default Redis server: a map with
  :host and :port."
  (atom {:host "localhost"
         :port 6379}))
(defmacro with-redis
  "Executes the body with the default redis instance.

  The connection settings are read from the `redis-conf` atom every time the
  expanded code runs, so a swap!/reset! on the atom is honoured by code that
  was compiled before the change."
  [& body]
  ;; The deref is emitted into the expansion on purpose. Unquoting it
  ;; (`~(deref redis-conf)`) would snapshot the settings once, at
  ;; macro-expansion time, and bake that snapshot into every call site.
  `(redis/with-server (deref redis-conf) ~@body))
(defn enqueue
  "Puts a job onto the queue. Returns the job ID.

  queue - names the queue; it is passed through `name`, so a keyword, symbol
          or string works.
  func  - stored in the \"func\" field of the job hash.
  args  - stored, printed with pr-str, in the \"args\" field of the job hash.
  opts  - optional keyword arguments; :timeout is accepted but not used yet.

  The ID comes from incrementing the \"<queue>:sequencer\" key, the job hash
  lives under \"<queue>:<id>\", and the ID is appended to the list \"<queue>\"."
  [queue func args & {:keys [timeout] :as opts}] ;todo timeouts
  (with-redis
    (let [qn (name queue)
          id (redis/incr (str qn ":sequencer"))]
      ;; Write the job hash before publishing the ID on the list.
      (redis/hmset (str qn ":" id) "func" func "args" (pr-str args))
      (redis/rpush qn id)
      ;; Return the ID explicitly so the documented return value does not
      ;; depend on whatever rpush replies.
      id)))
(defn dequeue
  "Pulls the next job from the queue. Returns the next job or nil.

  queue - names the queue; it is passed through `name`.

  The job is whatever hgetall returns for \"<queue>:<id>\", with :id (the
  popped ID) and :queue (the argument as given) added. nil is returned when
  lpop yields nothing."
  [queue]
  (with-redis
    (when-let [id (redis/lpop (name queue))]
      (-> (redis/hgetall (str (name queue) ":" id))
          (assoc :id id)
          (assoc :queue queue)))))
(defn ^:private store-error
  "Stores the given error for the given job into Redis.

  job     - a job map carrying :queue and :id.
  message - the error text to store.

  The job ID is appended to the list \"<queue>:failed\" and the message is
  written to the key \"<queue>:<id>:error\"."
  [job message]
  (with-redis
    (let [qn (name (:queue job))
          id (:id job)]
      (redis/rpush (str qn ":failed") id)
      (redis/set (str qn ":" id ":error") message))))
(defn apply-job
  "Applies the function from the job with its arguments.

  job - a map with the string keys \"func\" (the name of the function) and
        \"args\" (a printed sequence of arguments, read back with read-string).

  Returns whatever the function returns. Throws IllegalArgumentException when
  \"func\" is missing or does not resolve to anything.

  The name is resolved against the value of *ns* at call time, so only
  namespace-qualified names resolve the same way regardless of the caller."
  [{:strs [func args] :as job}]
  (let [f (when func (ns-resolve *ns* (symbol func)))]
    ;; Without this check an unknown name surfaces as a bare
    ;; NullPointerException from (apply nil ...), which says nothing about
    ;; which job function was at fault.
    (when (nil? f)
      (throw (IllegalArgumentException.
               (str "Cannot resolve job function: " (pr-str func)))))
    ;; NOTE(review): read-string honours *read-eval*, so a crafted \"args\"
    ;; string can execute code while being read. Left unchanged so existing
    ;; payloads keep reading the same way; consider a data-only reader if the
    ;; stored jobs cannot be trusted.
    (apply f (read-string args))))
;(defn process-job
; "Processes the job and handles any errors."
; [job]
; (try
; (apply-job job)
; (catch Exception e
; (store-error job (.toString e)))))
;(def workers (ref {}))
;(defn ^:private process [queue]
; (loop []
; (when-not (Thread/interrupted)
; (if-let [job (dequeue queue)]
; (process-job job)
; (Thread/sleep 5000))
; (recur))))
;(defn work-on
; "Returns a thread that polls the given queue until interrupted."
; [queue]
; (dosync
; (let [t (Thread. #(process queue))]
; (alter workers update-in [queue] #(conj (if (nil? %) [] %) t))
; (.start t))))
(defn test-work
  "Sample job: appends x, followed by a newline, to /home/jeremy/job.out."
  [x]
  ;; The second constructor argument opens the file in append mode; with-open
  ;; closes the writer when the body finishes.
  (with-open [^java.io.Writer out (java.io.FileWriter. "/home/jeremy/job.out" true)]
    (.write out (str x "\n"))))
(def work-manager
  "Ref holding the state of the work manager. It starts out as an empty map;
  `start` replaces it with a map of :thread, :promises and :max-workers."
  (ref {}))
;(def poll-manager (ref {}))
(defn ^:private request-promise-from-work-manager
  "Adds a fresh promise to the work manager's :promises collection and
  returns the element at the front of that collection.

  Takes no arguments. The update runs in its own transaction because `alter`
  may only be called inside one."
  []
  (dosync
    (peek (:promises (alter work-manager update-in [:promises] conj (promise))))))
(defn ^:private process-job
  "Runs the job, records a failure if it throws, and finally removes the
  front promise from the work manager and delivers it.

  job - a job map as returned by `dequeue`.

  Returns the job function's result, or the result of `store-error` when the
  job threw an Exception."
  [job]
  (try
    (apply-job job)
    (catch Exception e (store-error job (.toString e)))
    (finally
      ;; Only the ref update happens inside the transaction. Delivering is a
      ;; side effect, so it is done once, after the pop has committed, instead
      ;; of possibly being repeated on a transaction retry.
      (let [p (dosync
                (let [front (peek (:promises @work-manager))]
                  ;(alter work-manager update-in [:promises] #(-> % (pop) (conj (promise))))
                  (alter work-manager update-in [:promises] pop)
                  front))]
        ;; The collection may be empty (for example when the job finishes
        ;; before a promise has been requested for it); there is then nothing
        ;; to deliver.
        (when p
          (deliver p true))))))
(defn ^:private process
  "Polling loop of the work manager thread.

  queues - the queues to poll, in order.

  Each pass first makes sure a worker may be used: either fewer promises are
  outstanding than :max-workers allows, or the loop blocks on the promise it
  holds. It then tries to dequeue a job from the current queue:
    - job found: the job is run on a future, a new promise is requested, and
      polling restarts from the first queue;
    - no job: the thread sleeps for five seconds and moves on to the next
      queue, wrapping around to the first one after the last.

  The loop ends, returning nil, when the thread is interrupted."
  [queues]
  (try
    (loop [[q & qs :as q+qs] queues
           p (atom true)] ; placeholder that derefs to true without blocking
      (when-not (Thread/interrupted)
        (if (or
              (< (count (:promises @work-manager)) (:max-workers @work-manager))
              @p) ; blocks until a worker is available
          (if-let [job (dequeue q)]
            (let [f (future-call #(process-job job))]
              ;(dosync (alter work-manager update-in [:futures] conj f))
              (recur queues (request-promise-from-work-manager)))
            (do
              (Thread/sleep 5000)
              (recur (if (nil? qs) queues qs) p)))
          (recur q+qs (request-promise-from-work-manager)))))
    ;; An interrupt that arrives during the sleep or while blocked on the
    ;; promise is raised as InterruptedException rather than seen by the
    ;; Thread/interrupted check; treat it as the same request to stop.
    (catch InterruptedException _ nil)))
(defn running?
  "Returns true if the work-manager thread is running, otherwise false.

  Takes no arguments. false is also returned when no thread has been
  registered in the work manager yet."
  []
  (if-let [^Thread t (:thread @work-manager)]
    ;; java.lang.Thread exposes isAlive; there is no `isAlive?` method.
    (.isAlive t)
    false))
(defn start
  "Starts a work manager that polls jobs on the given queue.

  n      - stored as :max-workers, the limit the polling loop compares the
           number of outstanding promises against.
  queues - one or more queues to poll.

  Returns nil. Throws IllegalStateException when a work manager thread is
  already running."
  [n & queues]
  (if (running?)
    (throw (IllegalStateException. "A work manager has already been started."))
    (let [t (Thread. #(process queues))]
      ;; ref-set must run inside a transaction.
      (dosync
        (ref-set work-manager
                 {:thread t
                  :promises clojure.lang.PersistentQueue/EMPTY
                  :max-workers n}))
      ;; Start the thread only after the new state is committed, so the
      ;; polling loop never observes a work manager without :max-workers.
      (.start t))))
(defn stop
  "Stops the work manager if one has been started.

  Takes no arguments. Interrupts the thread registered in the work manager;
  returns nil when no thread is registered."
  []
  (when-let [^Thread t (:thread @work-manager)]
    (.interrupt t)))