Skip to content

Commit

Permalink
add reserve-with-timeout interface and task
Browse files Browse the repository at this point in the history
  • Loading branch information
sunng87 committed Aug 14, 2011
1 parent 3a7fb2d commit 45e808b
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 7 deletions.
35 changes: 28 additions & 7 deletions src/clojalk/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,16 @@
top-jobs (filter not-nil (map #(first @(:ready_set %)) watch-tubes))]
(first (apply sorted-set-by (conj top-jobs priority-comparator)))))

(defn- enqueue-waiting-session [session]
(let [watch-tubes (filter #(contains? (:watch @session) (:name %)) (vals @tubes))]
(defn- enqueue-waiting-session [session timeout]
(let [watch-tubes (filter #(contains? (:watch @session) (:name %)) (vals @tubes))
deadline_at (if (nil? timeout) nil (+ (current-time) (* timeout 1000)))]
(doseq [tube watch-tubes]
(alter (:waiting_list tube) conj session)
(alter session assoc :state :waiting)
(alter tubes assoc (:name tube) tube))))
(do
(alter (:waiting_list tube) conj session)
(alter session assoc
:state :waiting
:deadline_at deadline_at)
(alter tubes assoc (:name tube) tube)))))

(defn- dequeue-waiting-session [session]
(let [watch-tubes (filter #(contains? (:watch @session) (:name %)) (vals @tubes))]
Expand Down Expand Up @@ -125,12 +129,15 @@
(let [tube ((:use @session) @tubes)]
(first @(:buried_list tube))))

(defn reserve [session]
(defn reserve-with-timeout [session timeout]
(dosync
(enqueue-waiting-session session)
(enqueue-waiting-session session timeout)
(if-let [top-job (top-ready-job session)]
(reserve-job session top-job))))

(defn reserve [session]
(reserve-with-timeout session nil))

(defn use [session tube-name]
(let [tube-name-kw (keyword tube-name)]
(dosync
Expand Down Expand Up @@ -273,3 +280,17 @@
(ref-set (:paused t) false)
(alter tubes assoc (:name t) t))))))

(defn update-expired-waiting-session-task []
(dosync
(doseq
[tube (vals @tubes)]
(let [now (current-time)
waiting_list @(:waiting_list tube)
active-func (fn [s] (or (nil? (:deadline_at @s)) (< now (:deadline_at @s))))]
(do
(ref-set (:waiting_list tube) (vec (filter active-func waiting_list)))
(doseq [session (filter #(false? (active-func %)) waiting_list)]
;; we don't update deadline_at on this task,
;; it will be updated next time when it's reserved
(alter session assoc :state :idle)))))))

12 changes: 12 additions & 0 deletions test/clojalk/test/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -264,3 +264,15 @@
(kick session-p 10)
(is (= :reserved (:state (get @jobs the-job-id))))
(is (= :working (:state @session-w))))))


(deftest test-reserve-timeout
(let [session-w (watch (open-session :worker) "test-reserve-timeout")]
;;reserve an empty tube with timeout
(reserve-with-timeout session-w 0.5)
(is (= 1 (count @(:waiting_list (:test-reserve-timeout @tubes)))))

(sleep 0.7)
(update-expired-waiting-session-task)
(is (empty? @(:waiting_list (:test-reserve-timeout @tubes))))
(is (= :idle (:state @session-w)))))

0 comments on commit 45e808b

Please sign in to comment.