Skip to content

Commit

Permalink
Schedule even for past starting time.
Browse files Browse the repository at this point in the history
  • Loading branch information
kul committed Jan 20, 2015
1 parent 94dc8d7 commit 8796bf8
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
34 changes: 25 additions & 9 deletions src/nolan/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
"Repeating Time Interval regex."
#"^R(\d*)/(.*)?/(P.*)?$")

(def ^:dynamic *pool-delta* 40)

(defn parse-schedule [s]
"Parse a limited subset of ISO8601 repeating time interval to be used for
scheduling jobs."
Expand All @@ -42,18 +44,35 @@
#(recuring-schedule scheduler counter period entity id) spool))
(.execute scheduler entity)
(when (= period :tail)
(at/at (+ 10 (at/now))
(at/at (+ *pool-delta* (at/now))
#(recuring-schedule scheduler counter period entity id) spool)))
; Scheduling completed
(.expire scheduler id))
; Logging is must as it is not possible to throw exceptions back to user api
(catch Exception e (log/error e))))

(defn next-time
"Gives the next execution time for a schedules's `start` time and `period`."
[start-date period]
(let [start (.getMillis start-date)
now (at/now)]
(cond
(> start now) start
(= period :tail) (+ *pool-delta* (at/now))
:default
(let [period (.getMillis (.toDurationFrom period start-date))
times (quot (- now start) period)
nxt (+ start (* period times))]
(if (< (Math/abs (- nxt now)) *pool-delta*)
(+ *pool-delta* (at/now)) (+ nxt period))))))

(defn- start-scheduling
[scheduler iso-str entity id]
(let [[rept start period] (parse-schedule iso-str)
counter (atom rept)]
(at/at (if (= start :now) (+ 10 (at/now)) (.getMillis start))
(at/at (if (= start :now)
(+ *pool-delta* (at/now))
(next-time start period))
#(recuring-schedule scheduler counter period entity id) spool)))

;; User API {{{1
Expand Down Expand Up @@ -89,19 +108,16 @@

;; Testing {{{1
(comment
(def rti "R1/2014-11-28T12:18:00/")
(java.util.Date. (System/currentTimeMillis))
(java.util.Date. (.getMillis (DateTime.)))
(java.util.Date.)
(DateTime. (at/now))
(= (System/currentTimeMillis) (.getMillis (DateTime.)))
(def rti "R1/2015-01-20T13:59:00/")
(/ (- (.getMillis (second (parse-schedule rti))) (at/now)) 1000.0)
(parse-schedule "R5//PT10S")

(def sc (get-mem-scheduler))
(def scid (add-schedule sc rti #(log/info "OK")))
(def scid1 (add-schedule sc "R//PT1S" #(log/info "OK")))
(expire sc scid)

(def sc (get-mem-scheduler {1 {:id 1 :iso-str "R//PT1S" :entity #(log/info "KO")}}))
(boot sc)
(expire sc 1))
(expire sc 1)
)
32 changes: 29 additions & 3 deletions test/nolan/core_test.clj
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
(ns nolan.core-test
(:require [clojure.test :refer :all]
[nolan.core :refer :all]
[overtone.at-at :refer [now]]))
[overtone.at-at :refer [now]])
(:import [org.joda.time Period DateTime]))

(defn- on-complete
[sc scid timeout func]
(let [start (now)]
(if (> (- (now) start) timeout)
(throw (Exception. "Completion timed out."))
(if-let [completed? (expired? sc scid)]
(if (expired? sc scid)
(func)
(do
(Thread/sleep 10)
(recur sc scid timeout func))))))

(deftest parsing-test
(is (= (parse-schedule "R//") [Integer/MAX_VALUE :now :tail])
"Repeat forever tailing.")
(is (= (parse-schedule "R10//") [10 :now :tail])
"Repeat 10 times tailing.")
(is (= (parse-schedule "R10//PT10S") [10 :now (Period. 10000)])
"Repeat 10 times tailing.")
(let [sch "R/1991-11-11T16:12:00/PT1S"
start (now)]
(is (nil? (doseq [i (range 1000)]
(assert (as-> (apply next-time (rest (parse-schedule sch))) $
(<= start $)))))
"Next schedule should always be in the future."))
(let [sch "R/1991-11-11T16:12:00/PT1S"]
(is (as-> (apply next-time (rest (parse-schedule "R/1991-11-11T16:12:00/"))) $
(- $ (now)) (<= $ *pool-delta*))
"Tailing schedules sould be started almost immedately.")))

(deftest scheduling-test
(let [sc (get-mem-scheduler)
c (atom 0)]
Expand All @@ -30,4 +49,11 @@
(is (on-complete sc (add-schedule sc 1 "R//" #(if (= @c 10) (expire sc 1) (swap! c inc))) 1000
#(expired? sc 1))
"Expiring schedule should work.")
(is (= @c 10) "Tailing executions should work from above.")))
(is (= @c 10) "Tailing executions should work from above.")
(reset! c 0)
(is (= (on-complete sc (add-schedule
sc (format "R10/%s/" (DateTime. (- (now) 100000)))
#(swap! c inc))
1000 #(deref c))
10)
"Schedule will run even for a past starting time.")))

0 comments on commit 8796bf8

Please sign in to comment.