/
backend.py
483 lines (365 loc) · 17 KB
/
backend.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
# -*- coding: utf-8 -*-
"""
Copyright (C) 2019, Zato Source s.r.o. https://zato.io
Licensed under LGPLv3, see LICENSE.txt for terms and conditions.
"""
from __future__ import absolute_import, division, print_function, unicode_literals
# stdlib
import datetime
from logging import getLogger
from traceback import format_exc
# datetime
from dateutil.rrule import rrule, SECONDLY
# gevent
import gevent # Imported directly so it can be mocked out in tests
from gevent import lock, sleep
# paodate
from paodate import Delta
# Python 2/3 compatibility
from future.utils import iterkeys, itervalues
# Zato
from zato.common import SCHEDULER
from zato.common.util import add_scheduler_jobs, add_startup_jobs, asbool, make_repr, new_cid, spawn_greenlet
# ################################################################################################################################

# Module-wide logger, shared by the Job and Scheduler classes in this module.
logger = getLogger(__name__)

# ################################################################################################################################

# Number of seconds Scheduler.init_jobs sleeps before adding any jobs - the pause is there so that at least one server
# is running if the environment was started from quickstart scripts. Scheduler.run also reports this value in its log.
initial_sleep = 30

# ################################################################################################################################
class Interval(object):
    """ A fixed span of time used by interval-based jobs.

    The span can be given as its components (days, hours, minutes, seconds) or directly as a total
    number of seconds in `in_seconds`. If `in_seconds` is falsy (the default is 0), the total
    is derived from the components by get_in_seconds.
    """
    def __init__(self, days=0, hours=0, minutes=0, seconds=0, in_seconds=0):
        self.days, self.hours, self.minutes, self.seconds = days, hours, minutes, seconds

        # An explicit, non-zero total takes precedence over the individual components
        if in_seconds:
            self.in_seconds = in_seconds
        else:
            self.in_seconds = self.get_in_seconds()

    def __str__(self):
        return make_repr(self)

    __repr__ = __str__

    def get_in_seconds(self):
        """ Returns the total length of the interval in seconds, as computed by paodate's Delta.
        """
        delta = Delta(days=self.days, hours=self.hours, minutes=self.minutes, seconds=self.seconds)
        return delta.total_seconds
# ################################################################################################################################
class Job(object):
    """ A single scheduled job - interval-based, cron-style or one-time (see SCHEDULER.JOB_TYPE).

    run() waits until start_time and then enters main_loop() which, on each iteration, spawns self.callback
    with a context dict built by get_context() and then sleeps for get_sleep_time() seconds.
    One-time jobs leave the loop after their first iteration.
    """
    def __init__(self, id, name, type, interval, start_time=None, callback=None, cb_kwargs=None, max_repeats=None,
            on_max_repeats_reached_cb=None, is_active=True, clone_start_time=False, cron_definition=None, old_name=None):
        """ Parameters worth noting:

        interval         - an Interval for non-cron jobs; for cron-style jobs an object whose .next(now) returns
                           the number of seconds till the next run (its exact type is not visible here)
        start_time       - compared against datetime.utcnow(), so expected to be a naive UTC datetime;
                           None means 'now' and it is not used for cron-style jobs unless clone_start_time is True
        max_repeats      - maximum number of runs; a falsy value means there is no limit
        clone_start_time - if True, start_time is taken verbatim instead of being recomputed (used by clone)
        old_name         - the previous name of a renamed job
        """
        self.id = id
        self.name = name
        self.type = type
        self.interval = interval
        self.callback = callback
        self.cb_kwargs = cb_kwargs or {}
        self.max_repeats = max_repeats
        self.on_max_repeats_reached_cb = on_max_repeats_reached_cb
        self.is_active = is_active
        self.cron_definition = cron_definition

        # This is used by the edit action to be able to discern if an edit did not include a rename
        self.old_name = old_name

        self.current_run = 0 # Starts over each time scheduler is started
        self.max_repeats_reached = False
        self.max_repeats_reached_at = None
        self.keep_running = True

        # Note that get_start_time may itself set max_repeats_reached & co., hence it runs after they are initialised
        if clone_start_time:
            self.start_time = start_time

        elif self.type == SCHEDULER.JOB_TYPE.CRON_STYLE:
            now = datetime.datetime.utcnow()
            self.start_time = now + datetime.timedelta(seconds=(self.get_sleep_time(now)))

        else:
            self.start_time = self.get_start_time(start_time if start_time is not None else datetime.datetime.utcnow())

        # How often, in seconds, run() checks whether start_time has been reached yet
        self.wait_sleep_time = 1

        # Optional callable invoked on each such check, as wait_iter_cb(start_time, now, *wait_iter_cb_args)
        self.wait_iter_cb = None
        self.wait_iter_cb_args = ()

    # TODO: Add skip_days, skip_hours and skip_dates

    def __str__(self):
        return make_repr(self)

    # Jobs are hashed, compared and ordered by their names only

    def __hash__(self):
        return hash(self.name)

    def __eq__(self, other):
        return self.name == other.name

    def __lt__(self, other):
        return self.name < other.name

    def clone(self, name=None, is_active=None):
        """ Returns a new Job with the same configuration and start_time as this one.

        `name` is accepted for backward compatibility but is currently ignored - the clone always keeps self.name.
        `is_active`, if not None, overrides self.is_active in the clone. The clone does not carry over old_name.
        """
        # It will not be None if an edit changed it from True to False or the other way around
        is_active = is_active if is_active is not None else self.is_active

        return Job(self.id, self.name, self.type, self.interval, self.start_time, self.callback, self.cb_kwargs,
            self.max_repeats, self.on_max_repeats_reached_cb, is_active, True, self.cron_definition)

    def get_start_time(self, start_time):
        """ Converts initial start time to the time the job should be invoked next.

        For instance, assume the scheduler has just been started. Given this job config ..

        - start_time: 2019-11-23 13:15:17
        - interval: 90 seconds
        - now: 2019-11-23 17:32:44

        .. a basic approach is to add 90 seconds to now and schedule the job. This would even
        work for jobs that have very short intervals when no one usually cares that much if a job
        is 15 seconds off or not.

        However, consider this series of events ..

        - start_time: 2019-11-23 13:00:00
        - interval: 86400 seconds (1 day)
        - the job is started
        - at 2019-11-23 21:15:00 the scheduler is stopped and started again

        .. now we don't want for the scheduler to start the job at 21:15:00 with an interval of one day,
        the job should rather wait till the next day so that the computed start_time should in fact be 2019-11-24 13:00:00.

        Returns None, and marks the job as having reached its max repeats, if no future run time can be computed.
        """
        # We have several scenarios to handle assuming that first_run_time = start_time + interval
        #
        # 1) first_run_time > now
        # 2a) first_run_time <= now and first_run_time + interval_in_seconds > now
        # 2b) first_run_time <= now and first_run_time + interval_in_seconds <= now
        #
        # 1) is quick - start_time simply becomes first_run_time
        # 2a) means we already seen some executions of this job and there's still at least one in the future
        # 2b) means we already seen some executions of this job and it won't be run in the future anymore

        now = datetime.datetime.utcnow()
        interval = datetime.timedelta(seconds=self.interval.in_seconds)

        if start_time > now:
            return start_time

        first_run_time = start_time + interval
        if first_run_time > now:
            return first_run_time

        # rrule needs a positive, whole number of seconds as its step. With a step of 0 (e.g. a job whose
        # interval has no components) it would yield dtstart forever and runs.before(now) would never return.
        step = int(self.interval.in_seconds)

        if step > 0:
            # A falsy max_repeats means 'no limit', just like in main_loop - rrule needs None for that,
            # because count=0 would yield no runs at all and before() would return None.
            runs = rrule(SECONDLY, interval=step, dtstart=start_time, count=self.max_repeats or None)
            last_run_time = runs.before(now)
        else:
            last_run_time = start_time

        next_run_time = last_run_time + interval

        if next_run_time >= now:
            return next_run_time

        # The assumption here is that all one-time jobs are always active at the instant we evaluate them here.
        if self.type == SCHEDULER.JOB_TYPE.ONE_TIME and self.is_active:
            # The delay is 100% arbitrary
            return now + datetime.timedelta(seconds=10)

        # We must have already run out of iterations
        self.max_repeats_reached = True
        self.max_repeats_reached_at = next_run_time
        self.keep_running = False

        logger.warning(
            'Cannot compute start_time. Job `%s` max repeats reached at `%s` (UTC)',
            self.name, self.max_repeats_reached_at)

        return None

    def get_context(self):
        """ Returns the dict passed as `ctx` to the job's callback - a fresh cid, start_time in ISO format,
        cb_kwargs, either cron_definition or interval_in_seconds depending on the job's type, and the job's
        id, name, current_run, max_repeats_reached, max_repeats and type.
        """
        ctx = {
            'cid':new_cid(),
            'start_time': self.start_time.isoformat(),
            'cb_kwargs': self.cb_kwargs
        }

        if self.type == SCHEDULER.JOB_TYPE.CRON_STYLE:
            ctx['cron_definition'] = self.cron_definition
        else:
            ctx['interval_in_seconds'] = self.interval.in_seconds

        for name in 'id', 'name', 'current_run', 'max_repeats_reached', 'max_repeats', 'type':
            ctx[name] = getattr(self, name)

        return ctx

    def get_sleep_time(self, now):
        """ Returns a number of seconds the job should sleep for before the next run.
        For interval-based jobs this is a constant value pre-computed well ahead by self.interval
        but for cron-style jobs the value is obtained each time it's needed.

        Raises ValueError for any other job type, including one-time jobs.
        """
        if self.type == SCHEDULER.JOB_TYPE.INTERVAL_BASED:
            return self.interval.in_seconds

        elif self.type == SCHEDULER.JOB_TYPE.CRON_STYLE:
            return self.interval.next(now)

        else:
            raise ValueError('Unsupported job type `{}` ({})'.format(self.type, self.name))

    def _spawn(self, *args, **kwargs):
        """ A thin wrapper so that it is easier to mock this method out in unit-tests.
        """
        return spawn_greenlet(*args, **kwargs)

    def main_loop(self):
        """ Runs the job until keep_running becomes False or, for one-time jobs, exactly once.
        Exceptions raised while invoking the callback are logged and do not stop the loop. Always returns True.
        """
        logger.info('Job entering main loop `%s`', self)

        _sleep = gevent.sleep

        try:
            while self.keep_running:
                try:
                    self.current_run += 1

                    # Perhaps we've already been executed enough times
                    if self.max_repeats and self.current_run == self.max_repeats:
                        self.keep_running = False
                        self.max_repeats_reached = True
                        self.max_repeats_reached_at = datetime.datetime.utcnow()

                        if self.on_max_repeats_reached_cb:
                            self.on_max_repeats_reached_cb(self)

                    # Invoke callback in a new greenlet so it doesn't block the current one.
                    self._spawn(self.callback, ctx=self.get_context())

                except Exception:
                    logger.warning(format_exc())

                # One-off jobs are done after a single iteration, no matter if it succeeded or not.
                # This is deliberately not in a 'finally' clause - a return there would swallow
                # non-Exception errors, e.g. the one raised when the greenlet is being killed.
                if self.type == SCHEDULER.JOB_TYPE.ONE_TIME:
                    return True

                # There is no point in sleeping for a whole interval if this was the last run
                if not self.keep_running:
                    break

                # Pause the greenlet for however long is needed before the next run
                _sleep(self.get_sleep_time(datetime.datetime.utcnow()))

            logger.info('Job leaving main loop `%s` after %d iterations', self, self.current_run)

        except Exception:
            logger.warning(format_exc())

        return True

    def run(self):
        """ Entry point of the job's greenlet - waits until start_time is reached, polling every
        wait_sleep_time seconds, and then enters main_loop. Exceptions are logged, not raised.
        """
        # OK, we're ready
        try:
            # start_time is None if get_start_time found that all repeats had already been used up
            if not self.start_time:
                logger.warning('Job `%s` cannot start without start_time set', self.name)
                return

            logger.info('Job starting `%s`', self)

            _utcnow = datetime.datetime.utcnow
            _sleep = gevent.sleep

            # If the job has a start time in the future, sleep until it's ready to go.
            now = _utcnow()
            while self.start_time > now:
                _sleep(self.wait_sleep_time)

                if self.wait_iter_cb:
                    self.wait_iter_cb(self.start_time, now, *self.wait_iter_cb_args)

                now = _utcnow()

            self.main_loop()

        except Exception:
            logger.warning(format_exc())
# ################################################################################################################################
class Scheduler(object):
    """ Holds all jobs known to this process, spawns a greenlet for each active one and forwards
    their executions to config.on_job_executed_cb.

    Methods documented as 'must be called with self.lock held' are internal helpers, the public ones
    acquire self.lock themselves. The lock is an RLock so public methods may call each other while holding it.
    """
    def __init__(self, config, api):
        self.config = config
        self.api = api
        self.on_job_executed_cb = config.on_job_executed_cb
        self.startup_jobs = config.startup_jobs
        self.odb = config.odb

        self.jobs = {}          # Job name -> Job object
        self.job_greenlets = {} # Job name -> whatever self._spawn(job.run) returned for that job

        self.keep_running = True
        self.lock = lock.RLock()
        self.sleep_time = 0.1

        # Optional callable invoked on each iteration of run's main loop, as iter_cb(*iter_cb_args)
        self.iter_cb = None
        self.iter_cb_args = ()

        self.ready = False
        self._add_startup_jobs = config._add_startup_jobs
        self._add_scheduler_jobs = config._add_scheduler_jobs

        # Logger method, e.g. logger.info, used for messages about jobs being scheduled and executed
        self.job_log = getattr(logger, config.job_log_level)

    def on_max_repeats_reached(self, job):
        """ Invoked by a job once it has run for the last time - marks it as inactive.
        """
        with self.lock:
            job.is_active = False

    def _create(self, job, spawn=True):
        """ Actually creates a job. Must be called with self.lock held.
        The job is always registered but its greenlet is spawned only if it is active and spawn is True.
        """
        try:
            self.jobs[job.name] = job

            if job.is_active:
                if spawn:
                    self.spawn_job(job)
                self.job_log('Job scheduled `%s` (%s, start: %s UTC)', job.name, job.type, job.start_time)
            else:
                logger.debug('Skipping inactive job `%s`', job)

        except Exception:
            logger.warning(format_exc())

    def create(self, *args, **kwargs):
        """ Creates a job - accepts the same arguments as _create.
        """
        with self.lock:
            self._create(*args, **kwargs)

    def edit(self, job):
        """ Edits a job - this means an already existing job is unscheduled and created again,
        i.e. it's not an in-place update.
        """
        with self.lock:
            self._unschedule_stop(job, '(src:edit)')

            # is_active must be passed by keyword - the first positional parameter of clone is 'name'
            self._create(job.clone(is_active=job.is_active), True)

    def _unschedule(self, job):
        """ Actually unschedules a job. Must be called with self.lock held.
        Returns True if the job was found either among jobs or among their greenlets.
        """
        # The job could have been renamed so we need to unschedule it by the previous name, if there is one
        name = job.old_name or job.name
        found = False

        job.keep_running = False

        if name in self.jobs:
            del self.jobs[name]
            found = True

        greenlet = self.job_greenlets.pop(name, None)
        if greenlet is not None:
            greenlet.kill(block=False, timeout=2.0)
            found = True

        return found

    def _unschedule_stop(self, job, message):
        """ API for job deletion and stopping. Must be called with a self.lock held.
        """
        if self._unschedule(job):
            name = job.old_name or job.name
            logger.info('Unscheduled %s job %s `%s`', job.type, name, message)
        else:
            logger.debug('Job not found `%s`', job)

    def unschedule(self, job):
        """ Deletes a job.
        """
        with self.lock:
            self._unschedule_stop(job, '(src:unschedule)')

    def unschedule_by_name(self, name):
        """ Deletes a job by its name. Does nothing if there is no such job.
        """
        with self.lock:
            job = next((elem for elem in itervalues(self.jobs) if elem.name == name), None)

            # Unscheduling happens only after the lookup above has finished iterating - deleting from self.jobs
            # while iterating over it would raise RuntimeError. The lookup and the deletion share one lock
            # acquisition (the lock is reentrant) so the job cannot change in between.
            if job is not None:
                self.unschedule(job)

    def stop_job(self, job):
        """ Stops a job by deleting it.
        """
        with self.lock:
            self._unschedule_stop(job, 'stopped')

    def stop(self):
        """ Stops all jobs and the scheduler itself.
        """
        with self.lock:

            # This makes the main loop in self.run exit
            self.keep_running = False

            # Iterate over a sorted copy of Job objects (not their names), since unscheduling deletes from self.jobs
            for job in sorted(itervalues(self.jobs)):
                self._unschedule_stop(job.clone(), 'stopped')

    def sleep(self, value):
        """ A method introduced so the class is easier to mock out in tests.
        """
        gevent.sleep(value)

    def execute(self, name):
        """ Executes a job no matter if it's active or not. One-time job are not unscheduled afterwards.
        """
        with self.lock:
            for job in itervalues(self.jobs):
                if job.name == name:
                    self.on_job_executed(job.get_context(), False)
                    break
            else:
                logger.warning('No such job `%s` in `%s`', name, [elem.get_context() for elem in itervalues(self.jobs)])

    def on_job_executed(self, ctx, unschedule_one_time=True):
        """ Invoked with a job's context each time a job runs - forwards the context to on_job_executed_cb
        and, by default, unschedules one-time jobs afterwards.
        """
        logger.debug('Executing `%s`, `%s`', ctx['name'], ctx)
        self.on_job_executed_cb(ctx)
        self.job_log('Job executed `%s`, `%s`', ctx['name'], ctx)

        if ctx['type'] == SCHEDULER.JOB_TYPE.ONE_TIME and unschedule_one_time:
            self.unschedule_by_name(ctx['name'])

    def _spawn(self, *args, **kwargs):
        """ As in the Job class, this is a thin wrapper so that it is easier to mock this method out in unit-tests.
        """
        return spawn_greenlet(*args, **kwargs)

    def spawn_job(self, job):
        """ Spawns a job's greenlet. Must be called with self.lock held.
        """
        job.callback = self.on_job_executed
        job.on_max_repeats_reached_cb = self.on_max_repeats_reached
        self.job_greenlets[job.name] = self._spawn(job.run)

    def init_jobs(self):
        """ Waits initial_sleep seconds, then adds startup jobs and scheduler jobs from the ODB,
        each only if its respective config flag is set. Scheduler jobs are registered without being spawned.
        """
        sleep(initial_sleep) # To make sure that at least one server is running if the environment was started from quickstart scripts
        cluster_conf = self.config.main.cluster

        if self._add_startup_jobs:
            add_startup_jobs(cluster_conf.id, self.odb, self.startup_jobs, asbool(cluster_conf.stats_enabled))

        # Actually start jobs now, including any added above
        if self._add_scheduler_jobs:
            add_scheduler_jobs(self.api, self.odb, cluster_conf.id, spawn=False)

    def run(self):
        """ Main entry point - adds initial jobs, spawns all active jobs that still have runs left
        and then loops until keep_running becomes False. Exceptions are logged, not raised.
        """
        try:

            logger.info('Scheduler will start to execute jobs in %d seconds', initial_sleep)

            # Add default jobs to the ODB and start all of them, the default and user-defined ones
            self.init_jobs()

            _sleep = self.sleep
            _sleep_time = self.sleep_time

            with self.lock:
                for job in sorted(itervalues(self.jobs)):
                    if job.max_repeats_reached:
                        logger.info('Job `%s` already reached max runs count (%s UTC)', job.name, job.max_repeats_reached_at)
                    elif not job.is_active:
                        # Same as in _create - inactive jobs are kept in self.jobs but never spawned
                        logger.debug('Skipping inactive job `%s`', job)
                    else:
                        self.spawn_job(job)

            # Ok, we're good now.
            self.ready = True

            logger.info('Scheduler started')

            while self.keep_running:
                _sleep(_sleep_time)

                if self.iter_cb:
                    self.iter_cb(*self.iter_cb_args)

        except Exception:
            logger.warning(format_exc())