-
Notifications
You must be signed in to change notification settings - Fork 15
/
scheduler_core.clj
158 lines (145 loc) · 6.71 KB
/
scheduler_core.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
(ns puppetlabs.trapperkeeper.services.scheduler.scheduler-core
(:require [clojure.tools.logging :as log]
[puppetlabs.i18n.core :as i18n]
[puppetlabs.kitchensink.core :as ks])
(:import (org.quartz.impl.matchers GroupMatcher)
(org.quartz.impl StdSchedulerFactory SchedulerRepository)
(org.quartz JobBuilder SimpleScheduleBuilder TriggerBuilder DateBuilder
DateBuilder$IntervalUnit Scheduler JobKey SchedulerException JobDataMap)
(org.quartz.utils Key)
(java.util Date)))
;; Timeout, in seconds, handed to `ks/with-timeout` around the waiting
;; `(.shutdown scheduler true)` call in `stop-all-jobs!`; also reported in the
;; log message emitted when that timeout is hit.
(def shutdown-timeout-sec 30)
(defn create-scheduler
  "Obtains the default Quartz scheduler from `StdSchedulerFactory`, starts it,
  and returns it.

  Before the scheduler is requested, two JVM system properties are set:
  `org.quartz.scheduler.skipUpdateCheck` (to \"true\") and
  `org.quartz.threadPool.threadCount` (to the string form of `thread-count`)."
  [thread-count]
  ;; without the following property set, quartz does a version check automatically
  (System/setProperty "org.quartz.scheduler.skipUpdateCheck" "true")
  (System/setProperty "org.quartz.threadPool.threadCount" (str thread-count))
  ;; doto starts the scheduler and then hands the same instance back
  (doto (StdSchedulerFactory/getDefaultScheduler)
    (.start)))
(defn build-executable-job
  "Builds a Quartz job of class `puppetlabs.trapperkeeper.services.scheduler.job`
  identified by `job-name` within `group-name`.

  The job's JobDataMap carries a single entry, \"jobData\", whose value is
  `options` with `f` associated under `:job`. The three-argument arity uses an
  empty options map."
  ([f job-name group-name]
   (build-executable-job f job-name group-name {}))
  ([f job-name group-name options]
   (let [job-data (doto (JobDataMap.)
                    (.put "jobData" (assoc options :job f)))]
     (.. (JobBuilder/newJob puppetlabs.trapperkeeper.services.scheduler.job)
         (withIdentity job-name group-name)
         (usingJobData job-data)
         (build)))))
(defn interspaced
  "Schedules `f` on `scheduler` under a freshly generated unique job name in
  `group-name`, using a default simple schedule that starts immediately.
  Returns the job key taken from the trigger after scheduling.

  `n` is only recorded in the job's options under `:interspaced` here;
  presumably the job class uses it to space out subsequent runs -- confirm
  against the job implementation.

  If scheduling throws a SchedulerException the error is logged and no job key
  is returned."
  [n f ^Scheduler scheduler group-name]
  (try
    (let [unique-name (Key/createUniqueName group-name)
          job-detail (build-executable-job f unique-name group-name {:interspaced n})
          simple-schedule (SimpleScheduleBuilder/simpleSchedule)
          trigger (.. (TriggerBuilder/newTrigger)
                      (withSchedule simple-schedule)
                      (startNow)
                      (build))]
      (.scheduleJob scheduler job-detail trigger)
      (.getJobKey trigger))
    (catch SchedulerException e
      ; this can occur if the interface is being used while the scheduler is shutdown
      (log/error e (i18n/trs "Failed to schedule job")))))
(defn after
  "Schedules `f` on `scheduler` to start `n` milliseconds from now (`n` is added
  to `System/currentTimeMillis`), under a freshly generated unique job name in
  `group-name`. Returns the job key taken from the trigger after scheduling.

  If scheduling throws a SchedulerException the error is logged and no job key
  is returned."
  [n f ^Scheduler scheduler group-name]
  (try
    (let [unique-name (Key/createUniqueName group-name)
          job-detail (build-executable-job f unique-name group-name)
          fire-at (Date. ^Long (+ (System/currentTimeMillis) n))
          trigger (.. (TriggerBuilder/newTrigger)
                      (startAt fire-at)
                      (build))]
      (.scheduleJob scheduler job-detail trigger)
      (.getJobKey trigger))
    (catch SchedulerException e
      ; this can occur if the interface is being used while the scheduler is shutdown
      (log/error e (i18n/trs "Failed to schedule job")))))
(defn interval
  "Schedules `f` on `scheduler` to start immediately and repeat forever every
  `repeat-delay` milliseconds, under a freshly generated unique job name in
  `group-name`. The schedule uses the next-with-remaining-count misfire
  instruction, and `repeat-delay` is also stored in the job's options under
  `:interval`. Returns the job key taken from the trigger after scheduling.

  If scheduling throws a SchedulerException the error is logged and no job key
  is returned."
  [^Scheduler scheduler repeat-delay f group-name]
  (try
    (let [unique-name (Key/createUniqueName group-name)
          job-detail (build-executable-job f unique-name group-name {:interval repeat-delay})
          repeating (.. (SimpleScheduleBuilder/simpleSchedule)
                        (withIntervalInMilliseconds repeat-delay)
                        (withMisfireHandlingInstructionNextWithRemainingCount)
                        (repeatForever))
          trigger (.. (TriggerBuilder/newTrigger)
                      (withSchedule repeating)
                      (startNow)
                      (build))]
      (.scheduleJob scheduler job-detail trigger)
      (.getJobKey trigger))
    (catch SchedulerException e
      ; this can occur if the interface is being used while the scheduler is shutdown
      (log/error e (i18n/trs "Failed to schedule job")))))
(defn interval-after
  "Schedules `f` on `scheduler` to first start `initial-delay` milliseconds from
  now and then repeat forever every `repeat-delay` milliseconds, under a
  freshly generated unique job name in `group-name`. The schedule uses the
  next-with-remaining-count misfire instruction, and `repeat-delay` is also
  stored in the job's options under `:interval`. Returns the job key taken
  from the trigger after scheduling.

  If scheduling throws a SchedulerException the error is logged and no job key
  is returned."
  [^Scheduler scheduler initial-delay repeat-delay f group-name]
  (try
    (let [unique-name (Key/createUniqueName group-name)
          job-detail (build-executable-job f unique-name group-name {:interval repeat-delay})
          repeating (.. (SimpleScheduleBuilder/simpleSchedule)
                        (withIntervalInMilliseconds repeat-delay)
                        (withMisfireHandlingInstructionNextWithRemainingCount)
                        (repeatForever))
          first-fire (Date. ^Long (+ (System/currentTimeMillis) initial-delay))
          trigger (.. (TriggerBuilder/newTrigger)
                      (withSchedule repeating)
                      (startAt first-fire)
                      (build))]
      (.scheduleJob scheduler job-detail trigger)
      (.getJobKey trigger))
    (catch SchedulerException e
      ; this can occur if the interface is being used while the scheduler is shutdown
      (log/error e (i18n/trs "Failed to schedule job")))))
(defn stop-job
  "Asks `scheduler` to delete the job identified by `id` and returns the result
  of that call: true if the job was deleted, false if the job wasn't found.
  A SchedulerException is logged at debug level and reported as false."
  [^JobKey id ^Scheduler scheduler]
  (try
    (.deleteJob scheduler id)
    (catch SchedulerException ex
      ; this can occur if the interface is being used while the scheduler is shutdown
      (log/debug ex (i18n/trs "Failure stopping job"))
      false)))
(defn get-all-jobs
  "Returns a seq of the job keys of every job in every job group that
  `scheduler` reports (empty when there are no groups).

  The result is fully realized before this function returns, so any
  SchedulerException raised while looking up a group's keys is handled here:
  it is logged at debug level and an empty vector is returned instead."
  [^Scheduler scheduler]
  (try
    (let [groups (seq (.getJobGroupNames scheduler))
          extract-keys (fn [group-name]
                         (seq (.getJobKeys scheduler (GroupMatcher/jobGroupEquals group-name))))]
      ;; mapcat is lazy: without doall the .getJobKeys calls would only run when
      ;; the caller consumes the seq, i.e. after this try has been exited, and a
      ;; SchedulerException would escape the handler below.
      (doall (mapcat extract-keys groups)))
    (catch SchedulerException e
      ; this can occur if the interface is being used while the scheduler is shutdown
      (log/debug e (i18n/trs "Failure getting all jobs"))
      [])))
(defn stop-all-jobs!
  "Stops every job on `scheduler` and shuts the scheduler down. Does nothing if
  the scheduler already reports itself as shut down.

  Steps, in order:
    1. each job key from `get-all-jobs` is interrupted and then deleted; a
       SchedulerException for one job is logged at debug level and the loop
       continues with the next job;
    2. a waiting shutdown `(.shutdown scheduler true)` is attempted under
       `ks/with-timeout` with `shutdown-timeout-sec`; on timeout this is logged
       and a plain `(.shutdown scheduler)` is issued;
    3. the scheduler is removed by name from the `SchedulerRepository`.

  A SchedulerException escaping these steps is logged at debug level."
  [^Scheduler scheduler]
  (when-not (.isShutdown scheduler)
    (try
      (let [repository (SchedulerRepository/getInstance)
            ;; captured before shutdown so the repository entry can be removed afterwards
            registered-name (.getSchedulerName scheduler)]
        (doseq [job-key (get-all-jobs scheduler)]
          (try
            (.interrupt scheduler job-key)
            (.deleteJob scheduler job-key)
            (catch SchedulerException e
              ; this can occur if the interface is being used while the scheduler is shutdown
              (log/debug e (i18n/trs "Failure stopping job")))))
        (let [shutdown-result (ks/with-timeout shutdown-timeout-sec :timeout (.shutdown scheduler true))]
          (when (= :timeout shutdown-result)
            (log/info (i18n/trs "Failed to shutdown schedule service in {0} seconds" shutdown-timeout-sec))
            (.shutdown scheduler)))
        ; explicitly remove the scheduler from the registry to prevent leaks. This can happen if the
        ; jobs don't terminate immediately
        (.remove repository registered-name))
      (catch SchedulerException e
        ; this can occur if the interface is being used while the scheduler is shutdown
        (log/debug e (i18n/trs "Failure stopping all jobs"))))))
(defn get-jobs-in-group
  "Returns a seq of the job keys `scheduler` reports for the job group equal to
  `group-id`; because the result goes through `seq`, an empty lookup yields
  nil. A SchedulerException is logged at debug level and an empty vector is
  returned instead."
  [^Scheduler scheduler group-id]
  (try
    (let [group-matcher (GroupMatcher/jobGroupEquals group-id)]
      (seq (.getJobKeys scheduler group-matcher)))
    (catch SchedulerException ex
      ; this can occur if the function is called when the scheduler is shutdown
      (log/debug ex (i18n/trs "Failure getting jobs in group"))
      [])))