-
Notifications
You must be signed in to change notification settings - Fork 5
/
cronut.clj
155 lines (138 loc) · 5.79 KB
/
cronut.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
(ns troy-west.cronut
(:refer-clojure :exclude [proxy])
(:require [clojure.edn :as edn]
[clojure.tools.logging :as log]
[integrant.core :as ig])
(:import (org.quartz Scheduler Job SimpleScheduleBuilder JobExecutionException JobBuilder TriggerBuilder JobDetail CronScheduleBuilder)
(org.quartz.impl StdSchedulerFactory)
(org.quartz.spi JobFactory TriggerFiredBundle)
(java.util TimeZone)))
(defn base-trigger-builder
"Provide a base trigger-builder from configuration"
[{:keys [identity description start end priority]}]
(cond-> (TriggerBuilder/newTrigger)
(seq identity) (.withIdentity (first identity) (second identity))
description (.withDescription description)
start (.startAt start)
(nil? start) (.startNow)
end (.endAt end)
priority (.withPriority (int priority))))
(defn simple-schedule
"Provide a simple schedule from configuration"
[{:keys [interval time-unit repeat misfire]}]
(let [schedule (SimpleScheduleBuilder/simpleSchedule)]
(case time-unit
:millis (.withIntervalInMilliseconds schedule interval)
:seconds (.withIntervalInSeconds schedule interval)
:minutes (.withIntervalInMinutes schedule interval)
:hours (.withIntervalInHours schedule interval)
nil (when interval (.withIntervalInMilliseconds schedule interval)))
(case misfire
:fire-now (.withMisfireHandlingInstructionFireNow schedule)
:ignore (.withMisfireHandlingInstructionIgnoreMisfires schedule)
:next-existing (.withMisfireHandlingInstructionNextWithExistingCount schedule)
:next-remaining (.withMisfireHandlingInstructionNextWithRemainingCount schedule)
:now-existing (.withMisfireHandlingInstructionNowWithExistingCount schedule)
:now-remaining (.withMisfireHandlingInstructionNowWithRemainingCount schedule)
nil nil)
(cond
(number? repeat) (.withRepeatCount schedule repeat)
(= :forever repeat) (.repeatForever schedule))
schedule))
(defn cron-schedule
"Provide a cron schedule from configuration"
[{:keys [cron time-zone misfire]}]
(let [schedule (CronScheduleBuilder/cronSchedule ^String cron)]
(case misfire
:ignore (.withMisfireHandlingInstructionIgnoreMisfires schedule)
:do-nothing (.withMisfireHandlingInstructionDoNothing schedule)
:fire-and-proceed (.withMisfireHandlingInstructionFireAndProceed schedule)
nil nil)
(when time-zone
(.inTimeZone schedule (TimeZone/getTimeZone ^String time-zone)))
schedule))
(defmulti trigger-builder :type)
(defmethod trigger-builder :simple
[config]
(.withSchedule ^TriggerBuilder (base-trigger-builder config)
(simple-schedule config)))
(defmethod trigger-builder :cron
[config]
(.withSchedule ^TriggerBuilder (base-trigger-builder config)
(cron-schedule config)))
(defrecord ProxyJob [proxied-job]
Job
(execute [_ job-context]
(try
(.execute ^Job proxied-job job-context)
(catch JobExecutionException ex
(throw ex))
(catch Exception ex
(throw (JobExecutionException. ^Exception ex))))))
(defn job-factory
[scheduled]
(reify JobFactory
(newJob [_ bundle _]
(let [job-detail (.getJobDetail ^TriggerFiredBundle bundle)
job-key (.getKey job-detail)]
(->ProxyJob (get scheduled job-key))))))
(defn proxy
[job]
(let [{:keys [identity description recover? durable?]} job]
(.build (cond-> (.ofType (JobBuilder/newJob) ProxyJob)
(seq identity) (.withIdentity (first identity) (second identity))
description (.withDescription description)
(boolean? recover?) (.requestRecovery recover?)
(boolean? durable?) (.storeDurably durable?)))))
(defn activate
[^Scheduler scheduler schedule]
(.clear scheduler)
(loop [schedule schedule
scheduled {}
proxies {}]
(if-let [{:keys [job ^TriggerBuilder trigger]} (first schedule)]
(if-let [previously-scheduled ^JobDetail (get proxies job)]
(let [built (.build (.forJob trigger previously-scheduled))]
(log/info "scheduling new trigger for existing job" trigger previously-scheduled)
(.scheduleJob scheduler built)
(recur (rest schedule) scheduled proxies))
(let [proxy-detail ^JobDetail (proxy job)
job-key (.getKey proxy-detail)]
(log/info "scheduling new job" trigger proxy-detail)
(.scheduleJob scheduler proxy-detail (.build trigger))
(recur (rest schedule) (assoc scheduled job-key job) (assoc proxies job proxy-detail))))
(.setJobFactory scheduler (job-factory scheduled))))
(.start scheduler)
scheduler)
(defn initialize
[config]
(let [{:keys [schedule update-check?]} config]
(log/infof "initializing schedule of [%s] jobs" (count schedule))
(when-not update-check?
(System/setProperty "org.terracotta.quartz.skipUpdateCheck" "true")
(log/infof "with quartz update check disabled"))
(activate (StdSchedulerFactory/getDefaultScheduler) schedule)))
(defn shortcut-interval
"Trigger immediately, at an interval-ms, run forever (well that's optimistic but you get the idea)"
[interval-ms]
(trigger-builder {:type :simple
:interval interval-ms
:time-unit :millis
:repeat :forever}))
(defn shortcut-cron
[cron]
(trigger-builder {:type :cron
:cron cron}))
(defn shutdown
[scheduler]
(.shutdown ^Scheduler scheduler))
(defmethod ig/init-key :cronut/scheduler
[_ config]
(initialize config))
(defmethod ig/halt-key! :cronut/scheduler
[_ scheduler]
(shutdown scheduler))
(def data-readers
{'cronut/trigger troy-west.cronut/trigger-builder
'cronut/cron troy-west.cronut/shortcut-cron
'cronut/interval troy-west.cronut/shortcut-interval})