-
Notifications
You must be signed in to change notification settings - Fork 76
/
ScheduleManager.java
284 lines (250 loc) · 8.51 KB
/
ScheduleManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
package azkaban.scheduler;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.log4j.Logger;
import org.joda.time.DateTime;
import org.joda.time.ReadablePeriod;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormat;
import azkaban.jobs.JobExecutionException;
import azkaban.jobs.JobExecutorManager;
/**
* The ScheduleManager stores and executes the schedule. It uses a single thread instead
* and waits until correct loading time for the job. It will not remove the job from the
* schedule when it is run, which can potentially allow the job to and overlap each other.
*
* @author Richard
*/
public class ScheduleManager {
private static Logger logger = Logger.getLogger(ScheduleManager.class);
private final DateTimeFormatter _dateFormat = DateTimeFormat.forPattern("MM-dd-yyyy HH:mm:ss:SSS");
private ScheduleLoader loader;
private Map<String, ScheduledJob> scheduleIDMap = new LinkedHashMap<String, ScheduledJob>();
private final ScheduleRunner runner;
private final JobExecutorManager executionManager;
/**
* Give the schedule manager a loader class that will properly load the schedule.
*
* @param loader
*/
public ScheduleManager(
JobExecutorManager executionManager,
ScheduleLoader loader)
{
this.executionManager = executionManager;
this.loader = loader;
this.runner = new ScheduleRunner();
List<ScheduledJob> scheduleList = loader.loadSchedule();
for (ScheduledJob job: scheduleList) {
internalSchedule(job);
}
this.runner.start();
}
/**
* Shutdowns the scheduler thread. After shutdown, it may not be safe to use it again.
*/
public void shutdown() {
this.runner.shutdown();
}
/**
* Retrieves a copy of the list of schedules.
*
* @return
*/
public synchronized List<ScheduledJob> getSchedule() {
return runner.getSchedule();
}
/**
* Returns the scheduled job for the job name
*
* @param id
* @return
*/
public ScheduledJob getSchedule(String id) {
return scheduleIDMap.get(id);
}
/**
* Removes the job from the schedule if it exists.
*
* @param id
*/
public synchronized void removeScheduledJob(String id) {
ScheduledJob job = scheduleIDMap.get(id);
scheduleIDMap.remove(id);
runner.removeScheduledJob(job);
loader.saveSchedule(getSchedule());
}
public void schedule(String jobId,
DateTime dateTime,
ReadablePeriod period,
boolean ignoreDep) {
logger.info("Scheduling job '" + jobId + "' for " + _dateFormat.print(dateTime)
+ " with a period of " + PeriodFormat.getDefault().print(period));
schedule(new ScheduledJob(jobId, dateTime, period, ignoreDep));
}
/**
* Schedule the job
* @param jobId
* @param date
* @param ignoreDep
*/
public void schedule(String jobId, DateTime date, boolean ignoreDep) {
logger.info("Scheduling job '" + jobId + "' for " + _dateFormat.print(date));
schedule(new ScheduledJob(jobId, date, ignoreDep));
}
/**
* Schedules the job, but doesn't save the schedule afterwards.
* @param job
*/
private synchronized void internalSchedule(ScheduledJob job) {
ScheduledJob existing = scheduleIDMap.get(job.getId());
job.updateTime();
if (existing != null) {
this.runner.removeScheduledJob(existing);
}
this.runner.addScheduledJob(job);
scheduleIDMap.put(job.getId(), job);
}
/**
* Adds a job to the schedule.
*
* @param job
*/
public synchronized void schedule(ScheduledJob job) {
internalSchedule(job);
saveSchedule();
}
/**
* Save the schedule
*/
private void saveSchedule() {
loader.saveSchedule(getSchedule());
}
/**
* Thread that simply invokes the running of jobs when the schedule is
* ready.
*
* @author Richard Park
*
*/
public class ScheduleRunner extends Thread {
private final PriorityBlockingQueue<ScheduledJob> schedule;
private AtomicBoolean stillAlive = new AtomicBoolean(true);
// Five minute minimum intervals
private static final int TIMEOUT_MS = 300000;
public ScheduleRunner() {
schedule = new PriorityBlockingQueue<ScheduledJob>(1, new ScheduleComparator());
}
public void shutdown() {
logger.error("Shutting down scheduler thread");
stillAlive.set(false);
this.interrupt();
}
/**
* Return a list of scheduled job
* @return
*/
public synchronized List<ScheduledJob> getSchedule() {
return new ArrayList<ScheduledJob>(schedule);
}
/**
* Adds the job to the schedule and then interrupts so it will update its wait time.
* @param job
*/
public synchronized void addScheduledJob(ScheduledJob job) {
logger.info("Adding " + job + " to schedule.");
schedule.add(job);
this.interrupt();
}
/**
* Remove scheduled jobs. Does not interrupt.
*
* @param job
*/
public synchronized void removeScheduledJob(ScheduledJob job) {
logger.info("Removing " + job + " from the schedule.");
schedule.remove(job);
// Don't need to interrupt, because if this is originally on the top of the queue,
// it'll just skip it.
}
public void run() {
while(stillAlive.get()) {
synchronized (this) {
try {
ScheduledJob job = schedule.peek();
if (job == null) {
// If null, wake up every minute or so to see if there's something to do.
// Most likely there will not be.
try {
this.wait(TIMEOUT_MS);
} catch (InterruptedException e) {
// interruption should occur when items are added or removed from the queue.
}
}
else {
// We've passed the job execution time, so we will run.
if (!job.getScheduledExecution().isAfterNow()) {
// Run job. The invocation of jobs should be quick.
ScheduledJob runningJob = schedule.poll();
logger.info("Scheduler attempting to run " + runningJob.getId());
// Execute the job here
try {
executionManager.execute(runningJob.getId(), runningJob.isDependencyIgnored());
} catch (JobExecutionException e) {
logger.info("Could not run job. " + e.getMessage());
}
schedule.remove(job);
// Immediately reschedule if it's possible. Let the execution manager
// handle any duplicate runs.
if (runningJob.updateTime()) {
schedule.add(runningJob);
}
saveSchedule();
}
else {
// wait until job run
long millisWait = Math.max(0, job.getScheduledExecution().getMillis() - (new DateTime()).getMillis());
try {
this.wait(Math.min(millisWait, TIMEOUT_MS));
} catch (InterruptedException e) {
// interruption should occur when items are added or removed from the queue.
}
}
}
}
catch (Exception e) {
logger.error("Unexpected exception has been thrown in scheduler", e);
}
catch (Throwable e) {
logger.error("Unexpected throwable has been thrown in scheduler", e);
}
}
}
}
/**
* Class to sort the schedule based on time.
*
* @author Richard Park
*/
private class ScheduleComparator implements Comparator<ScheduledJob>{
@Override
public int compare(ScheduledJob arg0, ScheduledJob arg1) {
DateTime first = arg1.getScheduledExecution();
DateTime second = arg0.getScheduledExecution();
if (first.isEqual(second)) {
return 0;
}
else if (first.isBefore(second)) {
return 1;
}
return -1;
}
}
}
}