Permalink
Browse files

fix compilation error

  • Loading branch information...
1 parent 02b1312 commit 18f7e86a461a574605121e39d3361aabf5b5c535 @sunng87 committed Sep 28, 2011
Showing with 71 additions and 68 deletions.
  1. +64 −62 src/clojalk/core.clj
  2. +5 −6 src/clojalk/utils.clj
  3. +2 −0 test/clojalk/test/core.clj
View
@@ -23,67 +23,12 @@
(:require [clojalk.wal])
(:import [java.util.concurrent Executors TimeUnit]))
-;; ## Schedule tasks for time based task
-;;
+;; predefine the task names
+(declare update-delayed-job)
+(declare update-paused-tube)
+(declare update-expired-job)
+(declare update-expired-waiting-session)
-;; Update a delayed job and set it as ready.
-;;
-(defn- update-delayed-job [job-id]
- (if-let [job (job-id @jobs)]
- (when (= :delayed (:state job))
- (dosync
- (alter (:delay_set ((:tube job) @tubes)) disj job)
- (clojalk.wal/write-job job false)
- (set-job-as-ready job)))))
-
-;; Release an expired job set it as ready
-;;
-;; Since we won't cancel the task so we should check if the
-;; task is still valid before we actually run it.
-;;
-;; For this scenario, we should ensure:
-;;
-;; * the job has exceed its deadline. To prevent the deadline is
-;;override by another operation.
-;; * the state of job is still `:reserved`
-;;
-(defn- update-expired-job [job-id]
- (if-let [job (job-id @jobs)]
- (when (and (>= (current-time) (:deadline_at job)) (= :reserved (:state job)))
- (let [session (:reserver job)
- updated-job (assoc job :state :ready
- :reserver nil
- :timeouts (inc (:timeouts job)))]
- (clojalk.wal/write-job updated-job false)
- (dosync
- (alter session assoc :reserved_jobs (disj (:reserved_jobs @session) (:id updated-job)))
- (alter job-timeouts inc)
- (set-job-as-ready updated-job))))))
-
-;; Enable a paused tube
-;;
-(defn- update-paused-tube [tube]
- (dosync
- (ref-set (:paused t) false)
-
- ;; handle waiting session
- (let [pending-pairs (zipmap @(:waiting_list t) @(:ready_set t))]
- (doseq [s (keys pending-pairs)]
- (reserve-job s (pending-pairs s))))))
-
-;; Reject a session that waiting for reservation
-;;
-(defn- update-expired-waiting-session [session]
- (if (= :waiting (:state @session))
- (dosync
- (dequeue-waiting-session session)
- (alter session assoc :state :idle))))
-
-(defonce schduler
- (Executors/newScheduledThreadPool (* 2 (.availableProcessors (Runtime/getRuntime)))))
-
-(defn shedule [task delay]
- (.schedule scheduler task delay TimeUnit/SECONDS))
;; ## Functions to handle clojalk logic
@@ -249,7 +194,7 @@
:delayed (do
(alter (:delay_set tube) conj job)
(alter jobs assoc (:id job) job)
- (schedule #(update-delayed-job (:id job) (:delay job))))
+ (schedule #(update-delayed-job (:id job)) (:delay job)))
:ready (set-job-as-ready job)))
job))))
@@ -363,7 +308,7 @@
(if (> delay 0)
(do
(alter (:delay_set tube) conj (assoc updated-job :state :delayed))
- (schedule #(update-delayed-job (:id updated-job) delay)))
+ (schedule #(update-delayed-job (:id updated-job)) delay))
(set-job-as-ready (assoc updated-job :state :ready)))
(alter session assoc :incoming_job nil)
(alter session assoc :reserved_jobs
@@ -569,3 +514,60 @@
:current-jobs-buried (count buried-jobs)})))
+;; ## Schedule tasks for time based task
+;;
+
+;; Update a delayed job and set it as ready.
+;;
+(defn- update-delayed-job [job-id]
+ (if-let [job (job-id @jobs)]
+ (when (= :delayed (:state job))
+ (dosync
+ (alter (:delay_set ((:tube job) @tubes)) disj job)
+ (clojalk.wal/write-job job false)
+ (set-job-as-ready job)))))
+
+;; Release an expired job set it as ready
+;;
+;; Since we won't cancel the task so we should check if the
+;; task is still valid before we actually run it.
+;;
+;; For this scenario, we should ensure:
+;;
+;; * the job has exceed its deadline. To prevent the deadline is
+;;override by another operation.
+;; * the state of job is still `:reserved`
+;;
+(defn- update-expired-job [job-id]
+ (if-let [job (job-id @jobs)]
+ (when (and (>= (current-time) (:deadline_at job)) (= :reserved (:state job)))
+ (let [session (:reserver job)
+ updated-job (assoc job :state :ready
+ :reserver nil
+ :timeouts (inc (:timeouts job)))]
+ (clojalk.wal/write-job updated-job false)
+ (dosync
+ (alter session assoc :reserved_jobs (disj (:reserved_jobs @session) (:id updated-job)))
+ (alter job-timeouts inc)
+ (set-job-as-ready updated-job))))))
+
+;; Enable a paused tube
+;;
+(defn- update-paused-tube [tube]
+ (dosync
+ (ref-set (:paused tube) false)
+
+ ;; handle waiting session
+ (let [pending-pairs (zipmap @(:waiting_list tube) @(:ready_set tube))]
+ (doseq [s (keys pending-pairs)]
+ (reserve-job s (pending-pairs s))))))
+
+;; Reject a session that waiting for reservation
+;;
+(defn- update-expired-waiting-session [session]
+ (if (= :waiting (:state @session))
+ (dosync
+ (dequeue-waiting-session session)
+ (alter session assoc :state :idle))))
+
+
View
@@ -73,10 +73,9 @@
(defn- wrap-task [task]
(try task (catch Exception e (logging/warn "Exception caught on scheduled task" e))))
-(defn schedule-task [thread-pool-size & taskdefs]
- (let [scheduler (. Executors newScheduledThreadPool thread-pool-size)]
- (doseq [[task delay interval] taskdefs]
- (.scheduleWithFixedDelay scheduler (wrap-task task) delay interval (. TimeUnit SECONDS)))
- scheduler))
-
+(defonce compute-intensive-scheduler
+ (Executors/newScheduledThreadPool (* 2 (.availableProcessors (Runtime/getRuntime)))))
+(defn schedule [task delay]
+ (.schedule compute-intensive-scheduler
+ ^Runnable task ^long (long delay) ^TimeUnit TimeUnit/SECONDS))
@@ -208,6 +208,8 @@
(reserve session-w)
(reserve session-w)
(reserve session-w)
+
+ (sleep 1)
(is (= 1 (count @(:ready_set (:expire-task-test @tubes)))))

0 comments on commit 18f7e86

Please sign in to comment.