diff --git a/bin/radical-stack b/bin/radical-stack index 2eb8fe17e..3f8dbecb0 100755 --- a/bin/radical-stack +++ b/bin/radical-stack @@ -1,16 +1,52 @@ #!/usr/bin/env python -import radical.utils as ru +import os +import sys +import glob +import pkgutil -stack = ru.stack() +print 'python : %s' % sys.version.split()[0] +print 'virtualenv : %s' % os.environ.get('VIRTUAL_ENV') -print -for key in sorted(stack['sys'].keys()): - print ' %-20s : %s' % (key, stack['sys'][key]) -print -for key in sorted(stack['radical'].keys()): - print ' %-20s : %s' % (key, stack['radical'][key]) -print +try: + import radical.utils as mod + print 'radical.utils : %s' % mod.version_detail +except: + pass -# ------------------------------------------------------------------------------ +try: + import saga as mod + print 'saga-python : %s' % mod.version_detail +except: + pass + +try: + import radical.saga as mod + print 'radical.saga : %s' % mod.version_detail +except: + pass + +try: + import radical.pilot as mod + print 'radical.pilot : %s' % mod.version_detail +except: + pass + +try: + import radical.entk as mod + print 'radical.entk : %s' % mod.version_detail +except: + pass + +try: + import radical.ensembletk as mod + print 'radical.ensembletk: %s' % mod.version_detail +except: + pass + +try: + import radical.analytics as mod + print 'radical.analytics : %s' % mod.version_detail +except: + pass diff --git a/bin/radical-stack-clone b/bin/radical-stack-clone index bb3032e9b..35cbbb8f7 100755 --- a/bin/radical-stack-clone +++ b/bin/radical-stack-clone @@ -192,17 +192,13 @@ printf "%-20s %-55s %-30s %-30s %s\n" "mod" "repo" "branch" "commit" "tag" # if ! test -z "$stack" then - for mod in 'radical.utils' 'radical.saga' 'saga' 'radical.pilot' 'radical.analytics' + for mod in 'radical.utils' 'saga-python' 'radical.pilot' 'radical.analytics' do info=$(cat $stack | grep -e "^$mod" | tail -n 1) if test -z "$info" then echo "skip $mod" else - if test "$mod" = "saga" - then - mod='saga-python' - fi repo="https://github.com/radical-cybertools/$mod.git" spec=$( echo "$info" | cut -f 2 -d ':' | xargs echo) @@ -215,12 +211,8 @@ then done else - for mod in 'radical.utils' 'radical.saga' 'saga' 'radical.pilot' 'radical.analytics' + for mod in 'radical.utils' 'saga-python' 'radical.pilot' 'radical.analytics' do - if test "$mod" = "saga" - then - mod='saga-python' - fi repo="https://github.com/radical-cybertools/$mod.git" install "$mod" "$repo" "$branch" "$tag" "$commit" diff --git a/setup.py b/setup.py old mode 100755 new mode 100644 index f0c14f325..cc74b9596 --- a/setup.py +++ b/setup.py @@ -2,11 +2,11 @@ __author__ = 'RADICAL Team' __email__ = 'radical@rutgers.edu' -__copyright__ = 'Copyright 2013-16, RADICAL Research, Rutgers University' +__copyright__ = 'Copyright 2013/14, RADICAL Research, Rutgers University' __license__ = 'MIT' -""" Setup script, only usable via pip. """ +""" Setup script. Used by easy_install and pip. """ import re import os @@ -270,8 +270,7 @@ def isgood(name): 'cmdclass' : { 'test' : our_test, }, - 'install_requires' : ['future', - 'colorama', + 'install_requires' : ['colorama', 'netifaces==0.10.4' ], 'extras_require' : { diff --git a/src/radical/utils/__init__.py b/src/radical/utils/__init__.py index 5d5a88c78..04ebdfe41 100644 --- a/src/radical/utils/__init__.py +++ b/src/radical/utils/__init__.py @@ -3,20 +3,16 @@ __copyright__ = "Copyright 2013, RADICAL@Rutgers" __license__ = "MIT" -# we want atfork imported first, specifically before os and logging -from .atfork import * # import utility classes from .object_cache import ObjectCache from .plugin_manager import PluginManager from .singleton import Singleton -from .process import Process -from .threads import Thread, RLock -from .threads import is_main_thread, is_this_thread, cancel_main_thread -from .threads import main_thread, this_thread +from .threads import Thread, RLock, NEW, RUNNING, DONE, FAILED +from .threads import is_main_thread, cancel_main_thread from .threads import raise_in_thread, ThreadExit, SignalRaised -from .futures import Future -from .futures import NEW, RUNNING, DONE, FAILED, CANCELED +from .threads import fs_event_create, fs_event_wait +from .futures import * from .url import Url from .dict_mixin import DictMixin, dict_merge, dict_stringexpand from .dict_mixin import PRESERVE, OVERWRITE @@ -29,6 +25,7 @@ from .daemonize import Daemon # import utility methods +from .atfork import * from .logger import * from .ids import * from .read_json import * diff --git a/src/radical/utils/atfork/__init__.py b/src/radical/utils/atfork/__init__.py index 607ccf715..72f0305ea 100644 --- a/src/radical/utils/atfork/__init__.py +++ b/src/radical/utils/atfork/__init__.py @@ -48,7 +48,4 @@ from .atfork import monkeypatch_os_fork_functions, atfork from .stdlib_fixer import fix_logging_module -fix_logging_module() -monkeypatch_os_fork_functions() - diff --git a/src/radical/utils/atfork/atfork.py b/src/radical/utils/atfork/atfork.py index 7d99d19cc..a5480daf4 100644 --- a/src/radical/utils/atfork/atfork.py +++ b/src/radical/utils/atfork/atfork.py @@ -56,11 +56,6 @@ def monkeypatch_os_fork_functions(): Replace os.fork* with wrappers that use ForkSafeLock to acquire all locks before forking and release them afterwards. """ - - # monkeypatching can be disabled by setting RADICAL_UTILS_NOATFORK - if 'RADICAL_UTILS_NOATFORK' in os.environ: - return - builtin_function = type(''.join) if hasattr(os, 'fork') and isinstance(os.fork, builtin_function): global _orig_os_fork @@ -94,17 +89,10 @@ def atfork(prepare=None, parent=None, child=None): they will be printed to sys.stderr after the fork call once it is safe to do so. """ - - # monkeypatching can be disabled by setting RADICAL_UTILS_NOATFORK - if 'RADICAL_UTILS_NOATFORK' in os.environ: - return - assert not prepare or callable(prepare) assert not parent or callable(parent) assert not child or callable(child) - _fork_lock.acquire() - try: if prepare: _prepare_call_list.append(prepare) @@ -205,6 +193,3 @@ def os_forkpty_wrapper(): # TODO: Also replace os.fork1() on Solaris. - -# ------------------------------------------------------------------------------ - diff --git a/src/radical/utils/atfork/stdlib_fixer.py b/src/radical/utils/atfork/stdlib_fixer.py index 805e7f85e..9b62e3984 100644 --- a/src/radical/utils/atfork/stdlib_fixer.py +++ b/src/radical/utils/atfork/stdlib_fixer.py @@ -50,11 +50,6 @@ class Error(Exception): def fix_logging_module(): - - import os - if 'RADICAL_UTILS_NOATFORK' in os.environ: - return - logging = sys.modules.get('logging') # Prevent fixing multiple times as that would cause a deadlock. if logging and getattr(logging, 'fixed_for_atfork', None): @@ -88,6 +83,3 @@ def fork_safe_createLock(self): logging.fixed_for_atfork = True finally: logging._releaseLock() - -# ------------------------------------------------------------------------------ - diff --git a/src/radical/utils/debug.py b/src/radical/utils/debug.py index 344129964..984ab2204 100644 --- a/src/radical/utils/debug.py +++ b/src/radical/utils/debug.py @@ -108,6 +108,8 @@ def fs_block(self, info=None): except Exception as e: # we don't care (much)... + print get_trace() + print e pass diff --git a/src/radical/utils/futures.py b/src/radical/utils/futures.py index 5868679c7..4826faa8a 100644 --- a/src/radical/utils/futures.py +++ b/src/radical/utils/futures.py @@ -40,7 +40,7 @@ class Future(mt.Thread): """ This `Future` class is a thin wrapper around Python's native `mt.Thread` - class. It is expected to wrap a callable, and to watch its execution. + class. It is expeted to wrap a callable, and to watch its execution. """ # FIXME: we may want to use a thread pool to limit the number of threads diff --git a/src/radical/utils/logger.py b/src/radical/utils/logger.py index 9d725a256..41cc36ce7 100644 --- a/src/radical/utils/logger.py +++ b/src/radical/utils/logger.py @@ -39,6 +39,10 @@ from .atfork import * +# monkeypatching can be disabled by setting RADICAL_UTILS_NOATFORK +if not 'RADICAL_UTILS_NOATFORK' in os.environ: + stdlib_fixer.fix_logging_module() + monkeypatch_os_fork_functions() # ------------------------------------------------------------------------------ # @@ -88,7 +92,8 @@ def _atfork_parent(): def _atfork_child(): _after_fork() -atfork(_atfork_prepare, _atfork_parent, _atfork_child) +if not 'RADICAL_UTILS_NOATFORK' in os.environ: + atfork(_atfork_prepare, _atfork_parent, _atfork_child) # # ------------------------------------------------------------------------------ @@ -181,8 +186,6 @@ def get_logger(name, target=None, level=None, path=None, header=True): 'name' is used to identify log entries on this handle. 'target' is a comma separated list (or Python list) of specifiers, where specifiers are: - '0' : /dev/null - 'null' : /dev/null '-' : stdout '1' : stdout 'stdout' : stdout @@ -299,9 +302,9 @@ def get_logger(name, target=None, level=None, path=None, header=True): # add a handler for each targets (using the same format) logger.targets = targets for t in logger.targets: - if t in ['0', 'null']: - handle = logging.NullHandler() - elif t in ['-', '1', 'stdout']: + if t in ['null']: + continue + if t in ['-', '1', 'stdout']: handle = ColorStreamHandler(sys.stdout) elif t in ['=', '2', 'stderr']: handle = ColorStreamHandler(sys.stderr) diff --git a/src/radical/utils/misc.py b/src/radical/utils/misc.py index 3df5840c7..826552c12 100644 --- a/src/radical/utils/misc.py +++ b/src/radical/utils/misc.py @@ -1,11 +1,9 @@ import os import sys import time -import glob import regex import signal import socket -import importlib import netifaces import threading import url as ruu @@ -483,34 +481,5 @@ def name2env(name): return name.replace('.', '_').upper() -# ------------------------------------------------------------------------------ -# -def stack(): - - ret = {'sys' : {'python' : sys.version.split()[0], - 'pythonpath' : os.environ.get('PYTHONPATH', ''), - 'virtualenv' : os.environ.get('VIRTUAL_ENV', '')}, - 'radical' : dict() - } - - - modules = ['radical.utils', - 'radical.saga', - 'saga', - 'radical.pilot', - 'radical.entl', - 'radical.ensembletk', - 'radical.analytics'] - - for mod in modules: - try: - tmp = importlib.import_module(mod) - ret['radical'][mod] = tmp.version_detail - except: - pass - - return ret - - # ------------------------------------------------------------------------------ diff --git a/src/radical/utils/process.py b/src/radical/utils/process.py deleted file mode 100644 index 4ef9c7a68..000000000 --- a/src/radical/utils/process.py +++ /dev/null @@ -1,1079 +0,0 @@ - -__author__ = "Radical.Utils Development Team" -__copyright__ = "Copyright 2016, RADICAL@Rutgers" -__license__ = "MIT" - - -import os -import sys -import time -import select -import socket -import threading as mt -import multiprocessing as mp -import setproctitle as spt - -from .logger import get_logger -from .debug import print_exception_trace, print_stacktrace -from .threads import is_main_thread -from .threads import Thread as ru_Thread - - -# ------------------------------------------------------------------------------ -# -_ALIVE_MSG = 'alive' # message to use as alive signal -_START_TIMEOUT = 5.0 # time to wait for process startup signal. - # startup signal: 'alive' message on the socketpair; - # is sent in both directions to ensure correct setup -_WATCH_TIMEOUT = 0.5 # time between thread and process health polls. - # health poll: check for recv, error and abort - # on the socketpair; is done in a watcher thread. -_STOP_TIMEOUT = 0.1 # time between temination signal and killing child -_BUFSIZE = 1024 # default buffer size for socket recvs - - -# ------------------------------------------------------------------------------ -# -class Process(mp.Process): - ''' - This `Process` class is a thin wrapper around `mp.Process` which - specifically manages the process lifetime in a more cautious and copperative - way than the base class: *no* attempt on signal handling is made, we expect - to exclusively communicate between parent and child via a socket. - A separate thread in both processes will watch that socket: if the socket - disappears, we interpret that as the other process being terminated, and - begin process termination, too. - - NOTE: At this point we do not implement the full `mp.Process` constructor. - - The class also implements a number of initialization and finalization - methods which can be overloaded by any deriving class. While this can at - first be a confusing richness of methods to overload, it significantly - simplifies the implementation of non-trivial child processes. By default, - none of the initialized and finalizers needs to be overloaded. - - An important semantic difference are the `start()` and `stop()` methods: - both accept an optional `timeout` parameter, and both guarantee that the - child process successfully started and completed upon return, respectively. - If that does not happen within the given timeout, an exception is raised. - Not that the child startup is *only* considered complete once all of its - initialization methods have been completed -- the start timeout value must - thus be chosen very carefully. Note further that the *parent* - initialization methods are also part of the `start()` invocation, and must - also be considered when choosing the timeout value. Parent initializers - will only be executed once the child is known to be alive, and the parent - can thus expect the child bootstrapping to be completed (avoiding the need - for additional handshakes etc). Any error in child or parent initialization - will result in an exception, and the child will be terminated. - - Along the same lines, the parent and child finalizers are executed in the - `stop()` method, prompting similar considerations for the `timeout` value. - - Any class which derives from this Process class *must* overload the 'work_cb()` - method. That method is repeatedly called by the child process' main loop, - until: - - an exception occurs (causing the child to fail with an error) - - `False` is returned by `work_cb()` (causing the child to finish w/o error) - ''' - - # TODO: We should switch to fork/*exec*, if possible. - - - # -------------------------------------------------------------------------- - # - def __init__(self, name, log=None): - - # At this point, we only initialize members which we need before start - self._ru_log = log # ru.logger for debug output - - # ... or are *shared* between parent and child process. - self._ru_ppid = os.getpid() # get parent process ID - - # most importantly, we create a socketpair. Both parent and child will - # watch one end of that socket, which thus acts as a lifeline between - # the processes, to detect abnormal termination in the process tree. - # The socket is also used to send messages back and forth. - self._ru_sp = socket.socketpair(socket.AF_UNIX, socket.SOCK_STREAM, 0) - - # note that some members are only really initialized when calling the - # `start()`/`run()` methods, and the various initializers. - self._ru_name = name # use for setproctitle - self._ru_spawned = None # set in start(), run() - self._ru_is_parent = None # set in start() - self._ru_is_child = None # set in run() - self._ru_endpoint = None # socket endpoint for sent/recv - self._ru_term = None # set to terminate watcher - self._ru_initialized = False # set to signal bootstrap success - self._ru_terminating = False # set to signal active termination - self._ru_watcher = None # watcher thread - self._ru_things_lock = mt.Lock() # lock for the above - self._ru_things = dict() # registry of threads created in - # (parent or child) process - - # FIXME: assert that start() was called for some / most methods - - if not self._ru_log: - # if no logger is passed down, log to null - self._ru_log = get_logger('radical.util.process', target='rp.log') - self._ru_log.debug('name: %s' % self._ru_name) - - # when cprofile is requested but not available, - # we complain, but continue unprofiled - self._ru_cprofile = False - if self._ru_name in os.environ.get('RADICAL_CPROFILE', '').split(':'): - try: - self._ru_log.error('enable cprofile for %s', self._ru_name) - import cprofile - self._ru_cprofile = True - except: - self._ru_log.error('cannot import cprofile - disable') - - # base class initialization - super(Process, self).__init__(name=self._ru_name) - - # we don't want processes to linger around, waiting for children to - # terminate, and thus create sub-processes as daemons. - # - # NOTE: this requires the `patchy.mc_patchface()` fix in `start()`. - # - # self.daemon = True - - - # -------------------------------------------------------------------------- - # - def _ru_msg_send(self, msg): - ''' - send new message to self._ru_endpoint. We make sure that the - message is not larger than the _BUFSIZE define in the RU source code. - ''' - - if not self._ru_spawned: - # no child, no communication channel - raise RuntimeError("can't communicate w/o child") - - # NOTE: this method should only be called by the watcher thread, which - # owns the endpoint. - # FIXME: BUFSIZE should not be hardcoded - # FIXME: check for socket health - - if len(msg) > _BUFSIZE: - raise ValueError('message is larger than %s: %s' % (_BUFSIZE, msg)) - - self._ru_log.info('send message: %s', msg) - self._ru_endpoint.send(msg) - - - # -------------------------------------------------------------------------- - # - def _ru_msg_recv(self, size=_BUFSIZE, timeout=None): - ''' - receive a message from self._ru_endpoint. We only check for messages - of *up to* `size`. - - This call is non-blocking: if no message is available, return an empty - string. - ''' - - if not self._ru_spawned: - # no child, no communication channel - raise RuntimeError("can't communicate w/o child") - - # NOTE: this method should only be called by the watcher thread, which - # owns the endpoint (no lock used!) - # FIXME: BUFSIZE should not be hardcoded - # FIXME: check for socket health - - try: - if timeout: - self._ru_endpoint.settimeout(timeout) - msg = self._ru_endpoint.recv(size) - self._ru_log.info('recv message: %s', msg) - return msg - - except socket.timeout: - self._ru_log.warn('recv timed out') - return '' - - - # -------------------------------------------------------------------------- - # - def _ru_watch(self): - ''' - When `start()` is called, the parent process will create a socket pair. - after fork, one end of that pair will be watched by the parent and - client, respectively, in separate watcher threads. If any error - condition or hangup is detected on the socket, it is assumed that the - process on the other end died, and termination is initiated. - - Since the watch happens in a subthread, any termination requires the - attention and cooperation of the main thread. No attempt is made on - interrupting the main thread, we only set self._ru_term which needs to - be checked by the main threads in certain intervals. - ''' - - # we watch sockets and threads as long as we live, ie. until the main - # thread sets `self._ru_term`. - try: - - self._ru_poller = select.poll() - self._ru_poller.register(self._ru_endpoint, select.POLLERR | select.POLLHUP | select.POLLIN) - - last = 0.0 # we never watched anything until now - while not self._ru_term.is_set() : - - # only do any watching if time is up - now = time.time() - if now - last < _WATCH_TIMEOUT: - time.sleep(0.1) # FIXME: configurable, load tradeoff - continue - - self._ru_watch_socket() - self._ru_watch_things() - - last = now - - # FIXME: also *send* any pending messages to the child. - # # check if any messages need to be sent. - # while True: - # try: - # msg = self._ru_msg_out.get_nowait() - # self._ru_msg_send(msg) - # - # except Queue.Empty: - # # nothing more to send - # break - - - except Exception as e: - # mayday... mayday... - self._ru_log.warn('watcher failed') - - finally: - # no matter why we fell out of the loop: let the other end of the - # socket know by closing the socket endpoint. - self._ru_endpoint.close() - - # `self.stop()` will be called from the main thread upon checking - # `self._ru_term` via `self.is_alive()`. - # FIXME: check - self._ru_term.set() - - # -------------------------------------------------------------------------- - # - def _ru_watch_socket(self): - - # check health of parent/child relationship - events = self._ru_poller.poll(0.0) # don't block - for _,event in events: - - # for alive checks, we poll socket state for - # * data: some message from the other end, logged - # * error: child failed - terminate - # * hangup: child finished - terminate - - # check for error conditions - if event & select.POLLHUP or \ - event & select.POLLERR : - - # something happened on the other end, we are about to die - # out of solidarity (or panic?). - self._ru_log.warn('endpoint disappeard') - raise RuntimeError('endpoint disappeard') - - # check for messages - elif event & select.POLLIN: - - # we get a message! - # - # FIXME: BUFSIZE should not be hardcoded - # FIXME: we do nothing with the message yet, should be - # stored in a message queue. - msg = self._ru_msg_recv(_BUFSIZE) - self._ru_log.info('message received: %s' % msg) - - # self._ru_log.debug('endpoint watch ok') - - - # -------------------------------------------------------------------------- - # - def register_watchable(self, thing): - ''' - Add an object to watch. If the object is at any point found to be not - alive (`thing.is_alive()` returns `False`), an exception is raised which - will cause the watcher thread to terminate, and will thus also terminate - this `ru.Process` instance. All registered things will be terminated on - `stop()`. - - The given `thing` is expected to have these four methods/properties - - - `thing.is_alive()` -- perform health check - - `thing.name` -- a unique name identifying the thing - - `thing.stop()` -- method to terminate thing on regular shutdown - - `thing.join()` -- method to collect thing on regular shutdown - - `thing.stop()` and `thing.join()` are expected to accept a timeout - parameter, and are expected to raise exceptions if the operation fails. - - In other words, `things` are really expected to be radical.utils threads - and processes, but we don't enforce this in terms of type checks. - ''' - - assert(thing.is_alive) - assert(thing.name) - assert(thing.stop) - assert(thing.join) - - with self._ru_things_lock: - if thing.name in self._ru_things: - raise ValueError('already watching %s' % thing.name) - self._ru_things[thing.name] = thing - - - # -------------------------------------------------------------------------- - # - def unregister_watchable(self, name): - - with self._ru_things_lock: - if not name in self._ru_things: - raise ValueError('%s is not watched' % name) - - del(self._ru_things[name]) - - - # -------------------------------------------------------------------------- - # - def _ru_watch_things(self): - - with self._ru_things_lock: - for tname,thing in self._ru_things.iteritems(): - if not thing.is_alive(): - raise RuntimeError('%s died' % tname) - - - # -------------------------------------------------------------------------- - # - def start(self, spawn=True, timeout=None): - ''' - Overload the `mp.Process.start()` method, and block (with timeout) until - the child signals to be alive via a message over our socket pair. Also - run all initializers. - - If spawn is set to `False`, then no child process is actually created, - but the parent initializers will be executed nonetheless. - - ''' - - self._ru_log.debug('start process') - - if timeout is None: - timeout = _START_TIMEOUT - - # Daemon processes can't fork child processes in Python, because... - # Well, they just can't. We want to use daemons though to avoid hanging - # processes if, for some reason, communication of termination conditions - # fails. - # - # Patchy McPatchface to the rescue (no, I am not kidding): we remove - # that useless assert (of all things!) on the fly. - # - # NOTE: while this works, we seem to have the socketpair-based detection - # stable enough to not need the monkeypatch. - # - # _daemon_fork_patch = '''\ - # *** process_orig.py Sun Nov 20 20:02:23 2016 - # --- process_fixed.py Sun Nov 20 20:03:33 2016 - # *************** - # *** 5,12 **** - # assert self._popen is None, 'cannot start a process twice' - # assert self._parent_pid == os.getpid(), \\ - # 'can only start a process object created by current process' - # - assert not _current_process._daemonic, \\ - # - 'daemonic processes are not allowed to have children' - # _cleanup() - # if self._Popen is not None: - # Popen = self._Popen - # --- 5,10 ---- - # ''' - # - # import patchy - # patchy.mc_patchface(mp.Process.start, _daemon_fork_patch) - - if spawn: - # start `self.run()` in the child process, and wait for it's - # initalization to finish, which will send the 'alive' message. - super(Process, self).start() - self._ru_spawned = True - - - # this is the parent now - set role flags. - self._ru_is_parent = True - self._ru_is_child = False - - # select different ends of the socketpair for further communication. - self._ru_endpoint = self._ru_sp[0] - self._ru_sp[1].close() - - # from now on we need to invoke `self.stop()` for clean termination. - # Having said that: the daemonic watcher thread and the socket lifeline - # to the child should ensure that both will terminate in all cases, but - # possibly somewhat delayed and abruptly. - # - # Either way: use a try/except to ensure `stop()` being called. - try: - - if self._ru_spawned: - # we expect an alive message message from the child, within - # timeout - # - # NOTE: If the child does not bootstrap fast enough, the timeout - # will kick in, and the child will be considered dead, - # failed and/or hung, and will be terminated! Timeout can - # be set as parameter to the `start()` method. - msg = self._ru_msg_recv(size=len(_ALIVE_MSG), timeout=timeout) - - if msg != _ALIVE_MSG: - # attempt to read remainder of message and barf - msg += self._ru_msg_recv() - raise RuntimeError('unexpected child message (%s) [%s]' % (msg, - timeout)) - - - # When we got the alive messages, only then will we call the parent - # initializers. This way those initializers can make some - # assumptions about successful child startup. - self._ru_initialize() - - except Exception as e: - self._ru_log.exception('initialization failed') - self.stop() - raise - - # if we got this far, then all is well, we are done. - self._ru_log.debug('child process started') - - # child is alive and initialized, parent is initialized, watcher thread - # is started - Wohoo! - - - # -------------------------------------------------------------------------- - # - def run(self): - ''' - This method MUST NOT be overloaded! - - This is the workload of the child process. It will first call the child - initializers and then repeatedly call `self.work_cb()`, until being - terminated. When terminated, it will call the child finalizers and - exit. Note that the child will also terminate once `work_cb()` returns - `False`. - - The implementation of `work_cb()` needs to make sure that this process is - not spinning idly -- if there is nothing to do in `work_cb()` at any point - in time, the routine should at least sleep for a fraction of a second or - something. - - Child finalizers are only guaranteed to get called on `self.stop()` -- - a hard kill via `self.terminate()` may or may not be able to trigger to - run the child finalizers. - - The child process will automatically terminate (incl. finalizer calls) - when the parent process dies. It is thus not possible to create daemon - or orphaned processes -- which is an explicit purpose of this - implementation. - ''' - - # if no profiling is wanted, we just run the workload and exit - if not self._ru_cprofile: - self._run() - - # otherwise we run under the profiler, obviously - else: - import cprofile - cprofiler = cProfile.Profile() - cprofiler.runcall(self._run) - cprofiler.dump_stats('%s.cprof' % (self._ru_name)) - - - # -------------------------------------------------------------------------- - # - def _run(self): - - # FIXME: ensure that this is not overloaded - - # this is the child now - set role flags - self._ru_is_parent = False - self._ru_is_child = True - self._ru_spawned = True - - # select different ends of the socketpair for further communication. - self._ru_endpoint = self._ru_sp[1] - self._ru_sp[0].close() - - # set child name based on name given in c'tor, and use as proctitle - self._ru_name = self._ru_name + '.child' - spt.setproctitle(self._ru_name) - - try: - # we consider the invocation of the child initializers to be part of - # the bootstrap process, which includes starting the watcher thread - # to watch the parent's health (via the socket health). - try: - self._ru_initialize() - - except BaseException as e: - self._ru_log.exception('abort: %s', repr(e)) - self._ru_msg_send('error: %s' % repr(e)) - # sys.stderr.write('initialization error in %s: %s\n' % (self._ru_name, repr(e))) - # sys.stderr.flush() - self._ru_term.set() - - # initialization done - we only now send the alive signal, so the - # parent can make some assumptions about the child's state - if not self._ru_term.is_set(): - self._ru_log.info('send alive') - self._ru_msg_send(_ALIVE_MSG) - - # enter the main loop and repeatedly call 'work_cb()'. - # - # If `work_cb()` ever returns `False`, we break out of the loop to call the - # finalizers and terminate. - # - # In each iteration, we also check if the socket is still open -- if it - # is closed, we assume the parent to be dead and terminate (break the - # loop). We consider the socket closed if `self._ru_term` was set - # by the watcher thread. - while not self._ru_term.is_set() and \ - self._parent_is_alive() : - - # des Pudel's Kern - if not self.work_cb(): - self._ru_msg_send('work finished') - break - - except BaseException as e: - - # This is a very global except, also catches - # sys.exit(), keyboard interrupts, etc. - # Ignore pylint and PEP-8, we want it this way! - self._ru_log.exception('abort: %s', repr(e)) - self._ru_msg_send('abort: %s' % repr(e)) - # sys.stderr.write('work error in %s: %s\n' % (self._ru_name, repr(e))) - # sys.stderr.flush() - - try: - # note that we always try to call the finalizers, even if an - # exception got raised during initialization or in the work loop - # initializers failed for some reason or the other... - self._ru_finalize() - - except BaseException as e: - self._ru_log.exception('finalization error') - self._ru_msg_send('finalize(): %s' % repr(e)) - # sys.stderr.write('finalize error in %s: %s\n' % (self._ru_name, repr(e))) - # sys.stderr.flush() - - self._ru_msg_send('terminating') - - # tear down child watcher - if self._ru_watcher: - self._ru_term.set() - self._ru_watcher.join(_STOP_TIMEOUT) - - # stop all things we watch - with self._ru_things_lock: - for tname,thing in self._ru_things.iteritems(): - try: - thing.stop(timeout=_STOP_TIMEOUT) - except Exception as e: - self._ru_log.exception('Could not stop %s [%s]', tname, e) - - # All watchables should have stopped. For some of them, - # `stop()` will have implied `join()` already - but an additional - # `join()` will cause little overhead, so we don't bother - # distinguishing. - - with self._ru_things_lock: - for tname,thing in self._ru_things.iteritems(): - try: - thing.join(timeout=_STOP_TIMEOUT) - except Exception as e: - self._ru_log.exception('could not join %s [%s]', tname, e) - - # all is done and said - begone! - sys.exit(0) - - - # -------------------------------------------------------------------------- - # - def stop(self, timeout=None): - ''' - `stop()` is symmetric to `start()`, in that it can only be called by the - parent process. It MUST be called from the main thread. Both - conditions are asserted. If a subthread or the child needs to trigger - the termination of the parent, it should simply terminate/exit its own - scope, and let the parent detect that via its watcher thread. That will - eventually cause `is_alive()` to return `False`, signalling the - application that `stop()` should be called. - - This method sets the thread termination signal, and call `stop()` on all - watchables. It then calls `join()` on all watchables, with the given - timeout, and then also joins the child process with the given timeout. - - If any of the join calls fails or times out, an exception will be - raised. All join's will be attempted though, to collect as many threads - and processes as possible. - - - NOTE: `stop()` implies `join()`! Use `terminate()` if that is not - wanted. Terminate will not stop watchables though. - - NOTE: The given timeout is, as described above, applied multiple times, - once for the child process, and once for each watchable. - ''' - - # FIXME: we can't really use destructors to make sure stop() is called, - # but we should consider using `__enter__`/`__leave__` scopes to - # ensure clean termination. - - # FIXME: This method should reduce to - # self.terminate(timeout) - # self.join(timeout) - # ie., we should move some parts to `terminate()`. - - if not timeout: - timeout = _STOP_TIMEOUT - - # make sure we don't recurse - if self._ru_terminating: - return - self._ru_terminating = True - - # if not is_main_thread(): - # self._ru_log.info('reroute stop to main thread (%s)' % self._ru_name) - # sys.exit() - - self._ru_log.info('parent stops child') - - # keep a list of error messages for an eventual exception message - errors = list() - - # call parent finalizers - self._ru_finalize() - - # tear down watcher - we wait for it to shut down, to avoid races - if self._ru_watcher: - self._ru_term.set() - self._ru_watcher.stop(timeout) - - # stop all things we watch - with self._ru_things_lock: - for tname,thing in self._ru_things.iteritems(): - try: - thing.stop(timeout=timeout) - except Exception as e: - errors.append('could not stop %s [%s]' % (tname, e)) - - # process termination is only relevant for the parent, and only if - # a child was actually spawned - if self._ru_is_parent and self._ru_spawned: - - # stopping the watcher will close the socket, and the child should - # begin termination immediately. Well, whenever it realizes the - # socket is gone, really. We wait for that termination to complete. - super(Process, self).join(timeout) - - # make sure child is gone - if super(Process, self).is_alive(): - self._ru_log.warn('failed to stop child - terminate') - self.terminate() # hard kill - super(Process, self).join(timeout) - - # check again - if super(Process, self).is_alive(): - errors.append('could not join child process %s' % self.pid) - - # meanwhile, all watchables should have stopped, too. For some of them, - # `stop()` will have implied `join()` already - but an additional - # `join()` will cause little overhead, so we don't bother - # distinguishing. - with self._ru_things_lock: - for tname,thing in self._ru_things.iteritems(): - try: - thing.join(timeout=timeout) - except Exception as e: - errors.append('could not join %s [%s]' % (tname, e)) - - # don't exit - but signal if the child or any watchables survived - if errors: - for error in errors: - self._ru_log.error(error) - raise RuntimeError(errors) - - - # -------------------------------------------------------------------------- - # - def join(self, timeout=None): - - # raise RuntimeError('call stop instead!') - # - # we can't really raise the exception above, as the mp module calls this - # join via `at_exit`. Which kind of explains hangs on unterminated - # children... - # - # FIXME: not that `join()` w/o `stop()` will not call the parent finalizers. - # We should call those in both cases, but only once. - - if not timeout: - timeout = _STOP_TIMEOUT - - assert(self._ru_is_parent) - - if self._ru_spawned: - super(Process, self).join(timeout=timeout) - - - # -------------------------------------------------------------------------- - # - def _ru_initialize(self): - ''' - Perform basic settings, then call common and parent/child initializers. - ''' - - try: - # call parent and child initializers, respectively - if self._ru_is_parent: - self._ru_initialize_common() - self._ru_initialize_parent() - - self.ru_initialize_common() - self.ru_initialize_parent() - - elif self._ru_is_child: - self._ru_initialize_common() - self._ru_initialize_child() - - self.ru_initialize_common() - self.ru_initialize_child() - - self._ru_initialized = True - - except Exception as e: - self._ru_log.exception('initialization error') - raise RuntimeError('initialize: %s' % repr(e)) - - - # -------------------------------------------------------------------------- - # - def _ru_initialize_common(self): - - pass - - - # -------------------------------------------------------------------------- - # - def _ru_initialize_parent(self): - - # the term signal is *always* used, not only when spawning. - self._ru_term = mt.Event() - - if not self._ru_spawned: - # no child, so we won't need a watcher either - return - - # Start a separate thread which watches our end of the socket. If that - # thread detects any failure on that socket, it will set - # `self._ru_term`, to signal its demise and prompt an exception from - # the main thread. - # - # NOTE: For several reasons, the watcher thread has no valid/stable - # means of directly signaling the main thread of any error - # conditions, it is thus necessary to manually check the child - # state from time to time, via `self.is_alive()`. - # - # NOTE: https://bugs.python.org/issue1856 (01/2008) - # `sys.exit()` can segfault Python if daemon threads are active. - # https://bugs.python.org/issue21963 (07/2014) - # This will not be fixed in python 2.x. - # - # We make the Watcher thread a daemon anyway: - # - # - when `sys.exit()` is called in a child process, we don't care - # about the process anymore anyway, and all terminfo data are - # sent to the parent anyway. - # - when `sys.exit()` is called in the parent on unclean shutdown, - # the same holds. - # - when `sys.exit()` is called in the parent on clean shutdown, - # then the watcher threads should already be terminated when the - # `sys.exit()` invocation happens - # - # FIXME: check the conditions above - # - # FIXME: move to _ru_initialize_common - # - self._ru_watcher = ru_Thread(name='%s.watch' % self._ru_name, - target=self._ru_watch, - log=self._ru_log) - self._ru_watcher.start() - - self._ru_log.info('child is alive') - - - # -------------------------------------------------------------------------- - # - def _ru_initialize_child(self): - - # TODO: should we also get an alive from parent? - # - # FIXME: move to _ru_initialize_common - # - - self._ru_log.info('child (me) initializing') - - # start the watcher thread - self._ru_term = mt.Event() - self._ru_watcher = ru_Thread(name='%s.watch' % self._ru_name, - target=self._ru_watch, - log=self._ru_log) - self._ru_watcher.start() - - self._ru_log.info('child (me) is alive') - - - # -------------------------------------------------------------------------- - # - def ru_initialize_common(self): - ''' - This method can be overloaded, and will then be executed *once* during - `start()`, for both the parent and the child process (individually). If - this fails on either side, the process startup is considered failed. - ''' - - self._ru_log.debug('ru_initialize_common (NOOP)') - - - # -------------------------------------------------------------------------- - # - def ru_initialize_parent(self): - ''' - This method can be overloaded, and will then be executed *once* during - `start()`, in the parent process. If this fails, the process startup is - considered failed. - ''' - - self._ru_log.debug('ru_initialize_parent (NOOP)') - - - # -------------------------------------------------------------------------- - # - def ru_initialize_child(self): - ''' - This method can be overloaded, and will then be executed *once* during - `start()`, in the child process. If this fails, the process startup is - considered failed. - ''' - - self._ru_log.debug('ru_initialize_child (NOOP)') - - - # -------------------------------------------------------------------------- - # - def _ru_finalize(self): - ''' - Call common and parent/child initializers. - - Note that finalizers are called in inverse order of initializers. - ''' - - try: - # call parent and child finalizers, respectively - if self._ru_is_parent: - self.ru_finalize_parent() - self.ru_finalize_common() - - self._ru_finalize_parent() - self._ru_finalize_common() - - elif self._ru_is_child: - self.ru_finalize_child() - self.ru_finalize_common() - - self._ru_finalize_child() - self._ru_finalize_common() - - except Exception as e: - self._ru_log.exception('finalization error') - raise RuntimeError('finalize: %s' % repr(e)) - - - # -------------------------------------------------------------------------- - # - def _ru_finalize_common(self): - - pass - - - # -------------------------------------------------------------------------- - # - def _ru_finalize_parent(self): - - pass - - - # -------------------------------------------------------------------------- - # - def _ru_finalize_child(self): - - pass - - - # -------------------------------------------------------------------------- - # - def ru_finalize_common(self): - ''' - This method can be overloaded, and will then be executed *once* during - `stop()` or process child termination, in the parent process, in both - the parent and the child process (individually). - ''' - - self._ru_log.debug('ru_finalize_common (NOOP)') - - - # -------------------------------------------------------------------------- - # - def ru_finalize_parent(self): - ''' - This method can be overloaded, and will then be executed *once* during - `stop()` or process child termination, in the parent process. - ''' - - self._ru_log.debug('ru_finalize_parent (NOOP)') - - - # -------------------------------------------------------------------------- - # - def ru_finalize_child(self): - ''' - This method can be overloaded, and will then be executed *once* during - `stop()` or process child termination, in the child process. - ''' - - self._ru_log.debug('ru_finalize_child (NOOP)') - - - # -------------------------------------------------------------------------- - # - def work_cb(self): - ''' - This method MUST be overloaded. It represents the workload of the - process, and will be called over and over again. - - This has several implications: - - * `work_cb()` needs to enforce any call rate limits on its own! - * in order to terminate the child, `work_cb()` needs to either raise an - exception, or call `sys.exit()` (which actually also raises an - exception). - - Before the first invocation, `self.ru_initialize_child()` will be called. - After the last invocation, `self.ru_finalize_child()` will be called, if - possible. The latter will not always be possible if the child is - terminated by a signal, such as when the parent process calls - `child.terminate()` -- `child.stop()` should be used instead. - - The overloaded method MUST return `True` or `False` -- the child will - continue to work upon `True`, and otherwise (on `False`) begin - termination. - ''' - - raise NotImplementedError('ru.Process.work_cb() MUST be overloaded') - - - # -------------------------------------------------------------------------- - # - def is_alive(self, strict=True): - ''' - Check if the child process is still alive, and also assert that - termination is not yet initiated. If `strict` is set (default), then - only the process state is checked. - ''' - - if not self._ru_spawned: - # its not an error if the child was never spawned. - alive = True - - elif self._ru_is_child: - # if this *is* the child, then it's alive - alive = True - - else: - # this is the parent, and it spawned: check for real - # NOTE: assert removed for performance - # assert(self._ru_spawned) - # assert(self._ru_is_parent) - alive = super(Process, self).is_alive() - - termed = self._ru_term.is_set() - - if strict: - return alive - else: - return alive and not termed - - - # -------------------------------------------------------------------------- - # - def is_valid(self, term=True): - ''' - This method provides a self-check, and will call `stop()` if that check - fails and `term` is set. If `term` is not set, the return value of - `is_alive()` is passed through. - - The check will basically assert that `is_alive()` is `True`. The - purpose is to call this check frequently, e.g. at the begin of each - method invocation and during longer loops, as to catch failing - sub-processes or threads and to then terminate. - - This method will always return True if `start()` was not called, yet. - ''' - - if self._ru_terminating or not self._ru_initialized: - return True - - alive = self.is_alive(strict=False) - - if not alive and term: - self._ru_log.warn('alive check failed - stop [%s - %s]', alive, term) - self.stop() - else: - return alive - - - # -------------------------------------------------------------------------- - # - def _parent_is_alive(self): - ''' - This private method checks if the parent process is still alive. This - obviously only makes sense when being called in the child process. - - Note that there exists a (however unlikely) race: PIDs are reused, and - the process could be replaced by another process with the same PID - inbetween tests. We thus also except *all* exception, including - permission errors, to capture at least some of those races. - ''' - - # This method is an additional fail-safety check to the socketpair - # watching performed by the watcher thread -- either one should - # actually suffice. - - assert(self._ru_is_child) - - try: - os.kill(self._ru_ppid, 0) - return True - - except: - return False - - -# ------------------------------------------------------------------------------ - diff --git a/src/radical/utils/threads.py b/src/radical/utils/threads.py index 13090754e..5423eb21f 100644 --- a/src/radical/utils/threads.py +++ b/src/radical/utils/threads.py @@ -1,6 +1,6 @@ -__author__ = "Radical.Utils Development Team" -__copyright__ = "Copyright 2016, RADICAL@Rutgers" +__author__ = "Radical.Utils Development Team (Andre Merzky)" +__copyright__ = "Copyright 2013, RADICAL@Rutgers" __license__ = "MIT" @@ -9,815 +9,227 @@ import time import signal import thread +import threading import traceback -import Queue as queue -import threading as mt +import misc as rumisc -from .logger import get_logger -from .debug import print_stacktrace, get_stacktrace + +_out_lock = threading.RLock() # ------------------------------------------------------------------------------ # -_ALIVE_MSG = 'alive' # message to use as alive signal -_START_TIMEOUT = 5.0 # time to wait for process startup signal. - # startup signal: 'alive' message on the socketpair; - # is sent in both directions to ensure correct setup -_WATCH_TIMEOUT = 0.5 # time between thread and process health polls. - # health poll: check for recv, error and abort - # on the socketpair; is done in a watcher thread. -_STOP_TIMEOUT = 2.0 # time between temination signal and killing child -_BUFSIZE = 1024 # default buffer size for socket recvs +NEW = 'New' +RUNNING = 'Running' +DONE = 'Done' +FAILED = 'Failed' # ------------------------------------------------------------------------------ # -class Thread(mt.Thread): - ''' - This `Thread` class is a thin wrapper around `mt.Thread` which specifically - manages the thread lifetime in a cautious and copperative way: *no* attempt - on signal handling is made, no exceptions and interrupts are used, we expect - to exclusively communicate between parent and child via a `mt.Events` and - `Queue.Queue` instances. - - NOTE: At this point we do not implement the full `mt.Thread` constructor. - - The class also implements a number of initialization and finalization - methods which can be overloaded by any deriving class. While this can at - first be a confusing richness of methods to overload, it significantly - simplifies the implementation of non-trivial child threads. By default, - none of the initialized and finalizers needs to be overloaded. - - An important semantic difference are the `start()` and `stop()` methods: - both accept an optional `timeout` parameter, and both guarantee that the - child threads successfully started and completed upon return, respectively. - If that does not happen within the given timeout, an exception is raised. - Not that the child startup is *only* considered complete once all of its - initialization methods have been completed -- the start timeout value must - thus be chosen very carefully. Note further that the *parent* - initialization methods are also part of the `start()` invocation, and must - also be considered when choosing the timeout value. Parent initializers - will only be executed once the child is known to be alive, and the parent - can thus expect the child bootstrapping to be completed (avoiding the need - for additional handshakes etc). Any error in child or parent initialization - will result in an exception, and the child will be terminated. - - Along the same lines, the parent and child finalizers are executed in the - `stop()` method, prompting similar considerations for the `timeout` value. - - Any class which derives from this Thread class *must* overload the 'work_cb()` - method. That method is repeatedly called by the child thread's main loop, - until: - - an exception occurs (causing the child to fail with an error) - - `False` is returned by `work_cb()` (causing the child to finish w/o error) - ''' - - # FIXME: assert that start() was called for some / most methods - - # -------------------------------------------------------------------------- - # - def __init__(self, name, target=None, log=None): - - # At this point, we only initialize members which we need before start - self._ru_log = log # ru.logger for debug output - - if not name: - name = 'noname' - - # most importantly, we create an `mt.Event` and an `mt.Queue`. Both - # parent and child will watch the event, which thus acts as a lifeline - # between the threads, to detect abnormal termination in threads. - # The queue endpoint is used to send messages back and forth. - self._ru_term = mt.Event() - self._ru_endpoint = queue.Queue() - - # NOTE: threadlocal storage *must* be defined on module level, and - # cannot be instance specific - self._ru_local = mt.local() # keep some vars thread-local - - # note that some members are only really initialized when calling the - # `start()`/`run()` methods, and the various initializers. - self._ru_name = name # communicate name to thread - self._ru_local.name = name # use for logfiles etc - self._ru_local.started = False # start() has been called - self._ru_local.spawned = None # set in start(), run() - self._ru_local.initialized = False # set to signal bootstrap success - self._ru_local.terminating = False # set to signal active termination - self._ru_local.is_parent = None # set in start() - self._ru_local.is_child = None # set in run() - - if not self._ru_log: - # if no logger is passed down, log to null - self._ru_log = get_logger('radical.util.threads') - self._ru_log.debug('name: %s' % self._ru_local.name) - - if target: - # we don't support `arguments`, yet - self.work_cb = target - - # when cprofile is requested but not available, - # we complain, but continue unprofiled - self._ru_cprofile = False - if self._ru_name in os.environ.get('RADICAL_CPROFILE', '').split(':'): - try: - self._ru_log.error('enable cprofile for %s', self._ru_local.name) - import cprofile - self._ru_cprofile = True - except: - self._ru_log.error('cannot import cprofile - disable') - - # base class initialization - super(Thread, self).__init__(name=self._ru_local.name) - - # we don't want threads to linger around, waiting for children to - # terminate, and thus create sub-threads as daemons. - self.daemon = True - - - # -------------------------------------------------------------------------- - # - def _ru_msg_send(self, msg): - ''' - send new message to self._ru_endpoint. We make sure that the - message is not larger than the _BUFSIZE define in the RU source code. - ''' - - if not self._ru_local.spawned: - # no child, no communication channel - raise RuntimeError("can't communicate w/o child") - - self._ru_log.info('send message: %s', msg) - self._ru_endpoint.put('%s' % msg) - - - # -------------------------------------------------------------------------- - # - def _ru_msg_recv(self, timeout=None): - ''' - receive a message from self._ru_endpoint. - - This call is non-blocking: if no message is available, return an empty - string. - ''' - - if not self._ru_local.spawned: - # no child, no communication channel - raise RuntimeError("can't communicate w/o child") - - try: - if timeout: - msg = self._ru_endpoint.get(block=True, timeout=timeout) - else: - msg = self._ru_endpoint.get(block=False) - self._ru_log.info('recv message: %s', msg) - return msg +def lout(txt, stream=sys.stdout): - except queue.Empty: - self._ru_log.warn('recv timed out') - return '' + with _out_lock: + stream.write(txt) + stream.flush() - # -------------------------------------------------------------------------- - # - def is_alive(self, strict=True): - ''' - Since our threads are daemon threads we don't need to wait for them to - actually die, but consider it sufficient to have the terminationm signal - set. However, that can potentially cause unexpected results when locks - are used, as one would not expect to have locks hold by threads which - are not alive. For that reason we default to the 'true' (strict) alive - notion of the native Python thread - but the non-strict version can be - requates via `strict=False`. - ''' - - alive = super(Thread, self).is_alive() - termed = self._ru_term.is_set() - - if strict: - return alive - else: - return alive and not termed +# ------------------------------------------------------------------------------ +# +def Event(*args, **kwargs): + return threading.Event(*args, **kwargs) - # -------------------------------------------------------------------------- - # - def is_valid(self, term=True): - ''' - This method provides a self-check, and will call `stop()` if that check - fails and `term` is set. If `term` is not set, the return value of - `is_alive()` is passed through. - - The check will basically assert that `is_alive()` is `True`. The - purpose is to call this check frequently, e.g. at the begin of each - method invocation and during longer loops, as to catch failing - sub-thread and to then terminate. - - This method will always return True if `start()` was not called, yet. - ''' - - if not self._ru_local.initialized: - return True - - alive = self.is_alive(strict=False) - - if not alive and term: - self._ru_log.warn('alive check failed - stop [%s - %s]', alive, term) - self.stop() - else: - return alive +# ------------------------------------------------------------------------------ +# +class RLock(object): + """ + This threading.RLock wrapper is supportive of lock debugging. The only + semantic difference to threading.RLock is that a lock acquired via the + 'with' statement can be released within the 'with' scope, w/o penalty when + leaving the locked scope. This supports up-calling callback semantics, but + should be used with utter care, and rarely (such as on close()). + see http://stackoverflow.com/questions/6780613/ + is-it-possible-to-subclass-lock-objects-in-python-if-not-other-ways-to-debug + """ # -------------------------------------------------------------------------- # - def start(self, spawn=True, timeout=None): - ''' - Overload the `mt.Thread.start()` method, and block (with timeout) until - the child signals to be alive via a message over our queue. Also - run all initializers. - - If spawn is set to `False`, then no child thread is actually created, - but the parent initializers will be executed nonetheless. - - ''' - - self._ru_log.debug('start thread') - - if timeout is None: - timeout = _START_TIMEOUT - - if spawn: - # start `self.run()` in the child process, and wait for it's - # initalization to finish, which will send the 'alive' message. - super(Thread, self).start() - self._ru_local.spawned = True - - # this is the parent now - set role flags. - self._ru_local.is_parent = True - self._ru_local.is_child = False - - # from now on we need to invoke `self.stop()` for clean termination. - try: - - if self._ru_local.spawned: - # we expect an alive message message from the child, within - # timeout - # - # NOTE: If the child does not bootstrap fast enough, the timeout - # will kick in, and the child will be considered dead, - # failed and/or hung, and will be terminated! Timeout can - # be set as parameter to the `start()` method. - msg = self._ru_msg_recv(timeout=timeout) - - if msg != _ALIVE_MSG: - raise RuntimeError('Unexpected child message from %s (%s)' \ - % (self._ru_local.name, msg)) - - - # When we got the alive messages, only then will we call the parent - # initializers. This way those initializers can make some - # assumptions about successful child startup. - self._ru_initialize() - - except Exception as e: - self._ru_log.exception('%s initialization failed', self._ru_local.name) - self.stop() - raise + def __init__(self, obj=None): - # if we got this far, then all is well, we are done. - self._ru_log.debug('child thread %s started', self._ru_local.name) + self._lock = threading.RLock() - # child is alive and initialized, parent is initialized - Wohoo! + # with self._lock: + # self._obj = obj + # self._cnt = 0 # -------------------------------------------------------------------------- # - def run(self): - ''' - This method MUST NOT be overloaded! - - This is the workload of the child thread. It will first call the child - initializers and then repeatedly call `self.work_cb()`, until being - terminated. When terminated, it will call the child finalizers and - exit. Note that the child will also terminate once `work_cb()` returns - `False`. - - The implementation of `work_cb()` needs to make sure that this thread is - not spinning idly -- if there is nothing to do in `work_cb()` at any point - in time, the routine should at least sleep for a fraction of a second or - something. - - The child thread will automatically terminate (incl. finalizer calls) - when the parent thread dies. It is thus not possible to create orphaned - threads -- which is an explicit purpose of this implementation. - ''' - - # set local data - self._ru_local.name = self._ru_name + '.thread' - self._ru_local.started = True # start() has been called - self._ru_local.spawned = True # set in start(), run() - self._ru_local.initialized = False # set to signal bootstrap success - self._ru_local.terminating = False # set to signal active termination - self._ru_local.is_parent = False # set in start() - self._ru_local.is_child = True # set in run() - - # if no profiling is wanted, we just run the workload and exit - if not self._ru_cprofile: - self._run() - - # otherwise we run under the profiler, obviously - else: - import cprofile - cprofiler = cProfile.Profile() - cprofiler.runcall(self._run) - cprofiler.dump_stats('%s.cprof' % (self._ru_local.name)) - - - # -------------------------------------------------------------------------- - # - def _run(self): - - # FIXME: ensure that this is not overloaded - # TODO: how? - - try: - # we consider the invocation of the child initializers to be part of - # the bootstrap process, which includes starting the watcher thread - # to watch the parent's health (via the socket healt). - try: - self._ru_initialize() - - except BaseException as e: - self._ru_log.exception('abort') - self._ru_msg_send(repr(e)) - sys.stderr.write('initialization error in %s: %s\n' - % (self._ru_local.name, repr(e))) - sys.stderr.flush() - self._ru_term.set() - - # initialization done - we only now send the alive signal, so the - # parent can make some assumptions about the child's state - if not self._ru_term.is_set(): - self._ru_log.info('send alive') - self._ru_msg_send(_ALIVE_MSG) - - # enter the main loop and repeatedly call 'work_cb()'. - # - # If `work_cb()` ever returns `False`, we break out of the loop to call the - # finalizers and terminate. - # - # In each iteration, we also check if the Event is set -- if this is - # the case, we assume the parent to be dead and terminate (break the - # loop). - while not self._ru_term.is_set(): - - # des Pudel's Kern - if not self.work_cb(): - self._ru_msg_send('work finished') - break - - except BaseException as e: - - # This is a very global except, also catches - # sys.exit(), keyboard interrupts, etc. - # Ignore pylint and PEP-8, we want it this way! - self._ru_log.error('abort: %s', repr(e)) - self._ru_msg_send(repr(e)) - - try: - # note that we always try to call the finalizers, even if an - # exception got raised during initialization or in the work loop - # initializers failed for some reason or the other... - self._ru_finalize() + def acquire(self): - except BaseException as e: - self._ru_log.exception('finalization error') - self._ru_msg_send('finalize(): %s' % repr(e)) + # ind = (self._cnt)*' '+'>'+(30-self._cnt)*' ' + # lout("%s -- %-10s %50s acquire - %s\n" % (ind, threading.current_thread().name, self, self._lock)) - self._ru_msg_send('terminating') + self._lock.acquire() - # all is done and said - begone! - return + # self._cnt += 1 + # ind = (self._cnt)*' '+'|'+(30-self._cnt)*' ' + # lout("%s %-10s %50s acquired - %s\n" % (ind, threading.current_thread().name, self, self._lock)) # -------------------------------------------------------------------------- # - def stop(self, timeout=None): - ''' - `stop()` is symetric to `start()`, in that it can only be called by the - parent thread - - - NOTE: `stop()` implies `join()`! Use `terminate()` if that is not - wanted. - ''' - - # FIXME: This method should reduce to - # self.terminate(timeout) - # self.join(timeout) - # ie., we should move some parts to `terminate()`. - - if not hasattr(self._ru_local, 'name'): - - # This thread is now being stopped from *another* thread, ie. - # neither the parent nor the child thread, so we can't access either - # thread local storage. we thus only set the termination signal - # (which is accessible), and leave all other handling to somebody - # else. - # - # Specifically, we will not be able to join this thread, and no - # timeout is enforced - self._ru_log.info('signal stop for %s - do not join', self._ru_name) - self._ru_term.set() - return - - if not timeout: - timeout = _STOP_TIMEOUT - - # if stop() is called from the child, we just set term and leave all - # other handling to the parent. - if self._ru_local.is_child: - self._ru_log.info('child calls stop()') - self._ru_term.set() - # cancel_main_thread() - return - - self._ru_log.info('parent stops child') - - # make sure we don't recurse - if self._ru_local.terminating: - return - self._ru_local.terminating = True - - # call finalizers - this sets `self._ru_term` - self._ru_finalize() - - # After setting the termination event, the child should begin - # termination immediately. Well, whenever it realizes the event is set, - # really. We wait for that termination to complete. - self.join() - - - # -------------------------------------------------------------------------- - # - def join(self, timeout=None): - - # raise RuntimeError('call stop instead!') - # - # we can't really raise the exception above, as that would break symmetry - # with the Process class -- see documentation there. - # - # FIXME: not that `join()` w/o `stop()` will not call the parent - # finalizers. We should call those in both cases, but only once. - - if not hasattr(self._ru_local, 'name'): - - # This thread is now being stopped from *another* thread, ie. - # neither the parent nor the child thread, so we can't access either - # thread local storage. we thus only set the termination signal - # (which is accessible), and leave all other handling to somebody - # else. - # - # Specifically, we will not be able to join this thread, and no - # timeout is enforced - self._ru_log.info('signal stop for %s - do not join', self._ru_name) - self._ru_term.set() - return - - if not timeout: - timeout = _STOP_TIMEOUT - - if self._ru_local.is_parent: - try: - super(Thread, self).join(timeout=timeout) - except Exception as e: - self._ru_log.warn('ignoring %s' % e) - + def release(self): - # -------------------------------------------------------------------------- - # - def _ru_initialize(self): - ''' - Perform basic settings, then call common and parent/child initializers. - ''' + # ind = (self._cnt)*' '+'-'+(30-self._cnt)*' ' + # lout("%s %-10s %50s release - %s\n" % (ind, threading.current_thread().name, self, self._lock)) try: - # call parent and child initializers, respectively - if self._ru_local.is_parent: - self._ru_initialize_common() - self._ru_initialize_parent() - - self.ru_initialize_common() - self.ru_initialize_parent() - - elif self._ru_local.is_child: - self._ru_initialize_common() - self._ru_initialize_child() - - self.ru_initialize_common() - self.ru_initialize_child() - - self._ru_local.initialized = True - - except Exception as e: - self._ru_log.exception('initialization error') - raise RuntimeError('initialize: %s' % repr(e)) - - - # -------------------------------------------------------------------------- - # - def _ru_initialize_common(self): + self._lock.release() + except RuntimeError as e: + # lock has been released meanwhile - we allow that + # print 'ignore double lock release' + pass - pass + # self._cnt -= 1 + # ind = (self._cnt)*' '+'<'+(30-self._cnt)*' ' + # lout("%s -- %-10s %50s released - %s\n" % (ind, threading.current_thread().name, self, self._lock)) # -------------------------------------------------------------------------- # - def _ru_initialize_parent(self): + def __enter__(self) : self.acquire() + def __exit__ (self, type, value, traceback): self.release() - pass +# ------------------------------------------------------------------------------ +# +class Thread(threading.Thread): + """ + This `Thread` class is a thin wrapper around Python's native + `threading.Thread` class, which adds some convenience methods. + """ # -------------------------------------------------------------------------- # - def _ru_initialize_child(self): + def __init__(self, call, *args, **kwargs): - # TODO: should we also get an alive from parent? - # - # FIXME: move to _ru_initialize_common - # + if not callable(call): + raise ValueError("Thread requires a callable to function, not %s" \ + % (str(call))) - self._ru_log.info('child (me) initializing') + threading.Thread.__init__(self) - - # -------------------------------------------------------------------------- - # - def ru_initialize_common(self): - ''' - This method can be overloaded, and will then be executed *once* during - `start()`, for both the parent and the child process (individually). If - this fails on either side, the process startup is considered failed. - ''' - - self._ru_log.debug('ru_initialize_common (NOOP)') + self._call = call + self._args = args + self._kwargs = kwargs + self._state = NEW + self._result = None + self._exception = None + self._traceback = None + self.daemon = True # -------------------------------------------------------------------------- # - def ru_initialize_parent(self): - ''' - This method can be overloaded, and will then be executed *once* during - `start()`, in the parent process. If this fails, the process startup is - considered failed. - ''' + @classmethod + def Run(self, call, *args, **kwargs): - self._ru_log.debug('ru_initialize_parent (NOOP)') + t = self(call, *args, **kwargs) + t.start() + return t # -------------------------------------------------------------------------- # - def ru_initialize_child(self): - ''' - This method can be overloaded, and will then be executed *once* during - `start()`, in the child process. If this fails, the process startup is - considered failed. - ''' - - self._ru_log.debug('ru_initialize_child (NOOP)') + @property + def tid(self): + return self.tid # -------------------------------------------------------------------------- # - def _ru_finalize(self): - ''' - Call common and parent/child initializers. - - Note that finalizers are called in inverse order of initializers. - ''' + def run(self): try: - # signal termination - self._ru_term.set() - - # call parent and child finalizers, respectively - if self._ru_local.is_parent: - self.ru_finalize_parent() - self.ru_finalize_common() - - self._ru_finalize_parent() - self._ru_finalize_common() - - elif self._ru_local.is_child: - self.ru_finalize_child() - self.ru_finalize_common() - - self._ru_finalize_child() - self._ru_finalize_common() + self._state = RUNNING + self._result = self._call(*self._args, **self._kwargs) + self._state = DONE except Exception as e: - self._ru_log.exception('finalization error') - raise RuntimeError('finalize: %s' % repr(e)) + tb = traceback.format_exc() + self._traceback = tb + self._exception = e + self._state = FAILED # -------------------------------------------------------------------------- # - def _ru_finalize_common(self): - - pass + def wait(self): - - # -------------------------------------------------------------------------- - # - def _ru_finalize_parent(self): - - pass + if self.isAlive(): + self.join() # -------------------------------------------------------------------------- # - def _ru_finalize_child(self): - + def cancel(self): + # FIXME: this is not really implementable generically, so we ignore + # cancel requests for now. pass # -------------------------------------------------------------------------- # - def ru_finalize_common(self): - ''' - This method can be overloaded, and will then be executed *once* during - `stop()` or process child termination, in the parent process, in both - the parent and the child process (individually). - ''' + def get_state(self): + return self._state - self._ru_log.debug('ru_finalize_common (NOOP)') + state = property(get_state) # -------------------------------------------------------------------------- # - def ru_finalize_parent(self): - ''' - This method can be overloaded, and will then be executed *once* during - `stop()` or process child termination, in the parent process. - ''' + def get_result(self): - self._ru_log.debug('ru_finalize_parent (NOOP)') + if self._state == DONE: + return self._result + return None - # -------------------------------------------------------------------------- - # - def ru_finalize_child(self): - ''' - This method can be overloaded, and will then be executed *once* during - `stop()` or process child termination, in the child process. - ''' - - self._ru_log.debug('ru_finalize_child (NOOP)') + result = property(get_result) # -------------------------------------------------------------------------- # - def work_cb(self): - ''' - This method MUST be overloaded. It represents the workload of the - process, and will be called over and over again. - - This has several implications: - - * `work_cb()` needs to enforce any call rate limits on its own! - * in order to terminate the child, `work_cb()` needs to either raise an - exception, or call `sys.exit()` (which actually also raises an - exception). + def get_exception(self): - Before the first invocation, `self.ru_initialize_child()` will be called. - After the last invocation, `self.ru_finalize_child()` will be called, if - possible. The latter will not always be possible if the child is - terminated by a signal, such as when the parent process calls - `child.terminate()` -- `child.stop()` should be used instead. + return self._exception - The overloaded method MUST return `True` or `False` -- the child will - continue to work upon `True`, and otherwise (on `False`) begin - termination. - ''' - - raise NotImplementedError('ru.Thread.work_cb() MUST be overloaded') - - - - -# ------------------------------------------------------------------------------ -# -# thread-related utility classes and methods -# -class RLock(object): - """ - This mt.RLock wrapper is supportive of lock debugging. The only - semantic difference to mt.RLock is that a lock acquired via the - 'with' statement can be released within the 'with' scope, w/o penalty when - leaving the locked scope. This supports up-calling callback semantics, but - should be used with utter care, and rarely (such as on close()). - - see http://stackoverflow.com/questions/6780613/ - """ - - # -------------------------------------------------------------------------- - # - def __init__(self, obj=None): - - self._lock = mt.RLock() - - - # -------------------------------------------------------------------------- - # - def acquire(self): - - self._lock.acquire() - - - # -------------------------------------------------------------------------- - # - def release(self): - - try: - self._lock.release() - - except RuntimeError as e: - # lock has been released meanwhile - we allow that - pass + exception = property(get_exception) # -------------------------------------------------------------------------- # - def __enter__(self) : self.acquire() - def __exit__ (self, type, value, traceback): self.release() - - - -# ------------------------------------------------------------------------------ -# -def get_thread_name(): - - return mt.current_thread().name + def get_traceback(self): + return self._traceback -# ------------------------------------------------------------------------------ -# -def get_thread_id(): - - return mt.current_thread().ident + traceback = property(get_traceback) # ------------------------------------------------------------------------------ # -def is_main_thread(t=None): +def is_main_thread(): - if t: - assert(isinstance(t, mt.Thread)) - else: - t = this_thread() - - return isinstance(t, mt._MainThread) + return isinstance(threading.current_thread(), threading._MainThread) # ------------------------------------------------------------------------------ # -def is_this_thread(t): - ''' - check if the given thread (type: threading.Thread) is the same as the - current thread of execution. - ''' - - assert(isinstance(t, mt.Thread)) - - return(t == this_thread()) - - -# ------------------------------------------------------------------------------ -# -def main_thread(): - ''' - return a handle to the main thread of execution in this process - ''' - - for t in mt.enumerate(): - if isinstance(t, mt._MainThread): - return T - - assert(False), 'main thread not found' - - -# ------------------------------------------------------------------------------ -# -def this_thread(): - ''' - return a handle to the current thread of execution - ''' - - return mt.current_thread() - - -# ------------------------------------------------------------------------------ -# -_signal_lock = mt.Lock() +_signal_lock = threading.Lock() _signal_sent = dict() def cancel_main_thread(signame=None, once=False): """ @@ -840,7 +252,7 @@ def cancel_main_thread(signame=None, once=False): main thread. We do so if `signal` is specified. After sending the signal, any sub-thread will call sys.exit(), and thus - finish. We leave it to the main thread though to decide if it will exit at + finish. We leave it to the main thread thogh to decide if it will exit at this point or not. Either way, it will have to handle the signal first. If `once` is set to `True`, we will send the given signal at most once. @@ -851,9 +263,11 @@ def cancel_main_thread(signame=None, once=False): global _signal_lock global _signal_sent + if signame: signal = get_signal_by_name(signame) else : signal = None + with _signal_lock: if once: @@ -868,15 +282,12 @@ def cancel_main_thread(signame=None, once=False): # this sends a SIGINT, resulting in a KeyboardInterrupt. # NOTE: see http://bugs.python.org/issue23395 for problems on using # SIGINT in combination with signal handlers! - try: - thread.interrupt_main() - except TypeError: - # this is known to be a side effect of `thread.interrup_main()` - pass + thread.interrupt_main() # record the signal sending _signal_sent[signal] = True + # the sub thread will at this point also exit. if not is_main_thread(): sys.exit() @@ -944,6 +355,20 @@ def __init__(self, msg, signum=None): SystemExit.__init__(self, msg) +# ------------------------------------------------------------------------------ +# +def get_thread_name(): + + return threading.current_thread().name + + +# ------------------------------------------------------------------------------ +# +def get_thread_id(): + + return threading.current_thread().ident + + # ------------------------------------------------------------------------------ # def raise_in_thread(e=None, tname=None, tident=None): @@ -955,8 +380,7 @@ def raise_in_thread(e=None, tname=None, tident=None): The target thread will receive the exception with some delay. More specifically, it needs to call up to 100 op codes before the exception - is evaluated and raised. The thread interruption can thus be delayed - significantly, like when the thread sleeps. + is evaluated and raised. The default exception raised is 'radical.utils.ThreadExit' which inherits from 'SystemExit'. @@ -980,7 +404,7 @@ def raise_in_thread(e=None, tname=None, tident=None): if not tname: tname = 'MainThread' - for th in mt.enumerate(): + for th in threading.enumerate(): if tname == th.name: tident = th.ident break @@ -991,7 +415,7 @@ def raise_in_thread(e=None, tname=None, tident=None): if not e: e = ThreadExit - self_thread = mt.current_thread() + self_thread = threading.current_thread() if self_thread.ident == tident: # if we are in the target thread, we simply raise the exception. # This specifically also applies to the main thread. @@ -999,12 +423,71 @@ def raise_in_thread(e=None, tname=None, tident=None): raise e('raise_in_thread') else: - # otherwise we inject the exception into the main thread's async - # exception scheduler import ctypes ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(tident), ctypes.py_object(e)) +# ------------------------------------------------------------------------------ +# +def fs_event_create(fname, msg=None): + """ + Atomically create a file at the given `fname` (relative to pwd), + with `msg` as content (or empty otherwise). + + NOTE: this expects a POSIX compliant file system to be accessible by the two + entities which use this fs event mechanism. + NOTE: this also assumes `os.rename()` to be an atomic operation. + """ + + pid = os.getpid() + + if not msg: + msg = '' + + with open('%s.%d.in' % (fname, pid), 'w') as f: + f.write(msg) + + os.rename('%s.%d.in' % (fname, pid), fname) + + +# ------------------------------------------------------------------------------ +# +def fs_event_wait(fname, timeout=None): + """ + Wait for a file ate the given `fname` to appear. Return `None` at timeout, + or the file content otherwise. + """ + + msg = None + pid = os.getpid() + start = time.time() + + while True: + + try: + with open(fname, 'r') as f: + msg = f.read() + except Exception as e: + print 'e: %s' % type(e) + print 'e: %s' % e + pass + + if msg != None: + try: + os.rename(fname, '%s.%d.out' % (fname, pid)) + os.unlink('%s.%d.out' % (fname, pid)) + except Exception as e: + # there is not much we can do at this point... + print 'unlink: %s' % e + pass + return msg + + if timeout and start + timeout <= time.time(): + return None + + time.sleep(0.1) + + # ------------------------------------------------------------------------------ diff --git a/tests/unittests/test_logger.py b/tests/unittests/test_logger.py index bc5adf28d..e4383df86 100644 --- a/tests/unittests/test_logger.py +++ b/tests/unittests/test_logger.py @@ -16,6 +16,7 @@ def test_singleton(): """ Test if the logger behaves like a singleton """ # make sure singleton works + assert ru.get_logger() == ru.get_logger() assert ru.get_logger('radical.utils') == ru.get_logger('radical.utils') def test_logger(): diff --git a/tests/unittests/test_process.py b/tests/unittests/test_process.py deleted file mode 100644 index e16acb5f4..000000000 --- a/tests/unittests/test_process.py +++ /dev/null @@ -1,233 +0,0 @@ - -__author__ = "Radical.Utils Development Team" -__copyright__ = "Copyright 2016, RADICAL@Rutgers" -__license__ = "MIT" - - -''' -Unit tests for ru.Process() -''' - -import os -import sys -import time - -import radical.utils as ru - - -# ------------------------------------------------------------------------------ -# -def test_process_basic(): - ''' - start a 'sleep 0.1', and expect this to finish within 0.x seconds - ''' - - class P(ru.Process): - def __init__(self): - return ru.Process.__init__(self, 'ru.test') - def work_cb(self): - time.sleep(0.2) - return False - - p = P() ; t1 = time.time() - p.start() ; t2 = time.time() - p.join() ; t3 = time.time() - - assert(t2-t1 > 0.0), t2-t1 - assert(t2-t1 < 0.2), t2-t1 # process startup should be quick - assert(t3-t2 > 0.2), t3-t2 # expect exactly one work iteration - assert(t3-t2 < 0.4), t3-t2 - - -# ------------------------------------------------------------------------------ -# -def test_process_autostart(): - ''' - start the child process on __init__() - ''' - - class P(ru.Process): - def __init__(self): - - self._initalize_common = False - self._initalize_parent = False - self._initalize_child = False - - self._finalize_common = False - self._finalize_parent = False - self._finalize_child = False - - self._work_done = False - - ru.Process.__init__(self, 'ru.test') - - self.start() - - assert(self._initialize_common), 'no initialize common' - assert(self._initialize_parent), 'no initialize parent' - - self.join() # wait until work is done - self.stop() # make sure finalization happens - - assert(self._finalize_common), 'no finalize common' - assert(self._finalize_parent), 'no finalize parent' - - def ru_initialize_common(self): self._initialize_common = True - def ru_initialize_parent(self): self._initialize_parent = True - def ru_initialize_child (self): self._initialize_child = True - - def ru_finalize_common(self) : self._finalize_common = True - def ru_finalize_parent(self) : self._finalize_parent = True - def ru_finalize_child (self) : self._finalize_child = True - - def work_cb(self): - assert(self._initialize_common), 'no initialize common' - assert(self._initialize_child), 'no initialize child' - self._work_done = True - print 'work' - return False # only run once - - p = P() - - -# ------------------------------------------------------------------------------ -# -def test_process_init_fail(): - ''' - make sure the parent gets notified on failing init - ''' - - class P(ru.Process): - def __init__(self): - return ru.Process.__init__(self, 'ru.test') - def ru_initialize_child(self): - raise RuntimeError('oops init') - def work_cb(self): - time.sleep(0.1) - return True - - try: - p = P() - p.start() - except RuntimeError as e: - assert('oops init' in str(e)), str(e) - else: - assert(False), 'missing exception' - - assert(not p.is_alive()) - - -# ------------------------------------------------------------------------------ -# -def test_process_final_fail(): - ''' - make sure the parent gets notified on failing finalize - ''' - - class P(ru.Process): - def __init__(self): - return ru.Process.__init__(self, 'ru.test') - def ru_initialize_child(self): - self.i = 0 - def work_cb(self): - self.i += 1 - if self.i == 5: - time.sleep(0.1) - return False - return True - def ru_finalize_child(self): - raise RuntimeError('oops final') - - try: - p = P() - p.start() - p.stop() - except Exception as e: - assert('oops final' in str(e)), str(e) - else: - pass - # assert(False) - - assert(not p.is_alive()) - - -# ------------------------------------------------------------------------------ -# -def test_process_parent_fail(): - ''' - make sure the child dies when the parent dies - ''' - - class Parent(ru.Process): - - def __init__(self): - ru.Process.__init__(self, name='ru.test') - - - def ru_initialize_child(self): - self._c = Child() - self._c.start() - assert(self._c.is_alive()) - - def work_cb(self): - sys.exit() # parent dies - - def ru_finalize_child(self): - # # below is what's needed for *clean* termination - # self._c.stop() - pass - - - class Child(ru.Process): - - def __init__(self): - with open('/tmp/c_pid.%d' % os.getuid(), 'w') as f: - f.write(str(os.getpid())) - ru.Process.__init__(self, name='ru.test.child') - - def work_cb(self): - return True - - - p = Parent() - p.start() - with open('/tmp/c_pid.%d' % os.getuid(), 'r') as f: - c_pid = int(f.read().strip()) - os.unlink('/tmp/c_pid.%d' % os.getuid()) - os.kill(p.pid, 9) - - # leave some time for child to die - time.sleep(0.01) - try: - os.kill(c_pid, 0) - except OSError as e: - pass # child is gone - except: - assert(False) - - assert(not p.is_alive()) - - -# ------------------------------------------------------------------------------ -# run tests if called directly -if __name__ == "__main__": - - N = 1 - - test_process_autostart() - for i in range(N): - test_process_final_fail() - print '.', - test_process_init_fail() - print '.', - test_process_parent_fail() - print '.', - test_process_basic() - print '.', - print i - - sys.exit() - - -# ------------------------------------------------------------------------------ - diff --git a/tests/unittests/test_signatures.py b/tests/unittests/test_signatures.py index 3c0ec7727..8207f3ba5 100644 --- a/tests/unittests/test_signatures.py +++ b/tests/unittests/test_signatures.py @@ -7,32 +7,32 @@ import radical.utils.signatures as rus -# # ------------------------------------------------------------------------------ -# @rus.takes (basestring, int, rus.optional (float)) -# @rus.returns (int) -# def sigtest (string, intger, float=3.1415926) : -# return 1 -# -# # ------------------------------------------------------------------------------ -# def test_signatures () : -# """ -# Test if signature violations are flagged -# """ -# -# try : ret = sigtest ('string', 2.4, 'hallo') -# except TypeError as e : pass -# except Exception as e : assert (False), "TypeError != %s (%s)" % (type(e), e) -# else : assert (False), "expected TypeError exception, got none" -# -# try : ret = sigtest ('string', 2, 1.1414) -# except Exception as e : assert (False), "exception %s: %s" % (type(e), e) -# -# -# # ------------------------------------------------------------------------------ -# # run tests if called directly -# if __name__ == "__main__": -# -# test_signatures () -# -# # ------------------------------------------------------------------------------ +# ------------------------------------------------------------------------------ +@rus.takes (basestring, int, rus.optional (float)) +@rus.returns (int) +def sigtest (string, intger, float=3.1415926) : + return 1 + +# ------------------------------------------------------------------------------ +def test_signatures () : + """ + Test if signature violations are flagged + """ + + try : ret = sigtest ('string', 2.4, 'hallo') + except TypeError as e : pass + except Exception as e : assert (False), "TypeError != %s (%s)" % (type(e), e) + else : assert (False), "expected TypeError exception, got none" + + try : ret = sigtest ('string', 2, 1.1414) + except Exception as e : assert (False), "exception %s: %s" % (type(e), e) + + +# ------------------------------------------------------------------------------ +# run tests if called directly +if __name__ == "__main__": + + test_signatures () + +# ------------------------------------------------------------------------------