Permalink
Browse files

minor changes

  • Loading branch information...
1 parent 7fddc70 commit 02b1312c63381521fe39057aadbdbcf569bf9bb1 @sunng87 committed Sep 28, 2011
Showing with 60 additions and 73 deletions.
  1. +60 −61 src/clojalk/core.clj
  2. +0 −12 test/clojalk/test/core.clj
View
@@ -23,7 +23,67 @@
(:require [clojalk.wal])
(:import [java.util.concurrent Executors TimeUnit]))
+;; ## Schedule tasks for time based task
+;;
+
+;; Update a delayed job and set it as ready.
+;;
+(defn- update-delayed-job [job-id]
+ (if-let [job (job-id @jobs)]
+ (when (= :delayed (:state job))
+ (dosync
+ (alter (:delay_set ((:tube job) @tubes)) disj job)
+ (clojalk.wal/write-job job false)
+ (set-job-as-ready job)))))
+
+;; Release an expired job set it as ready
+;;
+;; Since we won't cancel the task so we should check if the
+;; task is still valid before we actually run it.
+;;
+;; For this scenario, we should ensure:
+;;
+;; * the job has exceed its deadline. To prevent the deadline is
+;;override by another operation.
+;; * the state of job is still `:reserved`
+;;
+(defn- update-expired-job [job-id]
+ (if-let [job (job-id @jobs)]
+ (when (and (>= (current-time) (:deadline_at job)) (= :reserved (:state job)))
+ (let [session (:reserver job)
+ updated-job (assoc job :state :ready
+ :reserver nil
+ :timeouts (inc (:timeouts job)))]
+ (clojalk.wal/write-job updated-job false)
+ (dosync
+ (alter session assoc :reserved_jobs (disj (:reserved_jobs @session) (:id updated-job)))
+ (alter job-timeouts inc)
+ (set-job-as-ready updated-job))))))
+
+;; Enable a paused tube
+;;
+(defn- update-paused-tube [tube]
+ (dosync
+ (ref-set (:paused t) false)
+
+ ;; handle waiting session
+ (let [pending-pairs (zipmap @(:waiting_list t) @(:ready_set t))]
+ (doseq [s (keys pending-pairs)]
+ (reserve-job s (pending-pairs s))))))
+;; Reject a session that waiting for reservation
+;;
+(defn- update-expired-waiting-session [session]
+ (if (= :waiting (:state @session))
+ (dosync
+ (dequeue-waiting-session session)
+ (alter session assoc :state :idle))))
+
+(defonce schduler
+ (Executors/newScheduledThreadPool (* 2 (.availableProcessors (Runtime/getRuntime)))))
+
+(defn shedule [task delay]
+ (.schedule scheduler task delay TimeUnit/SECONDS))
;; ## Functions to handle clojalk logic
@@ -508,65 +568,4 @@
:current-jobs-delayed (count delayed-jobs)
:current-jobs-buried (count buried-jobs)})))
-;; ## Schedule tasks for time based task
-;;
-
-;; Update a delayed job and set it as ready.
-;;
-(defn- update-delayed-job [job-id]
- (if-let [job (job-id @jobs)]
- (when (= :delayed (:state job))
- (dosync
- (alter (:delay_set tube) disj job)
- (clojalk.wal/write-job job false)
- (set-job-as-ready job)))))
-
-;; Release an expired job set it as ready
-;;
-;; Since we won't cancel the task so we should check if the
-;; task is still valid before we actually run it.
-;;
-;; For this scenario, we should ensure:
-;;
-;; * the job has exceed its deadline. To prevent the deadline is
-;;override by another operation.
-;; * the state of job is still `:reserved`
-;;
-(defn- update-expired-job [job-id]
- (if-let [job (job-id @jobs)]
- (when (and (>= (current-time) (:deadline_at job)) (= :reserved (:state job)))
- (let [session (:reserver job)
- updated-job (assoc job :state :ready
- :reserver nil
- :timeouts (inc (:timeouts job)))]
- (clojalk.wal/write-job updated-job false)
- (dosync
- (alter session assoc :reserved_jobs (disj (:reserved_jobs @session) (:id updated-job)))
- (alter job-timeouts inc)
- (set-job-as-ready updated-job))))))
-
-;; Enable a paused tube
-;;
-(defn- update-paused-tube [tube]
- (dosync
- (ref-set (:paused t) false)
-
- ;; handle waiting session
- (let [pending-pairs (zipmap @(:waiting_list t) @(:ready_set t))]
- (doseq [s (keys pending-pairs)]
- (reserve-job s (pending-pairs s))))))
-
-;; Reject a session that waiting for reservation
-;;
-(defn- update-expired-waiting-session [session]
- (if (= :waiting (:state @session))
- (dosync
- (dequeue-waiting-session session)
- (alter session assoc :state :idle))))
-
-(defonce schduler
- (Executors/newScheduledThreadPool (* 2 (.availableProcessors (Runtime/getRuntime)))))
-
-(defn shedule [task delay]
- (.schedule scheduler task delay TimeUnit/SECONDS))
@@ -103,9 +103,6 @@
;; sleep
(sleep 3)
- ;; update tasks
- (update-delay-job-task)
-
(is (= 1 (count @(:delay_set (:delay-task-test @tubes)))))
(is (= 3 (count @(:ready_set (:delay-task-test @tubes)))))))
@@ -214,10 +211,6 @@
(is (= 1 (count @(:ready_set (:expire-task-test @tubes)))))
- ;;wait for expire
- (sleep 1.5)
- (update-expired-job-task)
-
(is (= 3 (count @(:ready_set (:expire-task-test @tubes)))))
(is (= 1 (count (:reserved_jobs @session-w))))))
@@ -232,9 +225,6 @@
(reserve session-w)
(is (= :waiting (:state @session-w)))
- (sleep 0.8)
- (update-paused-tube-task)
-
;; job could be automatically assign to pending worker
(is (= :working (:state @session-w)))
(is (false? @(:paused (:expire-tube-test @tubes))))))
@@ -289,8 +279,6 @@
(reserve-with-timeout session-w 0.5)
(is (= 1 (count @(:waiting_list (:test-reserve-timeout @tubes)))))
- (sleep 0.7)
- (update-expired-waiting-session-task)
(is (empty? @(:waiting_list (:test-reserve-timeout @tubes))))
(is (= :idle (:state @session-w)))))

0 comments on commit 02b1312

Please sign in to comment.