Permalink
Browse files

update tube data structure

  • Loading branch information...
sunng87 committed Sep 26, 2011
1 parent ed4ebfe commit b25fbbc9a3dd480cc5b376e33458592761379e03
Showing with 102 additions and 107 deletions.
  1. +49 −54 src/clojalk/core.clj
  2. +8 −8 src/clojalk/data.clj
  3. +1 −1 src/clojalk/jmx.clj
  4. +3 −3 src/clojalk/wal.clj
  5. +41 −41 test/clojalk/test/core.clj
View
@@ -38,19 +38,19 @@
(defn- top-ready-job [session]
(let [watchlist (:watch @session)
watch-tubes (filter not-nil (map #(get @tubes %) watchlist))
- watch-tubes (filter #(false? (:paused @%)) watch-tubes)
- top-jobs (filter not-nil (map #(first (:ready_set @%)) watch-tubes))]
+ watch-tubes (filter #(false? @(:paused %)) watch-tubes)
+ top-jobs (filter not-nil (map #(first @(:ready_set %)) watch-tubes))]
(first (apply sorted-set-by (conj top-jobs priority-comparator)))))
;; Append a session into waiting_list of all tubes it watches.
;; Also update *state* and *deadline_at* of the session.
;;
;; (This function does not open transaction so it should run within a dosync block)
(defn- enqueue-waiting-session [session timeout]
- (let [watch-tubes (filter #(contains? (:watch @session) (:name @%)) (vals @tubes))
+ (let [watch-tubes (filter #(contains? (:watch @session) (:name %)) (vals @tubes))
deadline_at (if (nil? timeout) nil (+ (current-time) (* timeout 1000)))]
(doseq [tube watch-tubes]
- (alter tube assoc :waiting_list (conj (:waiting_list @tube) session)))
+ (alter (:waiting_list tube) conj session))
(alter session assoc
:state :waiting
:deadline_at deadline_at)))
@@ -61,9 +61,9 @@
;;
;; (This function does not open transaction so it should run within a dosync block)
(defn- dequeue-waiting-session [session]
- (let [watch-tubes (filter #(contains? (:watch @session) (:name @%)) (vals @tubes))]
+ (let [watch-tubes (filter #(contains? (:watch @session) (:name %)) (vals @tubes))]
(doseq [tube watch-tubes]
- (alter tube assoc :waiting_list (into [] (remove-item (:waiting_list @tube) session))))
+ (alter (:waiting_list tube) #(into [] (remove-item % session))))
(alter session assoc :state :working)))
;; Reserve the job with the session. Steps:
@@ -93,7 +93,7 @@
:deadline_at deadline
:reserves (inc (:reserves job)))]
(do
- (alter tube assoc :ready_set (disj (:ready_set @tube) job))
+ (alter (:ready_set tube) disj job)
(alter jobs assoc (:id job) updated-top-job)
(dequeue-waiting-session session)
(alter session assoc :incoming_job updated-top-job)
@@ -119,8 +119,8 @@
(let [tube ((:tube job) @tubes)]
(do
(alter jobs assoc (:id job) (assoc job :state :ready))
- (alter tube assoc :ready_set (conj (:ready_set @tube) job))
- (if-let [s (first (:waiting_list @tube))]
+ (alter (:ready_set tube) conj job)
+ (if-let [s (first @(:waiting_list tube))]
(reserve-job s job)))))
;; Create a session and add it to the `sessions`
@@ -177,13 +177,13 @@
(defcommand "put" [session priority delay ttr body]
(if-not @drain
(let [tube ((:use @session) @tubes)
- job (make-job priority delay ttr (:name @tube) body)]
+ job (make-job priority delay ttr (:name tube) body)]
(do
(clojalk.wal/write-job job true)
(dosync
(case (:state job)
:delayed (do
- (alter tube assoc :delay_set (conj (:delay_set @tube) job))
+ (alter (:delay_set tube) conj job)
(alter jobs assoc (:id job) job))
:ready (set-job-as-ready job)))
job))))
@@ -197,18 +197,18 @@
;; using tube.
(defcommand "peek-ready" [session]
(let [tube ((:use @session) @tubes)]
- (first (:ready_set @tube))))
+ (first @(:ready_set tube))))
;; `peek-delayed` is also a producer task. The job which is nearest to deadline will
;; be peeked.
(defcommand "peek-delayed" [session]
(let [tube ((:use @session) @tubes)]
- (first (:delay_set @tube))))
+ (first @(:delay_set tube))))
;; `peek-buried` is another producer task. It will peek the first item in the buried list.
(defcommand "peek-buried" [session]
(let [tube ((:use @session) @tubes)]
- (first (:buried_list @tube))))
+ (first @(:buried_list tube))))
;; `reserve-with-timeout` is a worker task. It tries to reserve a job from its watching
;; tubes. If there is no job ready for reservation, it will wait at most `timeout`
@@ -264,11 +264,10 @@
(dosync
(alter jobs dissoc id)
(if (= (:state job) :buried)
- (alter tube assoc :buried_list
- (vec (remove-item (:buried_list @tube) job))))
+ (alter (:buried_list tube)
+ #(into [] (remove-item % job))))
(if (= (:state job) :ready)
- (alter tube assoc :ready_set
- (disj (:ready_set @tube) job)))
+ (alter (:ready_set tube) disj job))
(alter session assoc :incoming_job nil)
(alter session assoc :reserved_jobs
(disj (:reserved_jobs @session) (:id job)))
@@ -296,10 +295,8 @@
:releases (inc (:releases job)))]
(do
(dosync
- (if (> delay 0)
- (do
- (alter tube assoc :delay_set
- (conj (:delay_set @tube) (assoc updated-job :state :delayed))))
+ (if (> delay 0)
+ (alter (:delay_set tube) conj (assoc updated-job :state :delayed))
(set-job-as-ready (assoc updated-job :state :ready)))
(alter session assoc :incoming_job nil)
(alter session assoc :reserved_jobs
@@ -323,7 +320,7 @@
:buries (inc (:buries job)))]
(do
(dosync
- (alter tube assoc :buried_list (conj (:buried_list @tube) updated-job))
+ (alter (:buried_list tube) conj updated-job)
(alter jobs assoc (:id updated-job) updated-job)
(alter session assoc :incoming_job nil)
(alter session assoc :reserved_jobs
@@ -341,24 +338,24 @@
(defcommand "kick" [session bound]
(let [tube ((:use @session) @tubes)]
(dosync
- (if (empty? (:buried_list @tube))
+ (if (empty? @(:buried_list tube))
;; no jobs buried, kick from delay set
- (let [kicked (take bound (:delay_set @tube))
+ (let [kicked (take bound @(:delay_set tube))
updated-kicked (map #(assoc % :state :ready :kicks (inc (:kicks %))) kicked)
- remained (drop bound (:delay_set @tube))
+ remained (drop bound @(:delay_set tube))
remained-set (apply sorted-set-by delay-comparator remained)]
- (alter tube assoc :delay_set remained-set)
+ (ref-set (:delay_set tube) remained-set)
(doseq [job updated-kicked]
(clojalk.wal/write-job job false)
(set-job-as-ready job))
updated-kicked)
;; kick at most bound jobs from buried list
- (let [kicked (take bound (:buried_list @tube))
+ (let [kicked (take bound @(:buried_list tube))
updated-kicked (map #(assoc % :state :ready :kicks (inc (:kicks %))) kicked)
- remained (vec (drop bound (:buried_list @tube)))]
- (alter tube assoc :buried_list remained)
+ remained (vec (drop bound @(:buried_list tube)))]
+ (ref-set (:buried_list tube) remained)
(doseq [job updated-kicked]
(clojalk.wal/write-job job false)
(set-job-as-ready job))
@@ -416,9 +413,9 @@
(defcommand "pause-tube" [session id timeout]
(if-let [tube ((keyword id) @tubes)]
(dosync
- (alter tube assoc :paused true)
- (alter tube assoc :pause_deadline (+ (* timeout 1000) (current-time)))
- (alter tube assoc :pauses (inc (:pauses @tube))))))
+ (ref-set (:paused tube) true)
+ (ref-set (:pause_deadline tube) (+ (* timeout 1000) (current-time)))
+ (alter (:pauses tube) inc))))
;; stats command. Display some information of a job.
(defcommand "stats-job" [session id]
@@ -445,25 +442,25 @@
;; stats command. Display some information of a tube.
(defcommand "stats-tube" [session name]
(if-let [tube (get @tubes (keyword name))]
- (let [paused (:paused @tube)
+ (let [paused @(:paused tube)
now (current-time)
- pause-time-left (int (/ (- (:pause_deadline @tube) now) 1000))
+ pause-time-left (int (/ (- @(:pause_deadline tube) now) 1000))
pause-time-left (if paused pause-time-left 0)
- jobs-func #(= (:tube %) (:name @tube))
+ jobs-func #(= (:tube %) (:name tube))
jobs-of-tube (filter jobs-func (vals @jobs))
jobs-reserved (filter #(= (:state %) :reserved) jobs-of-tube)
- jobs-urgent (filter #(< (:priority %) 1024) (:ready_set @tube))]
- {:name (:name @tube)
+ jobs-urgent (filter #(< (:priority %) 1024) @(:ready_set tube))]
+ {:name (:name tube)
:current-jobs-urgent (count jobs-urgent)
- :current-jobs-ready (count (:ready_set @tube))
- :current-jobs-delayed (count (:delay_set @tube))
- :current-jobs-buried (count (:buried_list @tube))
+ :current-jobs-ready (count @(:ready_set tube))
+ :current-jobs-delayed (count @(:delay_set tube))
+ :current-jobs-buried (count @(:buried_list tube))
:current-jobs-reserved (count jobs-reserved)
:total-jobs (count jobs-of-tube)
- :current-waiting (count (:waiting_list @tube))
+ :current-waiting (count @(:waiting_list tube))
:current-using (count (filter #(= (keyword name) (:use @%)) (vals @sessions)))
- :pause (:paused @tube)
- :cmd-pause-tube (:pauses @tube)
+ :pause paused
+ :cmd-pause-tube @(:pauses tube)
:pause-time-left pause-time-left})))
;; stats command. Display server statistical data:
@@ -483,13 +480,12 @@
worker-sessions (filter #(= :worker (:type @%)) all-sessions)
waiting-sessions (filter #(= :waiting (:state @%)) worker-sessions)
producer-sessions (filter #(= :producer (:type @%)) all-sessions)
- all-tubes (vals @tubes)
commands-stats @commands
commands-stats-keys (keys commands-stats)]
; (dbg commands-stats)
(merge (zipmap commands-stats-keys (map #(deref (commands-stats %)) commands-stats-keys))
{:job-timeouts @job-timeouts
- :current-tubes (count all-tubes)
+ :current-tubes (count @tubes)
:current-connections (count all-sessions)
:current-producers (count producer-sessions)
:current-workers (count worker-sessions)
@@ -507,10 +503,10 @@
;; Set delay jobs as ready when they are timeout.
(defn- update-delay-job-for-tube [now tube]
(dosync
- (let [ready-jobs (filter #(< (:deadline_at %) now) (:delay_set @tube))
+ (let [ready-jobs (filter #(< (:deadline_at %) now) @(:delay_set tube))
updated-jobs (map #(assoc % :state :ready) ready-jobs)]
(doseq [job updated-jobs]
- (alter tube assoc :delay_set (disj (:delay_set @tube) job))
+ (alter (:delay_set tube) disj job)
(clojalk.wal/write-job job false)
(set-job-as-ready job)))))
@@ -524,8 +520,7 @@
now (current-time)
expired-jobs (filter #(> now (:deadline_at %)) reserved-jobs)]
(doseq [job expired-jobs]
- (let [tube ((:tube job) @tubes)
- session (:reserver job)
+ (let [session (:reserver job)
updated-job (assoc job :state :ready
:reserver nil
:timeouts (inc (:timeouts job)))]
@@ -538,15 +533,15 @@
;; Enable paused tubes when they are timeout
(defn update-paused-tube-task []
(let [all-tubes (vals @tubes)
- paused-tubes (filter #(true? (:paused @%)) all-tubes)
+ paused-tubes (filter #(true? @(:paused %)) all-tubes)
now (current-time)
- expired-tubes (filter #(> now (:pause_deadline @%)) paused-tubes)]
+ expired-tubes (filter #(> now @(:pause_deadline %)) paused-tubes)]
(doseq [t expired-tubes]
(dosync
- (alter t assoc :paused false)
+ (ref-set (:paused t) false)
;; handle waiting session
- (let [pending-pairs (zipmap (:waiting_list @t) (:ready_set @t))]
+ (let [pending-pairs (zipmap @(:waiting_list t) @(:ready_set t))]
(doseq [s (keys pending-pairs)]
(reserve-job s (pending-pairs s))))))))
View
@@ -89,14 +89,14 @@
;; Function to create an empty tube.
(defn make-tube [name]
- (ref (struct Tube (keyword name) ; name
- (sorted-set-by priority-comparator) ; ready_set
- (sorted-set-by delay-comparator) ; delay_set
- [] ; buried_list
- [] ; waiting queue
- false ; paused state
- -1 ; pause timeout
- 0))) ; pause command counter
+ (struct Tube (keyword name) ; name
+ (ref (sorted-set-by priority-comparator)) ; ready_set
+ (ref (sorted-set-by delay-comparator)) ; delay_set
+ (ref []) ; buried_list
+ (ref []) ; waiting queue
+ (ref false) ; paused state
+ (ref -1) ; pause timeout
+ (ref 0))) ; pause command counter
;; Default job id generator. We use an atomic integer to store id.
(defonce id-counter (atom (long 0)))
View
@@ -33,7 +33,7 @@
(def jmx-tube-bean
(new-mbean
(ref
- {:tubes (fn [] (into-string-array (map #(name (:name @%)) (vals @tubes))))
+ {:tubes (fn [] (into-string-array (map #(name (:name %)) (vals @tubes))))
})))
(def jmx-wal-bean
View
@@ -183,9 +183,9 @@
(alter clojalk.data/tubes assoc (:tube jr) (clojalk.data/make-tube (:tube jr)))))
(let [tube (@clojalk.data/tubes (:tube jr))]
(case (:state jr)
- :ready (alter tube assoc :ready_set (conj (:ready_set @tube) jr))
- :buried (alter tube assoc :buried_lsit (conj (:buried_lsit @tube) jr))
- :delayed (alter tube assoc :delay_set (conj (:delay_set @tube) jr))))))
+ :ready (alter (:ready_set tube) conj jr)
+ :buried (alter (:buried_list tube) conj jr)
+ :delayed (alter (:delay_set tube) conj jr)))))
;; Update id counter after all job records are loaded from logs
;;
Oops, something went wrong.

0 comments on commit b25fbbc

Please sign in to comment.