-
Notifications
You must be signed in to change notification settings - Fork 46
/
__init__.py
155 lines (122 loc) · 4.17 KB
/
__init__.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
from functools import wraps
import collections
import crontab
import django
from django.core.management import get_commands, load_command_class
try:
from importlib import import_module
except ImportError:
from django.utils.importlib import import_module
from kronos.settings import PROJECT_MODULE, KRONOS_PYTHON, KRONOS_MANAGE, \
KRONOS_PYTHONPATH, KRONOS_POSTFIX, KRONOS_PREFIX, KRONOS_BREADCRUMB
from django.conf import settings
from kronos.version import __version__
import six
from django.utils.module_loading import autodiscover_modules
Task = collections.namedtuple('Task', ['name', 'schedule', 'command', 'function'])
registry = set()
def load():
"""
Load ``cron`` modules for applications listed in ``INSTALLED_APPS``.
"""
autodiscover_modules('cron')
if PROJECT_MODULE:
if '.' in PROJECT_MODULE.__name__:
try:
import_module('%s.cron' % '.'.join(
PROJECT_MODULE.__name__.split('.')[0:-1]))
except ImportError as e:
if 'No module named' not in str(e):
print(e)
# load django tasks
for cmd, app in get_commands().items():
try:
load_command_class(app, cmd)
except django.core.exceptions.ImproperlyConfigured:
pass
KRONOS_TEMPLATE = \
'%(prefix)s %(python)s %(manage)s ' \
'runtask %(name)s %(passed_args)s --settings=%(settings_module)s ' \
'%(postfix)s'
DJANGO_TEMPLATE = \
'%(prefix)s %(python)s %(manage)s ' \
'%(name)s %(passed_args)s --settings=%(settings_module)s ' \
'%(postfix)s'
def process_args(args):
res = []
for key, value in six.iteritems(args):
if isinstance(value, dict):
raise TypeError('Parse for dict arguments not yet implemented.')
if isinstance(value, list):
temp_args = ",".join(map(str, value))
res.append("{}={}".format(key, temp_args))
else:
if value is None:
arg_text = "{}"
elif isinstance(value, str):
arg_text = '{}="{}"'
else:
arg_text = '{}={}'
res.append(arg_text.format(key, value))
return res
def register(schedule, args=None, settings_filter=None):
def decorator(function):
global registry_kronos, registry_django
passed_args = process_args(args) if args is not None else []
ctx = {
'prefix': KRONOS_PREFIX,
'python': KRONOS_PYTHON,
'manage': KRONOS_MANAGE,
'passed_args': ' '.join(passed_args),
'settings_module': settings.SETTINGS_MODULE,
'postfix': KRONOS_POSTFIX
}
if hasattr(function, 'handle'):
func = None
tmpl = DJANGO_TEMPLATE
name = function.__module__.split('.')[-1]
else:
func = function
tmpl = KRONOS_TEMPLATE
name = function.__name__
command = tmpl % dict(ctx, name=name)
if KRONOS_PYTHONPATH is not None:
command += ' --pythonpath=%s' % KRONOS_PYTHONPATH
if settings_filter is None or getattr(django.conf.settings, settings_filter, False):
registry.add(Task(name, schedule, command, func))
@wraps(function)
def wrapper(*args, **kwargs):
return function(*args, **kwargs)
return wrapper
return decorator
def install():
"""
Register tasks with cron.
"""
load()
tab = crontab.CronTab(user=True)
for task in registry:
tab.new(task.command, KRONOS_BREADCRUMB).setall(task.schedule)
tab.write()
return len(registry)
def printtasks():
"""
Print the tasks that would be installed in the
crontab, for debugging purposes.
"""
load()
tab = crontab.CronTab('')
for task in registry:
tab.new(task.command, KRONOS_BREADCRUMB).setall(task.schedule)
print(tab.render())
def uninstall():
"""
Uninstall tasks from cron.
"""
tab = crontab.CronTab(user=True)
count = len(list(tab.find_comment(KRONOS_BREADCRUMB)))
tab.remove_all(comment=KRONOS_BREADCRUMB)
tab.write()
return count
def reinstall():
return uninstall(), install()