Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

initialize refactor on time based tasks

  • Loading branch information...
commit 62e0a1f689313546a547c37b63de2da76f3906d9 1 parent b25fbbc
@sunng87 authored
View
114 src/clojalk/core.clj
@@ -20,7 +20,8 @@
(ns clojalk.core
(:refer-clojure :exclude [use peek])
(:use [clojalk data utils])
- (:require [clojalk.wal]))
+ (:require [clojalk.wal])
+ (:import [java.util.concurrent Executors TimeUnit]))
@@ -100,6 +101,7 @@
(alter session assoc :reserved_jobs
(conj (:reserved_jobs @session) (:id updated-top-job)))
(clojalk.wal/write-job updated-top-job false)
+ (schedule #(update-expired-job (:id updated-top-job)) (:ttr job))
updated-top-job)))
;; Mark the job as ready. This is referenced when
@@ -497,83 +499,51 @@
:current-jobs-delayed (count delayed-jobs)
:current-jobs-buried (count buried-jobs)})))
-;; ## Schedule tasks for time based work
+;; ## Schedule tasks for time based task
;;
-;; Set delay jobs as ready when they are timeout.
-(defn- update-delay-job-for-tube [now tube]
+;; Update a delayed job and set it as ready.
+;;
+(defn- update-delayed-job [job]
(dosync
- (let [ready-jobs (filter #(< (:deadline_at %) now) @(:delay_set tube))
- updated-jobs (map #(assoc % :state :ready) ready-jobs)]
- (doseq [job updated-jobs]
- (alter (:delay_set tube) disj job)
- (clojalk.wal/write-job job false)
- (set-job-as-ready job)))))
-
-;; Loop all tubes to perform last function.
-(defn update-delay-job-task []
- (doseq [tube (vals @tubes)] (update-delay-job-for-tube (current-time) tube)))
-
-;; Release jobs that are exceed ttr
-(defn update-expired-job-task []
- (let [reserved-jobs (filter #(= :reserved (:state %)) (vals @jobs))
- now (current-time)
- expired-jobs (filter #(> now (:deadline_at %)) reserved-jobs)]
- (doseq [job expired-jobs]
- (let [session (:reserver job)
- updated-job (assoc job :state :ready
- :reserver nil
- :timeouts (inc (:timeouts job)))]
- (clojalk.wal/write-job updated-job false)
- (dosync
- (alter session assoc :reserved_jobs (disj (:reserved_jobs @session) (:id updated-job)))
- (alter job-timeouts inc)
- (set-job-as-ready updated-job))))))
-
-;; Enable paused tubes when they are timeout
-(defn update-paused-tube-task []
- (let [all-tubes (vals @tubes)
- paused-tubes (filter #(true? @(:paused %)) all-tubes)
- now (current-time)
- expired-tubes (filter #(> now @(:pause_deadline %)) paused-tubes)]
- (doseq [t expired-tubes]
- (dosync
- (ref-set (:paused t) false)
-
- ;; handle waiting session
- (let [pending-pairs (zipmap @(:waiting_list t) @(:ready_set t))]
- (doseq [s (keys pending-pairs)]
- (reserve-job s (pending-pairs s))))))))
+ (alter (:delay_set tube) disj job)
+ (clojalk.wal/write-job job false)
+ (set-job-as-ready job)))
-;; Update waiting workers when they are expired
+;; Release an expired job set it as ready
;;
-;; we don't update deadline_at on this task,
-;; it will be updated next time when it's reserved
-(defn update-expired-waiting-session-task []
- (dosync
- (doseq
- [session (vals @sessions)]
- (if (= :waiting (:state @session))
- (let [now (current-time)
- deadline (:deadline_at @session)
- expired (and (not-nil deadline) (> now deadline))]
- (if (true? expired)
- (do
- (dequeue-waiting-session session)
- (alter session assoc :state :idle))))))))
+(defn- update-expired-job [job]
+ (let [session (:reserver job)
+ updated-job (assoc job :state :ready
+ :reserver nil
+ :timeouts (inc (:timeouts job)))]
+ (clojalk.wal/write-job updated-job false)
+ (dosync
+ (alter session assoc :reserved_jobs (disj (:reserved_jobs @session) (:id updated-job)))
+ (alter job-timeouts inc)
+ (set-job-as-ready updated-job))))
-;; Start all tasks mentioned above with 5 threads.
-;; Tasks are executed in fixed delay.
+;; Enable a paused tube
;;
-;; A scheduler will be returned.
-(defn start-tasks []
- (schedule-task 5
- [update-delay-job-task 0 1]
- [update-expired-job-task 0 1]
- [update-paused-tube-task 0 1]
- [update-expired-waiting-session-task 0 1]))
+(defn- update-paused-tube [tube]
+ (dosync
+ (ref-set (:paused t) false)
+
+ ;; handle waiting session
+ (let [pending-pairs (zipmap @(:waiting_list t) @(:ready_set t))]
+ (doseq [s (keys pending-pairs)]
+ (reserve-job s (pending-pairs s))))))
-;; Stop the scheduler
+;; Reject a session that waiting for reservation
;;
-(defn stop-tasks [scheduler]
- (.shutdownNow ^java.util.concurrent.ScheduledThreadPoolExecutor scheduler))
+(defn- update-expired-waiting-session [session]
+ (dosync
+ (dequeue-waiting-session session)
+ (alter session assoc :state :idle)))
+
+(defonce schduler
+ (Executors/newScheduledThreadPool (* 2 (.availableProcessors (Runtime/getRuntime)))))
+
+(defn shedule [task delay]
+ (.schedule scheduler task delay TimeUnit/SECONDS))
+
View
1  src/clojalk/main.clj
@@ -15,7 +15,6 @@
*clojalk-log-dir* (property props "wal.dir")
*clojalk-log-count* (as-int (property props "wal.files"))]
(if *clojalk-log-enabled* (start-wal)))
- (start-tasks)
(binding [*clojalk-port* (as-int (property props "server.port"))]
(start-server))
(start-jmx-server)
View
2  src/clojalk/utils.clj
@@ -78,3 +78,5 @@
(doseq [[task delay interval] taskdefs]
(.scheduleWithFixedDelay scheduler (wrap-task task) delay interval (. TimeUnit SECONDS)))
scheduler))
+
+
Please sign in to comment.
Something went wrong with that request. Please try again.