-
Notifications
You must be signed in to change notification settings - Fork 62
/
status.py
323 lines (273 loc) · 10.3 KB
/
status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
#!/usr/bin/python
# -*- coding: utf-8 -*-
# Copyright (C) 2008-2022, Luis Pedro Coelho <luis@luispedro.org>
# vim: set ts=4 sts=4 sw=4 expandtab smartindent:
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from collections import defaultdict, namedtuple
from contextlib import contextmanager
import jug
from .. import task
from .. import backends
from ..task import Task
from ..backends import memoize_store
from ..io import print_task_summary_table
from . import SubCommand
# Public API of this module: only the `status` name is exported.
__all__ = [
'status'
]
# Status labels. They are stored in the `status` column of the cache table
# `ht` and compared against when task states are (re)computed.
unknown = 'unknown'    # initial label given to every task by load_jugfile
waiting = 'waiting'    # at least one dependency is not finished yet
ready = 'ready'        # dependencies are done and the task is not locked
running = 'running'    # task is locked and the lock is not marked failed
failed = 'failed'      # task is locked and the lock is marked failed
finished = 'finished'  # the task result can be loaded from the store
def create_sqlite3(connection, ht, deps, rdeps):
    '''
    Create the status cache tables and fill them in

    Parameters
    ----------
    connection : DB connection
    ht : sequence of (id, name, hash, status) rows
        Inserted as-is into table ht
    deps : dict
        Maps a task id to the ids it depends on; one row per
        (source, target) pair is written to table dep
    rdeps : dict
        Reverse dependencies. Not used by this function.
    '''
    connection.executescript('''
CREATE TABLE ht (
id INTEGER PRIMARY KEY,
name CHAR(128),
hash CHAR(128),
status CHAR(32));
CREATE TABLE dep (
source INT,
target INT);
''')
    connection.executemany('INSERT INTO ht VALUES(?,?,?,?)', ht)
    for source, targets in deps.items():
        # Tasks without dependencies contribute no rows to dep.
        if not len(targets):
            continue
        pairs = [(source, target) for target in targets]
        connection.executemany('''
INSERT INTO dep(source, target) VALUES(?,?)
''', pairs)
def retrieve_sqlite3(connection):
    '''
    Read the cached status back from an SQLite3 DB

    Parameters
    ----------
    connection : DB connection

    Returns
    -------
    ht : list
        All rows of table ht, ordered by id
    dep : dict
        dependencies (source id -> list of target ids)
    rdep : dict
        reverse dependencies (target id -> list of source ids)
    '''
    cursor = connection.execute('SELECT * FROM ht ORDER BY id')
    ht = cursor.fetchall()

    forward = {}
    backward = {}
    for source, target in connection.execute('SELECT * FROM dep'):
        forward.setdefault(source, []).append(target)
        backward.setdefault(target, []).append(source)
    return ht, forward, backward
def save_dirty3(connection, dirty):
    '''
    Write changed statuses back to table ht

    Parameters
    ----------
    connection : DB connection
    dirty : dict
        Maps a task id to its new status
    '''
    updates = [(new_status, task_id) for task_id, new_status in dirty.items()]
    connection.executemany('UPDATE ht SET STATUS = ? WHERE id = ?', updates)
@contextmanager
def _open_connection(options):
    '''Opens sqlite3 connection as a context manager

    The connection is opened on ``options.status_cache_file``. If the
    ``with`` body completes normally the transaction is committed; the
    connection is closed in every case, including when the body raises
    (uncommitted changes are then not committed).

    Parameters
    ----------
    options : jug options
        Only ``status_cache_file`` is read here
    '''
    import sqlite3
    connection = sqlite3.connect(options.status_cache_file)
    try:
        yield connection
        # Only reached when the with-body did not raise.
        connection.commit()
    finally:
        # Always release the handle so a failing body does not leak it.
        connection.close()
def load_jugfile(options):
    '''
    Load the jugfile and build the task table and dependency graph

    Tasks are numbered in the order they appear in ``task.alltasks``. Each
    dependency is resolved to the index of a previously seen task through
    its hash; if a dependency hash has not been seen, an explanatory
    message is written to stderr and the process exits with status 1.

    Parameters
    ----------
    options : jug options
        ``jugfile`` and ``jugdir`` are passed to ``jug.init``

    Returns
    -------
    store : the store returned by ``jug.init``
    ht : list of (index, name, hash, status) tuples, status is ``unknown``
    deps : dict
        task index -> list of indices it depends on
    rdeps : dict
        task index -> list of indices that depend on it
    '''
    store, _ = jug.init(options.jugfile, options.jugdir)
    index_of = {}
    ht = []
    deps = {}
    try:
        for idx, t in enumerate(task.alltasks):
            dep_indices = []
            for d in t.dependencies():
                dep_hash = d.hash() if isinstance(d, Task) else d._base_hash()
                # KeyError here means the dependency was never registered.
                dep_indices.append(index_of[dep_hash])
            deps[idx] = dep_indices
            task_hash = t.hash()
            ht.append((idx, t.name, task_hash, unknown))
            index_of[task_hash] = idx
    except KeyError:
        import sys
        messages = (
            "Could not build dependency graph!\n",
            "This normally indicates a bug in your code!\n",
            "\n",
            "A common error is to build a Task with a mutable argument and subsequently modifying.\n",
            "\n",
            "For help, you can use the jug-users mailing-list: https://groups.google.com/g/jug-users\n",
            "\n",
        )
        for message in messages:
            sys.stderr.write(message)
        sys.exit(1)

    rdeps = {}
    for source, targets in deps.items():
        for target in targets:
            rdeps.setdefault(target, []).append(source)
    return store, ht, deps, rdeps
class TaskStatus:
    '''Per-state task counters

    Holds one ``defaultdict(int)`` per state (``failed``, ``waiting``,
    ``ready``, ``running``, ``finished``), keyed by task name.
    '''

    def __init__(self):
        for state in ('failed', 'waiting', 'ready', 'running', 'finished'):
            setattr(self, state, defaultdict(int))
def update_status(store, ht, deps, rdeps):
    '''
    Recompute the status of every task in ht

    A task is ``finished`` if it was already recorded as such or its hash
    can be loaded from the store. Otherwise its dependencies are checked
    (skipped when the recorded status is ``ready``): if any is not done the
    task is ``waiting``; else the task lock decides between ``failed``,
    ``running`` and ``ready``.

    Parameters
    ----------
    store : backend store
        Wrapped with ``memoize_store(store, list_base=True)`` before use
    ht : sequence of (index, name, hash, status)
        Dependencies are looked up positionally, as ``ht[dep]``
    deps : dict
        task index -> list of indices it depends on
    rdeps : dict
        Reverse dependencies. Not used by this function.

    Returns
    -------
    ts : TaskStatus
        Counts per state and task name
    dirty : dict
        task index -> new status, only for tasks whose status changed
    '''
    ts = TaskStatus()
    store = memoize_store(store, list_base=True)
    dirty = {}

    def dep_is_done(dep):
        _, _, dhash, dstatus = ht[dep]
        return dstatus == finished or store.can_load(dhash)

    for idx, name, task_hash, old_status in ht:
        nstatus = None
        if old_status == finished or store.can_load(task_hash):
            nstatus, counter = finished, ts.finished
        elif old_status != ready and not all(
                dep_is_done(dep) for dep in deps.get(idx, [])):
            nstatus, counter = waiting, ts.waiting
        else:
            lock = store.getlock(task_hash)
            if not lock.is_locked():
                nstatus, counter = ready, ts.ready
            elif lock.is_failed():
                nstatus, counter = failed, ts.failed
            else:
                nstatus, counter = running, ts.running
        counter[name] += 1
        assert nstatus is not None, 'update_status: nstatus not assigned'
        if old_status != nstatus:
            dirty[idx] = nstatus
    return ts, dirty
def _print_status(options, ts):
    '''
    Print the status summary

    With ``options.short`` set, a single line with totals is emitted through
    ``options.print_out``; otherwise the per-task counts are handed to
    ``print_task_summary_table``.

    Parameters
    ----------
    options : jug options
    ts : TaskStatus
    '''
    if not options.short:
        print_task_summary_table(options, [
            ("Failed", ts.failed),
            ("Waiting", ts.waiting),
            ("Ready", ts.ready),
            ("Complete", ts.finished),
            ("Active", ts.running)])
        return

    n_ready = sum(ts.ready.values())
    n_running = sum(ts.running.values())
    n_failed = sum(ts.failed.values())
    n_waiting = sum(ts.waiting.values())
    n_finished = sum(ts.finished.values())
    # Ready tasks are reported together with waiting ones in short mode.
    n_pending = n_waiting + n_ready

    if not (n_waiting or n_running or n_failed or n_ready):
        line = 'All tasks complete ({0} tasks).'.format(n_finished)
    elif not n_running:
        line = '{0} tasks waiting to be run, {1} failed, {2} complete, (none active).'.format(n_pending, n_failed, n_finished)
    else:
        line = '{0} tasks waiting to be run, {1} failed, {2} complete, ({3} active).'.format(n_pending, n_failed, n_finished, n_running)
    options.print_out(line)
def _clear_cache(options):
    '''
    Remove the status cache file (best effort)

    File-system errors (e.g. the file does not exist) are ignored; other
    exceptions, such as KeyboardInterrupt, are no longer swallowed.

    Parameters
    ----------
    options : jug options
        Only ``status_cache_file`` is read here
    '''
    from os import unlink
    try:
        unlink(options.status_cache_file)
    except OSError:
        # Nothing to clear, or the file could not be removed: stay silent.
        pass
def _status_cached(options):
    '''
    Print the status using the sqlite3 cache

    First tries to read the task table from the cache file. If that fails
    for any ordinary error, the jugfile is loaded instead and a fresh cache
    is written. KeyboardInterrupt and SystemExit are not treated as a cache
    miss and propagate to the caller.

    Parameters
    ----------
    options : jug options

    Returns
    -------
    n_finished : int
        Number of finished tasks
    '''
    create, update = range(2)
    try:
        with _open_connection(options) as connection:
            ht, deps, rdeps = retrieve_sqlite3(connection)
        store = backends.select(options.jugdir)
        mode = update
    except Exception:
        # Cache missing or unreadable: rebuild everything from the jugfile.
        store, ht, deps, rdeps = load_jugfile(options)
        mode = create

    ts, dirty = update_status(store, ht, deps, rdeps)
    _print_status(options, ts)

    if mode == update:
        # Existing cache: only rewrite the rows whose status changed.
        with _open_connection(options) as connection:
            save_dirty3(connection, dirty)
    else:
        # New cache: fold the fresh statuses into ht before writing it out.
        for k in dirty:
            _, name, task_hash, _ = ht[k]
            ht[k] = (k, name, task_hash, dirty[k])
        with _open_connection(options) as connection:
            create_sqlite3(connection, ht, deps, rdeps)
    return sum(ts.finished.values())
def _status_nocache(options):
    '''
    Print the status by querying every task directly (no cache)

    Sets ``Task.store`` to the store returned by ``jug.init`` wrapped with
    ``memoize_store(..., list_base=True)``, then classifies each task in
    ``task.alltasks``.

    Parameters
    ----------
    options : jug options

    Returns
    -------
    n_finished : int
        Number of finished tasks
    '''
    store, _ = jug.init(options.jugfile, options.jugdir)
    Task.store = memoize_store(store, list_base=True)

    ts = TaskStatus()
    for t in task.alltasks:
        if t.can_load():
            counter = ts.finished
        elif not t.can_run():
            counter = ts.waiting
        elif not t.is_locked():
            counter = ts.ready
        elif t.is_failed():
            counter = ts.failed
        else:
            counter = ts.running
        counter[t.name] += 1

    _print_status(options, ts)
    return sum(ts.finished.values())
class StatusCommand(SubCommand):
    '''Print status

    status(options)

    Implements the status command.

    Parameters
    ----------
    options : jug options
    '''
    name = "status"

    def run(self, options, *args, **kwargs):
        '''Dispatch to the cached or non-cached status implementation

        With ``options.status_cache`` set, sqlite3 must be importable; if it
        is not, a warning is written to stderr and the command is re-run
        with caching disabled. ``options.status_cache_clear`` is only
        honoured when caching is enabled.
        '''
        if options.status_cache:
            try:
                import sqlite3  # noqa: F401 -- availability check only
            except ImportError:
                from sys import stderr
                stderr.write('Cached status relies on sqlite3. Falling back to non-cached version\n')
                options.status_cache = False
                return status(options)
            if options.status_cache_clear:
                return _clear_cache(options)
            return _status_cached(options)
        else:
            return _status_nocache(options)

    def parse(self, parser):
        '''Register the command-line arguments of the status command'''
        defaults = self.parse_defaults()
        parser.add_argument('--cache',
                            action='store_const', const=True,
                            dest='status_cache',
                            help='Use a cache for faster status [does not update after jugfile changes, though]')
        parser.add_argument('--cache-file',
                            action='store', metavar="CACHE_FILE",
                            dest='status_cache_file',
                            help=('Name of file to use for status cache. Use with status --cache. '
                                  '(Default: {status_cache_file})'.format(**defaults)))
        parser.add_argument('--clear',
                            action='store_const', const=True,
                            dest='status_cache_clear',
                            help='Use with status --cache. Removes the cache file')

    def parse_defaults(self):
        '''Return the default values of the options declared in ``parse``'''
        return {
            "status_cache": False,
            "status_cache_clear": False,
            "status_cache_file": ".jugstatus.sqlite3",
        }


# Module-level instance; `run` calls it to re-dispatch after disabling the cache.
status = StatusCommand()