-
Notifications
You must be signed in to change notification settings - Fork 185
/
utils.py
151 lines (114 loc) · 4.47 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# Copyright 2015 Vinicius Chiele. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utility module."""
import dateutil.parser
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from collections import OrderedDict
def job_to_dict(job):
"""Converts a job to an OrderedDict."""
data = OrderedDict()
data["id"] = job.id
data["name"] = job.name
data["func"] = job.func_ref
data["args"] = job.args
data["kwargs"] = job.kwargs
data.update(trigger_to_dict(job.trigger))
if not job.pending:
data["misfire_grace_time"] = job.misfire_grace_time
data["max_instances"] = job.max_instances
data["next_run_time"] = None if job.next_run_time is None else job.next_run_time
return data
def pop_trigger(data):
"""Pops trigger and trigger args from a given dict."""
trigger_name = data.pop("trigger")
trigger_args = {}
if trigger_name == "date":
trigger_arg_names = ("run_date", "timezone")
elif trigger_name == "interval":
trigger_arg_names = ("weeks", "days", "hours", "minutes", "seconds", "start_date", "end_date", "timezone")
elif trigger_name == "cron":
trigger_arg_names = ("year", "month", "day", "week", "day_of_week", "hour", "minute", "second", "start_date", "end_date", "timezone")
else:
raise Exception(f"Trigger {trigger_name} is not supported.")
for arg_name in trigger_arg_names:
if arg_name in data:
trigger_args[arg_name] = data.pop(arg_name)
return trigger_name, trigger_args
def trigger_to_dict(trigger):
"""Converts a trigger to an OrderedDict."""
data = OrderedDict()
if isinstance(trigger, DateTrigger):
data["trigger"] = "date"
data["run_date"] = trigger.run_date
elif isinstance(trigger, IntervalTrigger):
data["trigger"] = "interval"
data["start_date"] = trigger.start_date
if trigger.end_date:
data["end_date"] = trigger.end_date
w, d, hh, mm, ss = extract_timedelta(trigger.interval)
if w > 0:
data["weeks"] = w
if d > 0:
data["days"] = d
if hh > 0:
data["hours"] = hh
if mm > 0:
data["minutes"] = mm
if ss > 0:
data["seconds"] = ss
elif isinstance(trigger, CronTrigger):
data["trigger"] = "cron"
if trigger.start_date:
data["start_date"] = trigger.start_date
if trigger.end_date:
data["end_date"] = trigger.end_date
for field in trigger.fields:
if not field.is_default:
data[field.name] = str(field)
else:
data["trigger"] = str(trigger)
return data
def fix_job_def(job_def):
"""
Replaces the datetime in string by datetime object.
"""
if isinstance(job_def.get("start_date"), str):
job_def["start_date"] = dateutil.parser.parse(job_def.get("start_date"))
if isinstance(job_def.get("end_date"), str):
job_def["end_date"] = dateutil.parser.parse(job_def.get("end_date"))
if isinstance(job_def.get("run_date"), str):
job_def["run_date"] = dateutil.parser.parse(job_def.get("run_date"))
# it keeps compatibility backward
if isinstance(job_def.get("trigger"), dict):
trigger = job_def.pop("trigger")
job_def["trigger"] = trigger.pop("type", "date")
job_def.update(trigger)
def extract_timedelta(delta):
w, d = divmod(delta.days, 7)
mm, ss = divmod(delta.seconds, 60)
hh, mm = divmod(mm, 60)
return w, d, hh, mm, ss
def bytes_to_wsgi(data):
assert isinstance(data, bytes), "data must be bytes"
if isinstance(data, str):
return data
else:
return data.decode("latin1")
def wsgi_to_bytes(data):
"""coerce wsgi unicode represented bytes to real ones"""
if isinstance(data, bytes):
return data
return data.encode("latin1") # XXX: utf8 fallback?