-
Notifications
You must be signed in to change notification settings - Fork 10
/
core.py
599 lines (496 loc) · 19 KB
/
core.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
# -*- coding: utf-8
# This file is part of rcontrol.
#
# rcontrol is free software; you can redistribute it and/or modify it under the
# terms of the GNU Lesser General Public License as published by the Free
# Software Foundation; either version 3 of the License, or (at your option)
# any later version.
#
# rcontrol is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with rcontrol. If not, see <http://www.gnu.org/licenses/>.
import sys
import threading
import six
from collections import OrderedDict
from rcontrol import fs
import abc
import warnings
class BaseTaskError(Exception):
"""Raised on a task error. All tasks errors inherit from this."""
class TaskError(BaseTaskError):
"""Raised on a task error"""
def __init__(self, session, task, msg):
self.session = session
self.task = task
self.rawmsg = msg
BaseTaskError.__init__(self, "%s: %s (%s)" % (session, task, msg))
class TimeoutError(TaskError):
"""Raise on a command timeout error"""
class ExitCodeError(TaskError):
"""Raised when the exit code of a command is unexpected"""
class TaskErrors(BaseTaskError):
"""A list of task errors"""
def __init__(self, errors):
self.errors = errors
BaseTaskError.__init__(self, '\n'.join(str(e) for e in self.errors))
@six.add_metaclass(abc.ABCMeta)
class Task(object):
def __init__(self, session):
self.session = session
self.explicit_wait = False
# register the task instance to the session
session._register_task(self)
def _unregister(self):
# this must be called by subclasses when the task needs to be
# unregistered from the session. This is called from a thread,
# when the task is finished (or for a timeout)
self.session._unregister_task(self)
@abc.abstractmethod
def is_running(self):
"""
Return True if the task is running.
"""
@abc.abstractmethod
def error(self):
"""
Return an instance of a :class:`BaseTaskError` or None.
"""
def raise_if_error(self):
"""
Check if an error occured and raise it if any.
"""
error = self.error()
if error:
raise error
@abc.abstractmethod
def _wait(self, raise_if_error):
pass
def wait(self, raise_if_error=True):
"""
Block and wait until the task is finished.
:param raise_if_error: if True, call :meth:`raise_if_error` at
the end.
"""
self.explicit_wait = True
return self._wait(raise_if_error=raise_if_error)
def _async(meth, name):
def new_meth(self, *args, **kwargs):
return ThreadableTask(self, meth, (self,) + args, kwargs)
new_meth.__name__ = name
new_meth.__doc__ = """
Asynchronous version of :meth:`%s`.
This method returns an instance of a :class:`ThreadableTask`.
""" % meth.__name__
return new_meth
class TaskCache(object):
def __init__(self):
self._cache = set()
def update(self, tasks):
self._cache.update(id(t) for t in tasks)
def __contains__(self, task):
return id(task) in self._cache
@six.add_metaclass(abc.ABCMeta)
class BaseSession(object):
"""
Represent an abstraction of a session on a remote or local machine.
Note that you should not use a session instance from multiple threads. For
example, running commands and calling :meth:`wait_for_tasks` in parallel
will have an undefined behaviour.
"""
def __init__(self, auto_close=True):
# a lock for tasks and silent errors access
self._lock = threading.Lock()
self._tasks = []
# silent errors are errors from tasks that are not waited
# explicitly. As a task is unregistered from the session once
# it is finished, we save in this list the errors of tasks
# that are finished before wait_for_tasks is called.
self._silent_errors = []
self.auto_close = auto_close
def _register_task(self, task):
assert isinstance(task, Task)
with self._lock:
self._tasks.append(task)
def _unregister_task(self, task):
with self._lock:
try:
self._tasks.remove(task)
except ValueError:
pass # this should not happen
# keep silent error
if not task.explicit_wait:
error = task.error()
if error:
self._silent_errors.append(error)
def tasks(self):
"""
Return a copy of the currently active tasks.
"""
with self._lock:
return self._tasks[:]
def wait_for_tasks(self, raise_if_error=True):
"""
Wait for the running tasks launched from this session.
If any errors are encountered, they are raised or returned depending
on **raise_if_error**. Note that this contains errors reported from
silently finished tasks (tasks ran and finished in backround without
explicit wait call on them).
Tasks started from another task callback (like on_finished)
are also waited here.
This is not required to call this method explictly if you use the
:class:`BaseSession` or the :class:`SessionManager` with the **with**
keyword.
:param raise_if_error: If True, errors are raised using
:class:`TaskErrors`. Else the errors are returned as a list.
"""
errors = []
# in case tasks do not unregister themselves we do not want to
# loop infinitely
tasks_seen = TaskCache()
# we do a while loop to ensure that tasks started from callbacks
# are waited too.
while True:
with self._lock:
# bring back to life silent errors
errors.extend(self._silent_errors)
tasks = [t for t in self._tasks if t not in tasks_seen]
if not tasks:
with self._lock:
# now clean the silent errors
self._silent_errors = []
break
for task in tasks:
task.wait(raise_if_error=False)
error = task.error()
if error:
errors.append(error)
with self._lock:
# now clean the silent errors
self._silent_errors = []
tasks_seen.update(tasks)
if raise_if_error and errors:
raise TaskErrors(errors)
return errors
@abc.abstractmethod
def open(self, filename, mode='r', bufsize=-1):
"""
Return an opened file object.
:param filename: the file path to open
:param mode: the mode used to open the file
:param bufsize: buffer size
"""
@abc.abstractmethod
def execute(self, command, **kwargs):
"""
Execute a command in an asynchronous way.
Return an instance of a subclass of a :class:`CommandTask`.
:param command: the command to execute (a string)
:param kwargs: named arguments passed to the constructor of the
class:`CommandTask` subclass.
"""
@abc.abstractmethod
def walk(self, top, topdown=True, onerror=None, followlinks=False):
"""
Walk the file system. Equivalent to os.walk.
"""
@abc.abstractmethod
def mkdir(self, path):
"""
Create a directory. Equivalent to os.mkdir.
"""
@abc.abstractmethod
def exists(self, path):
"""
Return True if the path exists. Equivalent to os.path.exists.
"""
@abc.abstractmethod
def isdir(self, path):
"""
Return True if the path is a directory. Equivalent to os.path.isdir.
"""
@abc.abstractmethod
def islink(self, path):
"""
Return True if the path is a link. Equivalent to os.path.islink.
"""
def s_copy_file(self, src, dest_os, dest, chunk_size=16384):
"""
Copy a file from this session to another session.
:param src: full path of the file to copy in this session
:param dest_os: session to copy to
:param dest: full path of the file to copy in the dest session
"""
fs.copy_file(self, src, dest_os, dest, chunk_size=chunk_size)
copy_file = _async(s_copy_file, "copy_file")
def s_copy_dir(self, src, dest_session, dest, chunk_size=16384):
"""
Recursively copy a directory from a session to another one.
**dest** must not exist, it will be created automatically.
:param src: path of the dir to copy in this session
:param dest_session: session to copy to
:param dest: path of the dir to copy in the dest session (must
not exists)
"""
fs.copy_dir(self, src, dest_session, dest, chunk_size=chunk_size)
copy_dir = _async(s_copy_dir, "copy_dir")
def close(self):
"""
Close the session.
"""
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
errors = self.wait_for_tasks(raise_if_error=False)
if self.auto_close:
self.close()
if errors:
if value is None:
# no exceptions in the with block -> let's raise
# the errors
raise TaskErrors(errors)
else:
# TODO: for now, just print errors if any
for error in errors:
print('ERROR: %s' % error)
class SessionManager(OrderedDict):
"""
A specialized OrderedDict that keep sessions instances.
It can be used like a namespace: ::
sess_manager.local = LocalSession()
# equivalent to:
# sess_manager['local'] = LocalSession()
It should be used inside a **with** block, to wait for pending
tasks and close sessions if needed automatically.
"""
def __setitem__(self, name, value):
if not isinstance(name, six.string_types):
raise TypeError('key must be an str instance')
if not isinstance(value, BaseSession):
raise TypeError('only BaseSession instances can be set')
OrderedDict.__setitem__(self, name, value)
def __setattr__(self, name, value):
if isinstance(value, BaseSession):
self[name] = value
else:
self.__dict__[name] = value
def __getattr__(self, name):
try:
return self[name]
except KeyError:
raise AttributeError('%r does not exists' % name)
def __delattr__(self, name):
try:
del self[name]
except KeyError:
OrderedDict.__delattr__(self, name)
def wait_for_tasks(self, raise_if_error=True):
"""
Wait for the running tasks lauched from the sessions.
Note that it also wait for tasks that are started from other tasks
callbacks, like on_finished.
:param raise_if_error: if True, raise all possible encountered
errors using :class:`TaskErrors`. Else the errors are returned
as a list.
"""
errors = []
tasks_seen = TaskCache()
while True:
for session in self.values():
errs = session.wait_for_tasks(raise_if_error=False)
errors.extend(errs)
# look for tasks created after the wait (in callbacks of
# tasks from different sessions)
tasks = []
for session in self.values():
tasks.extend(session.tasks())
# if none, then just break - else loop to wait for them
if not any(t for t in tasks if t not in tasks_seen):
break
if raise_if_error and errors:
raise TaskErrors(errors)
return errors
def close(self):
"""
close the sessions.
"""
for session in self.values():
session.close()
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
errors = self.wait_for_tasks(raise_if_error=False)
for session in self.values():
if session.auto_close:
session.close()
if errors:
if value is None:
# no exceptions in the with block -> let's raise
# the errors
raise TaskErrors(errors)
else:
# TODO: for now, just print errors if any
for error in errors:
print('ERROR: %s' % error)
class CommandTask(Task):
"""
Base class that execute a command in an asynchronous way.
It uses an internal stream reader (a subclass of
:class:`streamreader.StreamsReader`)
:param session: the session that run this command
:param reader_class: the :class:`streamreader.StreamsReader` class
to use
:param command: the command to execute (a string)
:param expected_exit_code: the expected exit code of the command. If
None, there is no exit code expected.
:param combine_stderr: if None, stderr and stdout will be automatically
combined unless stderr_callback is defined. You can force to combine
stderr or stdout by passing True or False.
:param timeout: timeout in seconds for the task. If None, no timeout is
set - else timeout_callback is called if the command has not finished
in time.
:param output_timeout: timeout in seconds for waiting output. If None, no
timeout is set - else timeout_callback is called if there is no output
in time.
:param on_finished: a callable that takes one parameter, the command
task instance. Called when the command is finished, but not on timeout.
:param on_timeout: a callable that takes one parameter, the command
task instance. Called on timeout.
:param on_stdout: a callable that takes two parameter, the command
task instance and the line read. Called on line read from stdout and
possibly from stderr if streams are combined..
:param on_stderr: a callable that takes two parameter, the command
task instance and the line read. Called on line read from stderr.
"""
def __init__(self, session, reader_class, command, expected_exit_code=0,
combine_stderr=None, timeout=None, output_timeout=None,
on_finished=None, on_timeout=None, on_stdout=None,
on_stderr=None,
# deprecated aliases
finished_callback=None, timeout_callback=None,
stdout_callback=None, stderr_callback=None):
Task.__init__(self, session)
if combine_stderr is None:
combine_stderr = not stderr_callback
self._combine_stderr = combine_stderr
self.__exit_code = None
self.__expected_exit_code = expected_exit_code
self.__timed_out = False
def _warn(name):
msg = ("You should use on_%s instead of %s_callback"
" in new code (it will be removed soon)") % (name, name)
warnings.warn(msg)
if finished_callback:
_warn("finished")
on_finished = finished_callback
if timeout_callback:
_warn("timeout")
on_timeout = timeout_callback
if stdout_callback:
_warn("stdout")
on_stdout = stdout_callback
if stderr_callback:
_warn("stderr")
on_stderr = stderr_callback
self.__finished_callback = on_finished
self.__timeout_callback = on_timeout
self.__stdout_callback = on_stdout
self.__stderr_callback = on_stderr
self._reader = reader_class(
stdout_callback=self._on_stdout,
stderr_callback=self._on_stderr,
timeout=timeout,
output_timeout=output_timeout,
timeout_callback=self._on_timeout,
finished_callback=self._on_finished
)
def _set_exit_code(self, exit_code):
self.__exit_code = exit_code
def _on_stdout(self, line):
if self.__stdout_callback:
self.__stdout_callback(self, line)
def _on_stderr(self, line):
if self.__stderr_callback:
self.__stderr_callback(self, line)
def _on_timeout(self):
self._unregister()
self.__timed_out = True
if self.__timeout_callback:
self.__timeout_callback(self)
def _on_finished(self):
self._unregister()
if self.__finished_callback:
self.__finished_callback(self)
def timed_out(self):
"""
Return True if a timeout occured.
"""
return self.__timed_out
def is_running(self):
"""
Return True if the command is still running.
"""
return self._reader.is_alive()
def error(self):
"""
Return an instance of Exception if any, else None.
Actually check for a :class:`TimeoutError` or a
:class:`ExitCodeError`.
"""
if self.__timed_out:
return TimeoutError(self.session, self, "timeout")
if self.__exit_code is not None and \
self.__expected_exit_code is not None and \
self.__exit_code != self.__expected_exit_code:
return ExitCodeError(self.session, self,
'bad exit code: Got %s' % self.__exit_code)
def exit_code(self):
"""
Return the exit code of the command, or None if the command is
not finished yet.
"""
return self.__exit_code
def _wait(self, raise_if_error):
if self._reader.is_alive():
self._reader.thread.join()
if raise_if_error:
self.raise_if_error()
return self.__exit_code
class ThreadableTask(Task):
"""
A task ran in a background thread.
"""
def __init__(self, session, callable, args, kwargs,
finished_callback=None):
Task.__init__(self, session)
# Set up exception handling
self.exception = None
def wrapper(*args, **kwargs):
try:
callable(*args, **kwargs)
except Exception:
self.exception = TaskError(session, self, sys.exc_info()[1])
finally:
self._unregister()
if finished_callback:
finished_callback(self)
# Kick off thread
name = getattr(callable, '__name__', None)
thread = threading.Thread(None, wrapper, name, args, kwargs)
thread.setDaemon(True)
thread.start()
# Make thread available to instantiator
self.thread = thread
def is_running(self):
return self.thread.is_alive()
def error(self):
return self.exception
def _wait(self, raise_if_error):
if self.thread.is_alive():
self.thread.join()
if raise_if_error:
self.raise_if_error()