diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ee508f7 --- /dev/null +++ b/.gitignore @@ -0,0 +1,10 @@ +/target +/lib +/classes +/checkouts +pom.xml +*.jar +*.class +.lein-deps-sum +.lein-failures +.lein-plugins diff --git a/README.md b/README.md new file mode 100644 index 0000000..1f4ab5f --- /dev/null +++ b/README.md @@ -0,0 +1,10 @@ +# Trabajo + +Trabajo is a Redis-backed library for creating background jobs with Clojure. + +## License + +Copyright © 2012 Jeremy Heiler + +Distributed under the Eclipse Public License, the same as Clojure. + diff --git a/project.clj b/project.clj new file mode 100644 index 0000000..d8717bd --- /dev/null +++ b/project.clj @@ -0,0 +1,9 @@ +(defproject trabajo "0.1.0-SNAPSHOT" + :description "Trabajo is a Redis-backed library for creating background jobs with Clojure." + :url "https://github.com/jeremyheiler/trabajo" + :license {:name "Eclipse Public License" + :url "http://www.eclipse.org/legal/epl-v10.html"} + :dependencies [[org.clojure/clojure "1.3.0"] + [org.clojars.tavisrudd/redis-clojure "1.3.1"]] + :plugins [[lein-swank "1.4.4"] + [lein-tarsier "0.9.1"]]) diff --git a/src/trabajo/core.clj b/src/trabajo/core.clj new file mode 100644 index 0000000..9d656a4 --- /dev/null +++ b/src/trabajo/core.clj @@ -0,0 +1,91 @@ +(ns trabajo.core + (:require [redis.core :as redis])) + +(def redis-conf (atom {:host "localhost" :port 6379})) + +(defmacro with-redis + "Executes the body with the default redis instance." + [& body] + `(redis/with-server ~(deref redis-conf) ~@body)) + +(defn enqueue + "Puts a job onto the queue. Returns the job ID." + [queue func args & {:keys [timeout] :as opts}] ;todo timeouts + (with-redis + (let [qn (name queue) id (redis/incr (str qn ":sequencer"))] + (redis/hmset (str qn ":" id) "func" func "args" (pr-str args)) + (redis/rpush qn id) + id))) + +(defn dequeue + "Pulls the next job from the queue. Returns the next job or nil." + [queue] + (with-redis + (when-let [id (redis/lpop (name queue))] + (redis/hgetall (str (name queue) ":" id))))) + +(defn apply-job + "Applies the function from the job with its arguments." + [{:strs [func args] :as job}] + (apply (ns-resolve *ns* (symbol func)) (read-string args))) + +(def workers (ref {})) + +(defn ^:private init-future [queue] + (future + (loop [] + (when-not (Thread/interrupted) + (if-let [job (dequeue queue)] + (apply-job job) + (Thread/sleep 5000)) + (recur))))) + +(defn work-on + "Returns a future that polls the given queue until cancelled." + [queue] + (dosync + (let [f (init-future queue)] + (alter workers update-in [queue] #(conj (if (nil? %) [] %) f))))) + +(defn test-work [x] + (with-open [out (java.io.FileWriter. "/home/jeremy/job.out" true)] + (.write out (str x "\n")))) + +;(defn inc-workers [queue-key incr] +; (dosync +; (alter queues update-in [queue-key :workers] (fn [old] +; (reduce (fn [v n] (conj v (init-worker))) old (range incr)))))) + +;(defn ^:private poll-redis [queue-key worker] +; (with-redis +; (let [id (-> queue-key queue-name redis/lpop)] +; (when-not (nil? id) +; (let [{:strs [func args]} (redis/hgetall id)] +; (assoc worker :future (future (apply (load-fn func) args)))))))) + +;(defn ^:private init-workers [worker-count] +; (reduce (fn [v n] (conj v (init-worker))) [] (range worker-count))) + +;(defn ^:private init-manager [queue-key] +; (future +; (loop [] +; (when-not (Thread/interrupted) +; (when (nil? (dosync +; (let [worker (first (filter #(let [f (:future %)] (and (future? f) (future-done? f))) (get-in @queues [queue-key :workers])))] +; (when-not (nil? worker) +; (poll-redis queue-key worker))))) +; (Thread/sleep 5000)) +; (recur))))) + +;(defn ^:private init-queue [queue-key worker-count] +; (alter queues assoc queue-key {:manager (init-manager queue-key) + +;(defn listen +; "Returns a future that polls the given Redis queue every five seconds. +; If one already exists, that one is returned, otherwise a new one is created." +; ([queue-key] (listen queue-key 1)) +; ([queue-key worker-count] +; (if (keyword? queue-key) +; (dosync (:future (-> @queues queue-key #(if (nil? %) (init-queue queue-key worker-count) %)))) +; (throw (IllegalArgumentException. "Argument must be a keyword"))))) +