/
crontabs.py
224 lines (189 loc) · 7.82 KB
/
crontabs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
"""
Module for managing the crontabs interface
"""
import datetime
import time
import traceback
import daiquiri
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from fleming import fleming
from .processes import ProcessMonitor, wrapped_target
import logging
# Import-time side effect: set up daiquiri logging at INFO level for this module's loggers.
daiquiri.setup(level=logging.INFO)
class Cron:
    """
    A Cron object runs many "tabs" of asynchronous tasks.

    Tabs are registered with :meth:`schedule` and started with :meth:`go`, which hands each
    tab's target callable to the shared process monitor.
    """

    def __init__(self):
        """
        Create a cron runner with its own process monitor and no scheduled tabs.
        """
        self.monitor = ProcessMonitor()
        self._tab_list = []

    def schedule(self, *tabs):
        """
        Replace the list of scheduled tabs with the supplied ones.

        :param tabs: the tab objects to run; any previously scheduled tabs are discarded
        :return: self
        """
        self._tab_list = list(tabs)

        # Give every tab access to the process monitor
        for tab in self._tab_list:
            tab.monitor = self.monitor
        return self

    def go(self, max_seconds=None):
        """
        Register every scheduled tab with the process monitor and run the monitor loop.

        :param max_seconds: forwarded unchanged to the monitor's ``loop`` method
        :raises ValueError: propagated from a tab's ``_get_target()`` when that tab is not
                            fully configured; in that case no subprocess has been registered
        """
        # Resolve every target before registering anything, so a misconfigured tab aborts
        # the whole call instead of leaving the monitor with only some tabs added.
        resolved = [(tab, tab._get_target()) for tab in self._tab_list]

        for tab, target in resolved:
            self.monitor.add_subprocess(
                tab._name, target, robust=tab._robust, restart=True)

        try:
            self.monitor.loop(max_seconds=max_seconds)
        except KeyboardInterrupt:  # pragma: no cover
            # Ctrl-C is treated as a normal way to stop the runner.
            pass
class Tab:
    """
    A single scheduled job, configured fluently with
    ``Tab(name).every(...).starting_at(...).run(func, ...)``.
    """

    # When True, the informational 'Starting' / 'Running' log messages are suppressed.
    _SILENCE_LOGGER = False

    # Accepted keyword names for .every(), mapped to the singular form stored internally.
    _ALLOWED_KEY_MAP = {
        'seconds': 'second',
        'second': 'second',
        'minutes': 'minute',
        'minute': 'minute',
        'hours': 'hour',
        'hour': 'hour',
        'days': 'day',
        'day': 'day',
        'weeks': 'week',
        'week': 'week',
        'months': 'month',
        'month': 'month',
        'years': 'year',
        'year': 'year',
    }

    def __init__(self, name, robust=True, verbose=True):
        """
        Schedules a Tab entry in the cron runner
        :param name: Every tab must have a string name
        :param robust: A robust tab will be restarted if an error occurs
                       A non robust tab will not be restarted, but all other
                       non-errored tabs should continue running
        :param verbose: Set the verbosity of log messages.
        """
        self._name = name
        self._robust = robust
        self._verbose = verbose
        self._starting_at = None
        self._every_kwargs = None
        self._func = None
        self._func_args = None
        self._func_kwargs = None

    def starting_at(self, datetime_or_str):
        """
        Set the starting time for the cron job.  If not specified, the starting time will always
        be the beginning of the interval that is current when the cron is started.
        :param datetime_or_str: a datetime object or a string that dateutil.parser can understand
        :return: self
        :raises ValueError: if the argument is neither a string nor a datetime
        """
        if isinstance(datetime_or_str, str):
            self._starting_at = parse(datetime_or_str)
        elif isinstance(datetime_or_str, datetime.datetime):
            self._starting_at = datetime_or_str
        else:
            raise ValueError('.starting_at() method can only take strings or datetime objects')
        return self

    def every(self, **kwargs):
        """
        Specify the interval at which you want the job run.  Takes exactly one keyword argument.
        That argument must be one named one of [second, minute, hour, day, week, month, year] or
        their plural equivalents, and its value must be a positive number.
        :param kwargs: Exactly one keyword argument
        :return: self
        :raises ValueError: on a wrong number of arguments, an unknown time name, or a
                            non-positive interval
        """
        if len(kwargs) != 1:
            raise ValueError('.every() method must be called with exactly one keyword argument')
        self._every_kwargs = self._clean_kwargs(kwargs)
        return self

    def run(self, func, *func_args, **func__kwargs):
        """
        Specify the function to run at the scheduled times
        :param func: a callable
        :param func_args: the args to the callable
        :param func__kwargs: the kwargs to the callable
        :return: self
        """
        self._func = func
        self._func_args = func_args
        self._func_kwargs = func__kwargs
        return self

    def _clean_kwargs(self, kwargs):
        """
        Normalize the keyword arguments given to .every().
        :param kwargs: mapping of time name (any case, singular or plural) to interval size
        :return: a new dict keyed by the singular, lower-case time name
        :raises ValueError: if a time name is not recognized or an interval is not positive
        """
        out_kwargs = {}
        for key, value in kwargs.items():
            out_key = self._ALLOWED_KEY_MAP.get(key.lower())
            if out_key is None:
                raise ValueError(
                    'Allowed time names are {}'.format(sorted(self._ALLOWED_KEY_MAP.keys())))

            # A zero or negative interval never moves the schedule past "now", which would
            # make the run loop spin forever without sleeping.
            try:
                is_positive = value > 0
            except TypeError:
                is_positive = False
            if not is_positive:
                raise ValueError('The interval given to .every() must be a positive number')

            out_kwargs[out_key] = value
        return out_kwargs

    def _loop(self, max_iter=None):
        """
        Run the scheduled function at every interval boundary.
        :param max_iter: when not None, stop after this many loop iterations.  Iterations
                         spent catching up on missed intervals count toward this limit.
        """
        logger = daiquiri.getLogger(self._name)
        if not self._SILENCE_LOGGER:  # pragma: no cover don't want to clutter tests
            logger.info('Starting')

        # fleming and dateutil have arguments that just differ by ending in an "s"
        fleming_kwargs = self._every_kwargs
        relative_delta_kwargs = {k + 's': v for k, v in self._every_kwargs.items()}

        # if a starting time was given use the floored second of that time as the previous time
        if self._starting_at is not None:
            previous_time = fleming.floor(self._starting_at, second=1)
        # otherwise use the interval floored value of now as the previous time
        else:
            previous_time = fleming.floor(datetime.datetime.now(), **fleming_kwargs)

        # keep track of iterations
        n_iter = 0

        # this is the infinite loop that runs the cron.  It will only be stopped when the
        # process is killed by its monitor.
        while True:
            n_iter += 1
            if max_iter is not None and n_iter > max_iter:
                break

            # everything is run in a try block so errors can be explicitly handled
            try:
                # push forward the previous/next times
                next_time = previous_time + relativedelta(**relative_delta_kwargs)
                previous_time = next_time

                # Read "now" in the same timezone-awareness as the schedule.  A naive "now"
                # cannot be compared with an aware next_time (TypeError), which would
                # otherwise repeat on every iteration when an aware start time was given.
                # NOTE(review): assumes fleming.floor keeps the tzinfo of an aware input;
                # if it returns a naive value this is simply datetime.now() as before.
                now = datetime.datetime.now(tz=next_time.tzinfo)

                # if our job ran longer than an interval, we will need to catch up
                if next_time < now:
                    continue

                # sleep until the computed time to run the function
                sleep_seconds = (next_time - now).total_seconds()
                time.sleep(sleep_seconds)

                if self._verbose and not self._SILENCE_LOGGER:  # pragma: no cover
                    logger.info('Running')

                # TODO: I think I need to add a wrapped target run right here so that each
                # instance of a tab runs in its own subprocess.  This means the process tree
                # would be 3 deep.  1) the master cron process, 2) The process monitor for
                # each tab 3) The individual instance processes

                # run the function
                self._func(*self._func_args, **self._func_kwargs)

            except KeyboardInterrupt:  # pragma: no cover
                pass
            except:  # noqa
                # Deliberately broad: a robust tab logs whatever went wrong and keeps going.
                # only raise the error if not in robust mode.
                if self._robust:
                    s = 'Error in tab\n' + traceback.format_exc()
                    logger.error(s)
                else:
                    raise

    # NOTE: unfinished sketch for the TODO in _loop, kept for reference.
    # def _exec_in_sub_process(self, target, *args, **kwargs):
    #     self._process = Process(
    #         target=wrapped_target,
    #         args=[
    #             self._target, self.q_stdout, self.q_stderr,
    #             self.q_error, self._robust, self._name
    #         ] + list(self._args),
    #         kwargs=self._kwargs
    #     )
    #     self._process.daemon = True
    #     self._process.start()

    def _get_target(self):
        """
        returns a callable with no arguments designed
        to be the target of a Subprocess
        :raises ValueError: if .every() or .run() has not been called on this tab
        """
        required = (self._func, self._func_args, self._func_kwargs, self._every_kwargs)
        # Identity checks, so a callable or argument with a custom __eq__ cannot be
        # mistaken for "not set".
        if any(value is None for value in required):
            raise ValueError('You must call the .every() and .run() methods on every tab.')
        return self._loop