From d07708cfcb06828e63b19681acaf1489e3700882 Mon Sep 17 00:00:00 2001
From: Tibor Simko
Date: Tue, 12 Aug 2014 11:52:22 +0200
Subject: [PATCH] global: move to Unix style line terminators

* Moves all files to use Unix style line terminators, since most
  development is being done on Unix like platforms.

Signed-off-by: Tibor Simko
---
 workflow/config.py               |  912 ++++++++++-----------
 workflow/engine.py               | 1322 +++++++++++++++---------
 workflow/patterns/__init__.py    |   42 +-
 workflow/patterns/controlflow.py |  722 ++++++++--------
 workflow/patterns/utils.py       | 1064 ++++++++++++------------
 5 files changed, 2031 insertions(+), 2031 deletions(-)

diff --git a/workflow/config.py b/workflow/config.py
index 9c19d1c..452141e 100644
--- a/workflow/config.py
+++ b/workflow/config.py
@@ -1,456 +1,456 @@
-import sys
-import os
-from configobj import Section, OPTION_DEFAULTS, ConfigObjError, ConfigObj
-import inspect
-import traceback
-
-"""
-Provides a class for reading global configuration options
-The reader in itself is using configobj to access the ini
-files. The reader should be initialized (from the project
-root) with the path of the folder where configuration files
-live.
-"""
-
-class CustomConfigObj(ConfigObj):
-    """This is very small change into the default ConfigObj class
-    - the only difference is in the parent_config parameter, if passed
-    we will add it to the new instance and interpolation will use the
-    values of the parent_config for lookup
-    """
-
-    def __init__(self, infile=None, options=None, configspec=None, encoding=None,
-                 interpolation=True, raise_errors=False, list_values=True,
-                 create_empty=False, file_error=False, stringify=True,
-                 indent_type=None, default_encoding=None, unrepr=False,
-                 write_empty_values=False, _inspec=False, parent_config=None):
-        """
-        Parse a config file or create a config file object.
-
-        ``ConfigObj(infile=None, configspec=None, encoding=None,
-        interpolation=True, raise_errors=False, list_values=True,
-        create_empty=False, file_error=False, stringify=True,
-        indent_type=None, default_encoding=None, unrepr=False,
-        write_empty_values=False, _inspec=False)``
-        """
-        self._inspec = _inspec
-        # init the superclass
-        # this is the only change - we pass the parent configobj if
-        # available, to have lookup use its values
-        Section.__init__(self, parent_config or self, 0, self)
-
-
-        infile = infile or []
-        if options is not None:
-            import warnings
-            warnings.warn('Passing in an options dictionary to ConfigObj() is '
-                          'deprecated. Use **options instead.',
-                          DeprecationWarning, stacklevel=2)
-
-        _options = {'configspec': configspec,
-                    'encoding': encoding, 'interpolation': interpolation,
-                    'raise_errors': raise_errors, 'list_values': list_values,
-                    'create_empty': create_empty, 'file_error': file_error,
-                    'stringify': stringify, 'indent_type': indent_type,
-                    'default_encoding': default_encoding, 'unrepr': unrepr,
-                    'write_empty_values': write_empty_values}
-
-        options = dict(options or {})
-        options.update(_options)
-
-        # XXXX this ignores an explicit list_values = True in combination
-        # with _inspec. The user should *never* do that anyway, but still...
-        if _inspec:
-            options['list_values'] = False
-
-        defaults = OPTION_DEFAULTS.copy()
-        # TODO: check the values too.
-        for entry in options:
-            if entry not in defaults:
-                raise TypeError('Unrecognised option "%s".' % entry)
-
-        # Add any explicit options to the defaults
-        defaults.update(options)
-        self._initialise(defaults)
-        configspec = defaults['configspec']
-        self._original_configspec = configspec
-        self._load(infile, configspec)
-
-
-
-class ConfigReader(object):
-    """Instance that facilitates easy reading/access to the configuration values
-    from .ini files
-
-    Modules/workflows should not import this, but the config instance:
-    from workflow import config
-
-    During instantiation, the reader loads the global config file - usually from ./cfg/global.ini
-    The values will be accessible as attributes, eg:
-    reader.BASEDIR
-    reader.sectionX.VAL
-
-    When a workflow/module is accessing an attribute, the reader will also load special
-    configuration (workflow-specific configuration) which has the same name as the
-    workflow/module.
-
-    Example: workflow 'load_seman_components.py'
-    from merkur.config import reader
-    reader.LOCAL_VALUE # at this moment, the reader will check whether %basedir/etc/load_seman_components.ini exists
-                       # if yes, the reader will load the configuration and store it inside ._local
-                       # if no LOCAL_VALUE exists, an error will be raised
-                       # if in the meantime some other module imported the reader and tries to access
-                       # an attribute, the reader will recognize that the caller is different, will update
-                       # the local config and will serve workflow-specific configuration automatically
-
-    You can pass a list of basedir folders - in that case, only the last one will
-    be used for lookup of local configurations, but the global values will be inherited
-    from all global.ini files found in the basedir folders
-    """
-
-    def __init__(self, basedir=os.path.abspath(os.path.dirname(__file__)),
-                 caching=True):
-        object.__init__(self)
-        self._local = {}
-        self._global = {}
-        self._on_demand = {}
-        self._recent_caller = ''
-        self._caching = caching
-        self._main_config = None
-        self._basedir = []
-
-        self.setBasedir(basedir)
-
-        # load configurations
-        if isinstance(basedir, list) or isinstance(basedir, tuple):
-            files = []
-            for d in self._basedir:
-                if os.path.exists(d):
-                    files.append(os.path.abspath(os.path.join(d, 'global.ini')))
-            self.update(files)
-        else:
-            self.update()
-
-    def __getattr__(self, key):
-        """Returns cfg value
-        1. first lookup in the local values
-        2. then in the global values
-        """
-
-
-        # first find out who is trying to access us
-        frame = inspect.currentframe().f_back
-
-        # TODO - make it try the hierarchy first?
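-        # An illustrative sketch of what happens below (module names are
-        # hypothetical): a call from my_workflow.py triggers
-        # update_local('my_workflow'), so the same attribute may resolve
-        # differently for different calling modules.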
- if frame: - cfile = self._getCallerPath(frame) - if cfile: - caller = self._getCallerName(cfile) - if caller != self._recent_caller: - # TODO: make it optional, allow for read-once-updates - self.update_local(caller) #update config - - - if key in self._local: - return self._local[key] - elif key in self._global: - return self._global[key] #raise error ok - else: - global_cfg_path = self._main_config and os.path.abspath(self._main_config.filename) or 'None' - local_cfg_path = self._findConfigPath(self._recent_caller) - raise AttributeError('Attribute "%s" not defined\nglobal_config: %s\nlocal_config: %s' % - (key, global_cfg_path, local_cfg_path)) - - def _getCallerId(self, frame): - if frame: - cfile = self._getCallerPath(frame) - if cfile: - caller = self._getCallerName(cfile) - return caller - - - def getBaseDir(self): - return self._basedir - - def setBasedir(self, basedir): - """Sets a new basedir path - this is a root of the configuration - directives from which other paths are resolved""" - if not (isinstance(basedir, list) or isinstance(basedir, tuple)): - basedir = [basedir] - new_base = [] - for b in basedir: - b = os.path.abspath(b) - if b[0] != '\\': - b = b.replace('\\', '/') - b = b[0].lower() + b[1:] - if b not in new_base: - new_base.append(b) - self._basedir = new_base - self.update() - - def update(self, files=None, replace_keys={}): - """Updates values reading them from the main configuration file(s) - @var files: list of configuration files (if empty, default file is read) - @keyword replace_keys: dictionary of values that you want to replace - this allows you to change config at runtime, but IT IS NOT - RECOMMENDED to change anything else than global values (and you can - change only top level values). If you don't know what you are doing, - do not replace any keys! - """ - if files is None: - files = self._makeAllConfigPaths('global') - - updated = 0 - for file in files: - if os.path.exists(file): - # if we have more files, we will wrap/inherit them into one object - # this object should not be probably usef for writing - config = self._main_config = CustomConfigObj(file, - encoding='UTF8', - parent_config = self._main_config) - if replace_keys: - for k, v in replace_keys.items(): - if k in config: - config[k] = v - self._update(self._global, config) - updated += 1 - return updated - - def init(self, filename): - if not os.path.exists(filename): - filename = self._findConfigPath(filename) - caller = self._getCallerId(inspect.currentframe().f_back) - if not(self.update_local(caller, filename)): - raise Exception('Config file: %s does not exist' % filename) - - def update_local(self, name, file=None): - """Update the local, workflow-specific cache - @var name: name of the calling module (without suffix) - @keyword file: file to load config from (if empty, default ini file will be sought) - """ - self._recent_caller = name - self._local = {} - if file is None: - file = self._findConfigPath(name) - - if file and os.path.exists(file): - config = CustomConfigObj(file, - encoding='UTF8', - parent_config = self._main_config) - - self._update(self._local, config) - return True - - def load(self, cfgfile, force_reload=False, failonerror=True, - replace_keys={}): - """Loads configuration file on demand - @var cfgfile: path to the file, the path may be relative, in - that case we will try to guess it using set basedir. 
Or it - can be absolute - @keyword force_reload: returns cached configuration or reloads - it again from file if force_reload=True - @keyword failonerror: bool, raise Exception when config file - is not found/loaded - @return: config object or None - - example: - c = config.load('some-file.txt') - c.some.key - """ - realpath = None - if os.path.exists(cfgfile): - realpath = cfgfile - else: - new_p = self._findConfigPath(cfgfile) - if new_p: - realpath = new_p - else: - new_p = self._findConfigPath(cfgfile.rsplit('.', 1)[0]) - if new_p: - realpath = new_p - - if not realpath: - if failonerror: - raise Exception('Cannot find: %s' % cfgfile) - else: - sys.stderr.write('Cannot find: %s' % cfgfile) - return - - if realpath in self._on_demand and not force_reload: - return ConfigWrapper(realpath, self._on_demand[realpath]) - - try: - config = CustomConfigObj(realpath, - encoding='UTF8', - parent_config = self._main_config) - if replace_keys: - for k, v in replace_keys.items(): - if k in config: - config[k] = v - except ConfigObjError, msg: - if failonerror: - raise ConfigObjError(msg) - else: - self.traceback.print_exc() - return - - self._on_demand[realpath] = {} - self._update(self._on_demand[realpath], config) - return ConfigWrapper(realpath, self._on_demand[realpath]) - - - def get(self, key, failonerror=True): - """Gets value from the key identified by string, eg. 'index.dir'""" - parts = key.split('.') - pointer = self - try: - for p in parts: - pointer = getattr(pointer, p) - return pointer - except (KeyError, AttributeError): - global_cfg_path = self._main_config and os.path.abspath(self._main_config.filename) or 'None' - local_cfg_path = self._findConfigPath(self._recent_caller) - m = 'Attribute "%s" not defined\nglobal_config: %s\nlocal_config: %s' % \ - (key, global_cfg_path, local_cfg_path) - if failonerror: - raise AttributeError(m) - else: - sys.stderr.write(m) - - - def _getCallerPath(self, frame): - cfile = os.path.abspath(inspect.getfile(frame)).replace('\\', '/') - f = __file__.replace('\\', '/') - if f != cfile: - return cfile - - - def _getCallerName(self, path): - cfile = os.path.split(path)[1] - return cfile.rsplit('.', 1)[0] - - - def getCallersConfig(self, failonerror=True): - """Gets the value from the calling workflow configuration - - this is useful if we want to access configuration of the object - that included us - @var key: name of the key to access, it is a string in a dot notation - """ - - # first find out who is trying to access us - caller = '' - frame = inspect.currentframe().f_back - frame = inspect.currentframe().f_back - if frame: - frame = frame.f_back - if frame: - caller = self._getCallerName(self._getCallerPath(frame)) - path = self._findConfigPath(caller) - if path: - config = self.load(path) - if config: - return config - if failonerror: - raise Exception('Error, cannot find the caller') - - def _findConfigPath(self, name): - """Finds the most specific config path""" - for path in reversed(self._makeAllConfigPaths(name)): - if os.path.exists(path): - return path - - def _makeAllConfigPaths(self, name): - f = [] - for d in self._basedir: - path = '%s/%s.ini' % (d, name) - f.append(path.replace('\\', '/')) - return f - - def _update(self, pointer, config): - for key, cfg_val in config.items(): - if isinstance(cfg_val, Section): - o = cfgval() - for k,v in cfg_val.items(): - if isinstance(v, Section): - o2 = cfgval() - o.__setattr__(k, o2) - self._update(o2, v) - else: - o.__setattr__(k, v) - pointer[key] = o - else: - pointer[key] = cfg_val - - - def 
__str__(self): - """Returns textual representation of the current config - which can be used for special purposes (ie. to save values - somewhere and reload them -- however, they will be a simple - dictionaries of textual values; without special powers. This - class also does not provide ways to load such dumped values, - we would be circumventing configobj and that is no good. - """ - return """{'global_config' : %s, - 'local_config' : %s, - 'on_demand_config': %s, - 'recent_caller' : '%s'}""" % (self._global, - self._local, - self._on_demand, - self._recent_caller) - - -class cfgval(object): - def __init__(self): - object.__init__(self) - def __setattr__(self, k, v): - self.__dict__[k] = v - def __repr__(self): - #return '{%s}' % ',\n'.join(map(lambda o: "'.%s': %s" % (o[0], repr(o[1])), self.__dict__.items())) - return '%s\n%s' % ('#cfgwrapper', repr(self.__dict__)) - def __iter__(self): - return iter(self.__dict__) - def __setitem__(self, k, v): - self.__setattr__(k, v) - def items(self): - return self.__dict__.items() - def keys(self): - return self.__dict__.keys() - def values(self): - return self.__dict__.values() - -class ConfigWrapper(object): - def __init__(self, realpath, config): - self.__dict__['_config'] = config - self.__dict__['_realpath'] = realpath - def __getattr__(self, key): - return self._config[key] - def __setattr__(self, key, value): - self._config.__setitem__(key, value) - def get(self, key): - parts = key.split('.') - pointer = self - for p in parts: - pointer = getattr(pointer, p) - return pointer - def __str__(self): - return "%s #config from: %s" % (self._config, self._realpath) - - - -# The config instance is a configuration reader -# The configuration can sit in different places - -__cfgdir = None -# set by environmental variable -if 'WORKFLOWCFG' in os.environ: - __cfgdir = os.environ['WORKFLOWCFG'] - if os.pathsep in __cfgdir: - __cfgdir = __cfgdir.split(os.pathsep) -else: - __cfgdir = [os.path.abspath(os.path.dirname(__file__)), - os.path.abspath(os.path.dirname(__file__) + '/cfg')] - - -# This instance has access to all global/local config options -config_reader = ConfigReader(basedir=__cfgdir) - +import sys +import os +from configobj import Section, OPTION_DEFAULTS, ConfigObjError, ConfigObj +import inspect +import traceback + +""" +Provides a class for reading global configuration options +The reader in itself is using configobj to access the ini +files. The reader should be initialized (from the project +root) with the path of the folder where configuration files +live. +""" + +class CustomConfigObj(ConfigObj): + """This is very small change into the default ConfigObj class + - the only difference is in the parent_config parameter, if passed + we will add it to the new instance and interpolation will use the + values of the parent_config for lookup + """ + + def __init__(self, infile=None, options=None, configspec=None, encoding=None, + interpolation=True, raise_errors=False, list_values=True, + create_empty=False, file_error=False, stringify=True, + indent_type=None, default_encoding=None, unrepr=False, + write_empty_values=False, _inspec=False, parent_config=None): + """ + Parse a config file or create a config file object. 
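+
+        A minimal sketch of the parent_config extension (file names are
+        hypothetical):
+
+            parent = CustomConfigObj('global.ini')
+            child = CustomConfigObj('local.ini', parent_config=parent)
+            # interpolation inside local.ini can now look up values
+            # that are only defined in global.ini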
+
+        ``ConfigObj(infile=None, configspec=None, encoding=None,
+        interpolation=True, raise_errors=False, list_values=True,
+        create_empty=False, file_error=False, stringify=True,
+        indent_type=None, default_encoding=None, unrepr=False,
+        write_empty_values=False, _inspec=False)``
+        """
+        self._inspec = _inspec
+        # init the superclass
+        # this is the only change - we pass the parent configobj if
+        # available, to have lookup use its values
+        Section.__init__(self, parent_config or self, 0, self)
+
+
+        infile = infile or []
+        if options is not None:
+            import warnings
+            warnings.warn('Passing in an options dictionary to ConfigObj() is '
+                          'deprecated. Use **options instead.',
+                          DeprecationWarning, stacklevel=2)
+
+        _options = {'configspec': configspec,
+                    'encoding': encoding, 'interpolation': interpolation,
+                    'raise_errors': raise_errors, 'list_values': list_values,
+                    'create_empty': create_empty, 'file_error': file_error,
+                    'stringify': stringify, 'indent_type': indent_type,
+                    'default_encoding': default_encoding, 'unrepr': unrepr,
+                    'write_empty_values': write_empty_values}
+
+        options = dict(options or {})
+        options.update(_options)
+
+        # XXXX this ignores an explicit list_values = True in combination
+        # with _inspec. The user should *never* do that anyway, but still...
+        if _inspec:
+            options['list_values'] = False
+
+        defaults = OPTION_DEFAULTS.copy()
+        # TODO: check the values too.
+        for entry in options:
+            if entry not in defaults:
+                raise TypeError('Unrecognised option "%s".' % entry)
+
+        # Add any explicit options to the defaults
+        defaults.update(options)
+        self._initialise(defaults)
+        configspec = defaults['configspec']
+        self._original_configspec = configspec
+        self._load(infile, configspec)
+
+
+
+class ConfigReader(object):
+    """Instance that facilitates easy reading/access to the configuration values
+    from .ini files
+
+    Modules/workflows should not import this, but the config instance:
+    from workflow import config
+
+    During instantiation, the reader loads the global config file - usually from ./cfg/global.ini
+    The values will be accessible as attributes, eg:
+    reader.BASEDIR
+    reader.sectionX.VAL
+
+    When a workflow/module is accessing an attribute, the reader will also load special
+    configuration (workflow-specific configuration) which has the same name as the
+    workflow/module.
+
+    Example: workflow 'load_seman_components.py'
+    from merkur.config import reader
+    reader.LOCAL_VALUE # at this moment, the reader will check whether %basedir/etc/load_seman_components.ini exists
+                       # if yes, the reader will load the configuration and store it inside ._local
+                       # if no LOCAL_VALUE exists, an error will be raised
+                       # if in the meantime some other module imported the reader and tries to access
+                       # an attribute, the reader will recognize that the caller is different, will update
+                       # the local config and will serve workflow-specific configuration automatically
+
+    You can pass a list of basedir folders - in that case, only the last one will
+    be used for lookup of local configurations, but the global values will be inherited
+    from all global.ini files found in the basedir folders
+    """
+
+    def __init__(self, basedir=os.path.abspath(os.path.dirname(__file__)),
+                 caching=True):
+        object.__init__(self)
+        self._local = {}
+        self._global = {}
+        self._on_demand = {}
+        self._recent_caller = ''
+        self._caching = caching
+        self._main_config = None
+        self._basedir = []
+
+        self.setBasedir(basedir)
+
+        # load configurations
+        if isinstance(basedir, list) or isinstance(basedir, tuple):
+            files = []
+            for d in self._basedir:
+                if os.path.exists(d):
+                    files.append(os.path.abspath(os.path.join(d, 'global.ini')))
+            self.update(files)
+        else:
+            self.update()
+
+    def __getattr__(self, key):
+        """Returns cfg value
+        1. first lookup in the local values
+        2. then in the global values
+        """
+
+
+        # first find out who is trying to access us
+        frame = inspect.currentframe().f_back
+
+        # TODO - make it try the hierarchy first?
+        if frame:
+            cfile = self._getCallerPath(frame)
+            if cfile:
+                caller = self._getCallerName(cfile)
+                if caller != self._recent_caller:
+                    # TODO: make it optional, allow for read-once-updates
+                    self.update_local(caller) #update config
+
+
+        if key in self._local:
+            return self._local[key]
+        elif key in self._global:
+            return self._global[key] #raise error ok
+        else:
+            global_cfg_path = self._main_config and os.path.abspath(self._main_config.filename) or 'None'
+            local_cfg_path = self._findConfigPath(self._recent_caller)
+            raise AttributeError('Attribute "%s" not defined\nglobal_config: %s\nlocal_config: %s' %
+                                 (key, global_cfg_path, local_cfg_path))
+
+    def _getCallerId(self, frame):
+        if frame:
+            cfile = self._getCallerPath(frame)
+            if cfile:
+                caller = self._getCallerName(cfile)
+                return caller
+
+
+    def getBaseDir(self):
+        return self._basedir
+
+    def setBasedir(self, basedir):
+        """Sets a new basedir path - this is a root of the configuration
+        directives from which other paths are resolved"""
+        if not (isinstance(basedir, list) or isinstance(basedir, tuple)):
+            basedir = [basedir]
+        new_base = []
+        for b in basedir:
+            b = os.path.abspath(b)
+            if b[0] != '\\':
+                b = b.replace('\\', '/')
+            b = b[0].lower() + b[1:]
+            if b not in new_base:
+                new_base.append(b)
+        self._basedir = new_base
+        self.update()
+
+    def update(self, files=None, replace_keys={}):
+        """Updates values reading them from the main configuration file(s)
+        @var files: list of configuration files (if empty, default file is read)
+        @keyword replace_keys: dictionary of values that you want to replace
+            this allows you to change config at runtime, but IT IS NOT
+            RECOMMENDED to change anything else than global values (and you can
+            change only top level values). If you don't know what you are doing,
+            do not replace any keys!
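+
+        Example (illustrative; the path and key are hypothetical):
+            config_reader.update(files=['/etc/myapp/global.ini'],
+                                 replace_keys={'BASEDIR': '/tmp/run'})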
+ """ + if files is None: + files = self._makeAllConfigPaths('global') + + updated = 0 + for file in files: + if os.path.exists(file): + # if we have more files, we will wrap/inherit them into one object + # this object should not be probably usef for writing + config = self._main_config = CustomConfigObj(file, + encoding='UTF8', + parent_config = self._main_config) + if replace_keys: + for k, v in replace_keys.items(): + if k in config: + config[k] = v + self._update(self._global, config) + updated += 1 + return updated + + def init(self, filename): + if not os.path.exists(filename): + filename = self._findConfigPath(filename) + caller = self._getCallerId(inspect.currentframe().f_back) + if not(self.update_local(caller, filename)): + raise Exception('Config file: %s does not exist' % filename) + + def update_local(self, name, file=None): + """Update the local, workflow-specific cache + @var name: name of the calling module (without suffix) + @keyword file: file to load config from (if empty, default ini file will be sought) + """ + self._recent_caller = name + self._local = {} + if file is None: + file = self._findConfigPath(name) + + if file and os.path.exists(file): + config = CustomConfigObj(file, + encoding='UTF8', + parent_config = self._main_config) + + self._update(self._local, config) + return True + + def load(self, cfgfile, force_reload=False, failonerror=True, + replace_keys={}): + """Loads configuration file on demand + @var cfgfile: path to the file, the path may be relative, in + that case we will try to guess it using set basedir. Or it + can be absolute + @keyword force_reload: returns cached configuration or reloads + it again from file if force_reload=True + @keyword failonerror: bool, raise Exception when config file + is not found/loaded + @return: config object or None + + example: + c = config.load('some-file.txt') + c.some.key + """ + realpath = None + if os.path.exists(cfgfile): + realpath = cfgfile + else: + new_p = self._findConfigPath(cfgfile) + if new_p: + realpath = new_p + else: + new_p = self._findConfigPath(cfgfile.rsplit('.', 1)[0]) + if new_p: + realpath = new_p + + if not realpath: + if failonerror: + raise Exception('Cannot find: %s' % cfgfile) + else: + sys.stderr.write('Cannot find: %s' % cfgfile) + return + + if realpath in self._on_demand and not force_reload: + return ConfigWrapper(realpath, self._on_demand[realpath]) + + try: + config = CustomConfigObj(realpath, + encoding='UTF8', + parent_config = self._main_config) + if replace_keys: + for k, v in replace_keys.items(): + if k in config: + config[k] = v + except ConfigObjError, msg: + if failonerror: + raise ConfigObjError(msg) + else: + self.traceback.print_exc() + return + + self._on_demand[realpath] = {} + self._update(self._on_demand[realpath], config) + return ConfigWrapper(realpath, self._on_demand[realpath]) + + + def get(self, key, failonerror=True): + """Gets value from the key identified by string, eg. 
'index.dir'""" + parts = key.split('.') + pointer = self + try: + for p in parts: + pointer = getattr(pointer, p) + return pointer + except (KeyError, AttributeError): + global_cfg_path = self._main_config and os.path.abspath(self._main_config.filename) or 'None' + local_cfg_path = self._findConfigPath(self._recent_caller) + m = 'Attribute "%s" not defined\nglobal_config: %s\nlocal_config: %s' % \ + (key, global_cfg_path, local_cfg_path) + if failonerror: + raise AttributeError(m) + else: + sys.stderr.write(m) + + + def _getCallerPath(self, frame): + cfile = os.path.abspath(inspect.getfile(frame)).replace('\\', '/') + f = __file__.replace('\\', '/') + if f != cfile: + return cfile + + + def _getCallerName(self, path): + cfile = os.path.split(path)[1] + return cfile.rsplit('.', 1)[0] + + + def getCallersConfig(self, failonerror=True): + """Gets the value from the calling workflow configuration - + this is useful if we want to access configuration of the object + that included us + @var key: name of the key to access, it is a string in a dot notation + """ + + # first find out who is trying to access us + caller = '' + frame = inspect.currentframe().f_back + frame = inspect.currentframe().f_back + if frame: + frame = frame.f_back + if frame: + caller = self._getCallerName(self._getCallerPath(frame)) + path = self._findConfigPath(caller) + if path: + config = self.load(path) + if config: + return config + if failonerror: + raise Exception('Error, cannot find the caller') + + def _findConfigPath(self, name): + """Finds the most specific config path""" + for path in reversed(self._makeAllConfigPaths(name)): + if os.path.exists(path): + return path + + def _makeAllConfigPaths(self, name): + f = [] + for d in self._basedir: + path = '%s/%s.ini' % (d, name) + f.append(path.replace('\\', '/')) + return f + + def _update(self, pointer, config): + for key, cfg_val in config.items(): + if isinstance(cfg_val, Section): + o = cfgval() + for k,v in cfg_val.items(): + if isinstance(v, Section): + o2 = cfgval() + o.__setattr__(k, o2) + self._update(o2, v) + else: + o.__setattr__(k, v) + pointer[key] = o + else: + pointer[key] = cfg_val + + + def __str__(self): + """Returns textual representation of the current config + which can be used for special purposes (ie. to save values + somewhere and reload them -- however, they will be a simple + dictionaries of textual values; without special powers. This + class also does not provide ways to load such dumped values, + we would be circumventing configobj and that is no good. 
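+
+        The returned text has roughly this shape (values abbreviated,
+        caller name hypothetical):
+            {'global_config' : {...}, 'local_config' : {...},
+             'on_demand_config': {...}, 'recent_caller' : 'my_workflow'}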
+ """ + return """{'global_config' : %s, + 'local_config' : %s, + 'on_demand_config': %s, + 'recent_caller' : '%s'}""" % (self._global, + self._local, + self._on_demand, + self._recent_caller) + + +class cfgval(object): + def __init__(self): + object.__init__(self) + def __setattr__(self, k, v): + self.__dict__[k] = v + def __repr__(self): + #return '{%s}' % ',\n'.join(map(lambda o: "'.%s': %s" % (o[0], repr(o[1])), self.__dict__.items())) + return '%s\n%s' % ('#cfgwrapper', repr(self.__dict__)) + def __iter__(self): + return iter(self.__dict__) + def __setitem__(self, k, v): + self.__setattr__(k, v) + def items(self): + return self.__dict__.items() + def keys(self): + return self.__dict__.keys() + def values(self): + return self.__dict__.values() + +class ConfigWrapper(object): + def __init__(self, realpath, config): + self.__dict__['_config'] = config + self.__dict__['_realpath'] = realpath + def __getattr__(self, key): + return self._config[key] + def __setattr__(self, key, value): + self._config.__setitem__(key, value) + def get(self, key): + parts = key.split('.') + pointer = self + for p in parts: + pointer = getattr(pointer, p) + return pointer + def __str__(self): + return "%s #config from: %s" % (self._config, self._realpath) + + + +# The config instance is a configuration reader +# The configuration can sit in different places + +__cfgdir = None +# set by environmental variable +if 'WORKFLOWCFG' in os.environ: + __cfgdir = os.environ['WORKFLOWCFG'] + if os.pathsep in __cfgdir: + __cfgdir = __cfgdir.split(os.pathsep) +else: + __cfgdir = [os.path.abspath(os.path.dirname(__file__)), + os.path.abspath(os.path.dirname(__file__) + '/cfg')] + + +# This instance has access to all global/local config options +config_reader = ConfigReader(basedir=__cfgdir) + diff --git a/workflow/engine.py b/workflow/engine.py index 1e7582f..267989d 100644 --- a/workflow/engine.py +++ b/workflow/engine.py @@ -1,661 +1,661 @@ - -####################################################################################### -## Copyright (c) 2010-2011, 2014 CERN ## -## All rights reserved. ## -## ## -## Redistribution and use in source and binary forms, with or without modification, ## -## are permitted provided that the following conditions are met: ## -## ## -## * Redistributions of source code must retain the above copyright notice, ## -## this list of conditions and the following disclaimer. ## -## * Redistributions in binary form must reproduce the above copyright notice, ## -## this list of conditions and the following disclaimer in the documentation ## -## and/or other materials provided with the distribution. ## -## * Neither the name of the author nor the names of its contributors may be ## -## used to endorse or promote products derived from this software without ## -## specific prior written permission. 
##
-##                                                                                   ##
-## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND   ##
-## ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED     ##
-## WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.##
-## IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,  ##
-## INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,    ##
-## BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,     ##
-## DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF   ##
-## LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE   ##
-## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED ##
-## OF THE POSSIBILITY OF SUCH DAMAGE.                                                ##
-##                                                                                   ##
-#######################################################################################
-
-import logging # we are not using the newseman logging to make this library independent
-import new
-import copy
-import pickle
-import sys
-
-DEBUG = False
-LOGGING_LEVEL = logging.INFO
-LOG = None
-
-class WorkflowTransition(Exception): pass # base class for WFE
-class WorkflowError(Exception): pass # general error
-
-class StopProcessing(WorkflowTransition): pass #stops current workflow
-class HaltProcessing(WorkflowTransition): pass #halts the workflow (can be used for nested wf engines)
-class ContinueNextToken(WorkflowTransition): pass # can be called many levels deep, jumps up to next token
-class JumpTokenForward(WorkflowTransition): pass
-class JumpTokenBack(WorkflowTransition): pass
-
-class JumpCallForward(WorkflowTransition): pass #in one loop [call, call...] jumps x steps forward
-class JumpCallBack(WorkflowTransition): pass #in one loop [call, call...] jumps x steps back
-class BreakFromThisLoop(WorkflowTransition): pass #break from this loop, but do not stop processing
-
-class WorkflowMissingKey(WorkflowError): pass # when trying to use unregistered workflow key
-
-
-class GenericWorkflowEngine(object):
-    """Workflow engine is a Finite State Machine with memory.
-    It is used to execute a set of methods in a specified order.
-
-    example:
-
-    from merkur.workflows.parts import load_annie, load_seman
-    from newseman.general.workflow import patterns as p
-
-    workflow = [
-        load_seman_components.workflow,
-        p.IF(p.OBJ_GET(['path', 'text'], cond='any'),
-            [ p.TRY(g.get_annotations(), retry=1,
-                    onfailure=p.ERROR('Error in the annotation workflow'),
-                    verbose=True),
-              p.IF(p.OBJ_GET('xml'),
-                   translate_document.workflow)
-            ])
-    ]
-
-    This workflow is then used as:
-    wfe = GenericWorkflowEngine()
-    wfe.setWorkflow(workflow)
-    wfe.process([{'foo': 'bar'}, {'foo': ...}])
-
-    This workflow engine instance can be frozen and restarted, it remembers
-    its internal state and will pick up processing after the last finished
-    task.
-
-    import pickle
-    s = pickle.dumps(wfe)
-
-    However, when restarting the workflow, you must initialize the workflow
-    tasks manually using their original definition
-
-    wfe = pickle.loads(s)
-    wfe.setWorkflow(workflow)
-
-    It is also not possible to serialize a WFE when custom factory
-    tasks were provided. If you attempt to serialize such a WFE instance,
-    it will raise an exception. If you want to serialize a
-    WFE including its factory hooks and workflow callbacks, use the
-    PhoenixWorkflowEngine class instead.
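-
-    A minimal freeze/thaw sketch (assuming the workflow definition from
-    above is available when thawing):
-
-    s = pickle.dumps(wfe)
-    wfe2 = pickle.loads(s)
-    wfe2.setWorkflow(workflow)
-    wfe2.restart('current', 'next')  # current object, next task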
- - - """ - - def __init__(self, - processing_factory=None, - callback_chooser=None, - before_processing=None, - after_processing=None): - - self._picklable_safe = True - for name, x in [('processing_factory', processing_factory), - ('callback_chooser', callback_chooser), - ('before_processing', before_processing), - ('after_processing', after_processing)]: - if x: - if not callable(x): - raise WorkflowError('Callback "%s" must be a callable object' % name) - else: - setattr(self, name, x) - self._picklable_safe = False - - self._callbacks = {} - self._store = {} - self._objects = [] # tmp storage of processed objects - self._i = [-1, [0]] # holds ids of the currently processed object and task - self._unpickled = False - self.log = logging.getLogger("workflow.%s" % self.__class__) # default logging - - def __getstate__(self): - if not self._picklable_safe: - raise pickle.PickleError("The instance of the workflow engine cannot be serialized, " - "because it was constructed with custom, user-supplied callbacks. Either use" - "PickableWorkflowEngine or provide your own __getstate__ method.") - return {'_store':self._store, '_objects': self._objects, - '_i': self._i, '_callbacks': {}, 'log': self.log} - - - def __setstate__(self, state): - self._store = state['_store'] - self._objects = state['_objects'] - self._i = state['_i'] - self._callbacks = state['_callbacks'] - self.log = state['log'] - if len(self._objects) < self._i[0]: - raise pickle.PickleError("The workflow instance inconsistent state, too few objects") - self._unpickled = True - - - def setLogger(self, logger): - """The logger instance must be pickable if the serialization should work""" - self.log = logger - - - def continueNextToken(self): - """Continue with the next token""" - raise ContinueNextToken - - def stopProcessing(self): - """Break out, stops everything (in the current wfe)""" - raise StopProcessing - - def haltProcessing(self): - """Halt the workflow (stop also any parent wfe)""" - raise HaltProcessing - - def jumpTokenForward(self, offset): - """Jumps to xth token""" - raise JumpTokenForward(offset) - - def jumpTokenBack(self, offset): - """Returns x tokens back - be careful with circular loops""" - raise JumpTokenBack(offset) - - def jumpCallForward(self, offset): - """Jumps to xth call in this loop""" - raise JumpCallForward(offset) - - def jumpCallBack(self, offset): - """Returns x calls back in the current loop - be careful with circular loop""" - raise JumpCallBack(offset) - - def breakFromThisLoop(self): - """Stops in the current loop but continues in those above""" - raise BreakFromThisLoop - - def configure(self, **kwargs): - """Method to set attributes of the workflow engine - use with extreme care - (well, you can set up the attrs directly, I am not protecting them, but - that is not nice) - Used mainly if you want to change the engine's callbacks - if processing factory - before_processing, after_processing - - @var **kwargs: dictionary of values - """ - for (key, value) in kwargs.items(): - if hasattr(self, key): - setattr(self, key, value) - else: - raise WorkflowError("Object %s does not have attr %s - it is not allowed to set nonexisting attribute (and you don't circumvent interface, do you?)" % (str(self), key)) - - def process(self, objects): - """Start processing - @param objects: either a list of object or - instance of TokenizedDocument - @return: You never know what will be returned from the workflow ;-) - But many exceptions can be raised, so watch out for them, - if there happened an 
exception, you can be sure something
-            went wrong (something that your workflow should have handled
-            and didn't). The workflow engine does not interfere with the
-            processing chain and it does not catch exceptions for you.
-        """
-        if isinstance(objects, list):
-            if not len(objects):
-                self.log.warning('List of objects is empty. Running workflow on empty set has no effect.')
-            return self.processing_factory(objects, self)
-        elif hasattr(objects, 'TokenizedDocument') and objects.TokenizedDocument:
-            if not len(objects.tokens()):
-                self.log.warning('Running workflow on empty TokenizedDocument set has no effect.')
-            return self.processing_factory(objects.tokens(), self)
-        else:
-            raise WorkflowError('Passed in object %s is neither list nor TokenizedDocument' % (objects.__class__))
-
-
-
-    @staticmethod
-    def before_processing(objects, self):
-        """Standard pre-processing callback - saves a pointer to the processed objects"""
-        #self.reset()
-        self._objects = objects
-
-
-    @staticmethod
-    def after_processing(objects, self):
-        """Standard post-processing callback, basic cleaning"""
-        self._objects = []
-        self._i = [-1, [0]]
-
-    @staticmethod
-    def callback_chooser(obj, self):
-        """There are possibly many workflows inside this workflow engine
-        and they are meant for different types of objects; this method
-        should choose and return the callbacks appropriate for the currently
-        processed object
-        @var obj: currently processed object
-        @var eng: the workflow engine object
-        @return: set of callbacks to run
-        """
-        if hasattr(obj, 'getFeature'):
-            t = obj.getFeature('type')
-            if t:
-                return self.getCallbacks(t)
-        else:
-            return self.getCallbacks('*') #for the non-token types return default workflows
-
-    @staticmethod
-    def processing_factory(objects, self):
-        """Default processing factory, will process objects in order
-
-        @var objects: list of objects (passed in by self.process())
-        @keyword cls: engine object itself; because this method may
-            be implemented by a standalone function, we pass self
-            also as a cls argument
-
-        As the WFE proceeds, it increments the internal counter; the
-        first position is the number of the element. This pointer increases
-        before the object is taken.
-
-        The 2nd position is reserved for the array that points to the task
-        position. The number there points to the task that is currently
-        executed; when an error happens, it will remain there unchanged. The
-        pointer is updated after the task has finished running.
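-
-        Example: self._i == [2, [1]] means that the third object is being
-        processed and that the top-level task at index 1 ran last.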
- """ - - self.before_processing(objects, self) - - i = self._i - - while i[0] < len(objects)-1 and i[0] >= -1: # negative index not allowed, -1 is special - i[0] += 1 - obj = objects[i[0]] - callbacks = self.callback_chooser(obj, self) - if callbacks: - try: - self.run_callbacks(callbacks, objects, obj) - i[1] = [0] #reset the callbacks pointer - except StopProcessing: - if DEBUG: - self.log.debug("Processing was stopped: '%s' (object: %s)" % (str(callbacks), repr(obj))) - break - except JumpTokenBack, step: - if step.args[0] > 0: - raise WorkflowError("JumpTokenBack cannot be positive number") - if DEBUG: - self.log.debug('Warning, we go back [%s] objects' % step.args[0]) - i[0] = max(-1, i[0] - 1 + step.args[0]) - i[1] = [0] #reset the callbacks pointer - except JumpTokenForward, step: - if step.args[0] < 0: - raise WorkflowError("JumpTokenForward cannot be negative number") - if DEBUG: - self.log.debug('We skip [%s] objects' % step.args[0]) - i[0] = min(len(objects), i[0] - 1 + step.args[0]) - i[1] = [0] #reset the callbacks pointer - except ContinueNextToken: - if DEBUG: - self.log.debug('Stop processing for this object, continue with next') - i[1] = [0] #reset the callbacks pointer - continue - except HaltProcessing: - if DEBUG: - self.log.debug('Processing was halted at step: %s' % i) - # reraise the exception, this is the only case when a WFE can be completely - # stopped - raise - - self.after_processing(objects, self) - - - - def run_callbacks(self, callbacks, objects, obj, indent=0): - """This method will execute callbacks in the workflow - @var callbacks: list of callables (may be deep nested) - @var objects: list of processed objects - @var obj: currently processed object - @keyword indent: int, indendation level - the counter - at the indent level is increases after the task has - finished processing; on error it will point to the - last executed task position. - The position adjusting also happens after the - task has finished. - """ - c = 0 #Just a counter for debugging - y = self._i[1] #Position of the task - while y[indent] < len(callbacks): - # jump to the appropriate place if we were restarted - if len(y)-1 > indent: - self.log.debug('Fast-forwarding to the position:callback = %s:%s' % (indent, y[indent])) - #print 'indent=%s, y=%s, y=%s, \nbefore=%s\nafter=%s' % (indent, y, y[indent], callbacks, callbacks[y[indent]]) - self.run_callbacks(callbacks[y[indent]], objects, obj, indent+1) - y.pop(-1) - y[indent] += 1 - continue - f = callbacks[y[indent]] - try: - c += 1 - if isinstance(f, list) or isinstance(f, tuple): - y.append(0) - self.run_callbacks(f, objects, obj, indent+1) - y.pop(-1) - y[indent] += 1 - continue - if DEBUG: - self.log.debug("Running (%s%s.) 
callback '%s' for obj: %s" % (indent * '-', c, f.__name__, repr(obj))) - self.execute_callback(f, obj) - if DEBUG: - self.log.debug('+ok') - except BreakFromThisLoop: - if DEBUG: - self.log.debug('Break from this loop') - return - except JumpCallBack, step: - if DEBUG: - self.log.debug('Warning, we go [%s] calls back' % step.args[0]) - if step.args[0] > 0: - raise WorkflowError("JumpCallBack cannot be positive number") - y[indent] = max(-1, y[indent] + step.args[0]-1) - except JumpCallForward, step: - if DEBUG: - self.log.debug('We skip [%s] calls' % step.args[0]) - if step.args[0] < 0: - raise WorkflowError("JumpCallForward cannot be negative number") - y[indent] = min(len(callbacks), y[indent] + step.args[0]-1) - y[indent] += 1 - #y[indent] -= 1 # adjust the counter so that it always points to the last executed task - - def setPosition(self, obj_pos, task_pos): - """Sets the internal pointers (of current state/obj) - @var obj_pos: (int) index of the currently processed object - After invocation, the engine will grab the next obj - from the list - @var task_pos: (list) multidimensional one-element list - that says at which level the task should restart. Example: - 6th branch, 2nd task = [5, 1] - """ - #TODO: check that positions are not out-of-bounds - self._i[0] = obj_pos - self._i[1] = task_pos - - def execute_callback(self, callback, obj): - """Executes the callback - override this method to implement logging""" - - callback(obj, self) - #print self._i - - - def getCallbacks(self, key='*'): - """Returns callbacks for the given workflow - @keyword key: name of the workflow (default: *) - if you want to get all configured workflows - pass None object as a key - @return: list of callbacks - """ - if key: - try: - return self._callbacks[key] - except KeyError, e: - raise WorkflowMissingKey('No workflow is registered for the key: %s. Perhaps you forgot to load workflows or the workflow definition for the given key was empty?' 
% key) - else: - return self._callbacks - - def addCallback(self, key, func, before=None, after=None, relative_weight=None): - '''Inserts one callable to the stack of the callables''' - try: - if func: #can be None - self.getCallbacks(key).append(func) - except WorkflowMissingKey: - self._callbacks[key] = [] - return self._callbacks[key].append(func) - except Exception, e: - self.log.debug('Impossible to add callback %s for key: %s' % (str(func), key)) - self.log.debug(e) - - def addManyCallbacks(self, key, list_or_tuple): - list_or_tuple = list(self._cleanUpCallables(list_or_tuple)) - for f in list_or_tuple: - self.addCallback(key, f) - - @classmethod - def _cleanUpCallables(cls, callbacks): - """helper method to remove non-callables from the passed-in callbacks""" - if callable(callbacks): - yield callbacks - for x in callbacks: - if isinstance(x, list): - yield list(cls._cleanUpCallables(x)) - elif isinstance(x, tuple): - # tumples are simply converted to normal members - for fc in cls._cleanUpCallables(x): - yield fc - elif x is not None: - yield x - - def removeAllCallbacks(self): - """Removes all the tasks from the workflow engine instance""" - self._callbacks = {} - - def removeCallbacks(self, key): - """for the given key, remove callbacks""" - try: - del(self._callbacks[key]) - except KeyError: - pass - - def reset(self): - """Empties the stack memory""" - self._i = [-1, [0]] - self._store = {} - - def replaceCallbacks(self, key, funcs): - """replace processing workflow with a new workflow""" - list_or_tuple = list(self._cleanUpCallables(funcs)) - self.removeCallbacks(key) - for f in list_or_tuple: - self.addCallback(key, f) - - def setWorkflow(self, list_or_tuple): - """Sets the (default) workflow which will be run when - you call process() - @var list_or_tuple: workflow configuration - """ - if not isinstance(list_or_tuple, list) or not isinstance(list_or_tuple, tuple): - list_or_tuple = (list_or_tuple,) - self.replaceCallbacks('*', list_or_tuple) - - def setVar(self, key, what): - """Stores the obj in the internal stack""" - self._store[key] = what - - def getVar(self, key, default=None): - """returns named obj from internal stack. If not found, returns None. - @param key: name of the object to return - @keyword default: if not found, what to return instead (if this arg - is present, the stack will be initialized with the same value) - @return: anything or None""" - try: - return self._store[key] - except: - if default is not None: - self.setVar(key, default) - return default - - def hasVar(self, key): - """Returns True if parameter of this name is stored""" - if key in self._store: - return True - - def delVar(self, key): - """Deletes parameter from the internal storage""" - if key in self._store: - del self._store[key] - - def getCurrObjId(self): - """Returns id of the currently processed object""" - return self._i[0] - - def getCurrTaskId(self): - """Returns id of the currently processed task. 
Note that the return value of this method is not thread-safe.""" - return self._i[1] - - def getObjects(self): - """Returns iterator for walking through the objects""" - i = 0 - for obj in self._objects: - yield (i, obj) - i += 1 - - def restart(self, obj, task, objects=None): - """Restart the workflow engine after it was deserialized - - """ - - if self._unpickled is not True: - raise Exception("You can call this method only after loading serialized engine") - if len(self.getCallbacks(key=None)) == 0: - raise Exception("The callbacks are empty, did you set workflows?") - - # set the point from which to start processing - if obj == 'prev': # start with the previous object - self._i[0] -= 2 #TODO: check if there is any object there - elif obj == 'current': # continue with the current object - self._i[0] -= 1 - elif obj == 'next': - pass - else: - raise Exception('Unknown start point for object: %s' % obj) - - # set the task that will be executed first - if task == 'prev': # the previous - self._i[1][-1] -= 1 - elif task == 'current': # restart the task again - self._i[1][-1] -= 0 - elif task == 'next': # continue with the next task - self._i[1][-1] += 1 - else: - raise Exception('Unknown start pointfor task: %s' % obj) - - if objects: - self.process(objects) - else: - self.process(self._objects) - - self._unpickled = False - - -class PhoenixWorkflowEngine(GenericWorkflowEngine): - """Implementation of the GenericWorkflowEngine which is able to be - *serialized* and re-executed also with its workflow tasks - without knowing - their original definition. This implementation depends on the - picloud module - http://www.picloud.com/. The module must be - installed in the standard location. - - """ - - def __init__(self, *args, **kwargs): - super(PhoenixWorkflowEngine, self).__init__(*args, **kwargs) - from cloud import serialization - self._picloud_serializer = serialization - - - def __getstate__(self): - out = super(PhoenixWorkflowEngine, self).__getstate__() - cbs = self.getCallbacks(key=None) - out['_callbacks'] = self._picloud_serializer.serialize(cbs, needsPyCloudSerializer=True) - factory_calls = {} - for name in ('processing_factory', 'callback_chooser', 'before_processing', 'after_processing'): - c = getattr(self, name) - if c.__class__ != 'PhoenixWorkflowEngine': - factory_calls[name] = c - out['factory_calls'] = self._picloud_serializer.serialize(factory_calls, needsPyCloudSerializer=True) - return out - - def __setstate__(self, state): - from cloud import serialization - self._picloud_serializer = serialization - - state['_callbacks'] = self._picloud_serializer.deserialize(state['_callbacks']) - super(PhoenixWorkflowEngine, self).__setstate__(state) - factory_calls = self._picloud_serializer.deserialize(state['factory_calls']) - for k,v in factory_calls.items(): - setattr(self, k, v) - - - - - - - -# ------------------------------------------------------------- # -# helper methods/classes # -# ------------------------------------------------------------- # - -def duplicate_engine_instance(eng): - """creates a new instance of the workflow engine based on existing instance""" - #new_eng = copy.deepcopy(eng) - #new_eng.removeAllCallbacks() - #new_eng.reset() - - - new_eng = eng.__class__(processing_factory=eng.processing_factory, - callback_chooser=eng.callback_chooser, - before_processing=eng.before_processing, - after_processing=eng.after_processing) - return new_eng - - -def get_logger(name): - """Creates a logger for you - with the parent logger and - common configuration""" - if 
name[0:8] != 'workflow' and len(name) > 8: - sys.stderr.write("Warning: you are creating a logger without 'workflow' as a root (%s)," - "this means that it will not share workflow settings and cannot be administered from one place" % name) - if LOG: - logger = LOG.manager.getLogger(name) - else: - logger = logging.getLogger(name) - hdlr = logging.StreamHandler(sys.stderr) - formatter = logging.Formatter('%(levelname)s %(asctime)s %(name)s:%(lineno)d %(message)s') - hdlr.setFormatter(formatter) - logger.addHandler(hdlr) - logger.setLevel(LOGGING_LEVEL) - logger.propagate = 0 - if logger not in _loggers: - _loggers.append(logger) - return logger - -def reset_all_loggers(level): - """Set logging level for every active logger - beware, if the global - manager level is higher, then still nothing will be see. Manager - level has precedence - use set_global_level - """ - for l in _loggers: - l.setLevel(LOGGING_LEVEL) - -def set_global_level(level): - """Sets the global level to the manager, the parent manager of all - the newseman loggers. With this one call, you can switch off all - loggers at once. But you can't enable them using this call, because - every logger may have a specific log level - """ - global LOGGING_LEVEL - LOGGING_LEVEL = int(level) - LOG.manager.disable = LOGGING_LEVEL - 1 - - -_loggers = [] -LOG = get_logger('workflow') -set_global_level(LOGGING_LEVEL) - - - -__version__ = '1.2.0.dev20140812' + +####################################################################################### +## Copyright (c) 2010-2011, 2014 CERN ## +## All rights reserved. ## +## ## +## Redistribution and use in source and binary forms, with or without modification, ## +## are permitted provided that the following conditions are met: ## +## ## +## * Redistributions of source code must retain the above copyright notice, ## +## this list of conditions and the following disclaimer. ## +## * Redistributions in binary form must reproduce the above copyright notice, ## +## this list of conditions and the following disclaimer in the documentation ## +## and/or other materials provided with the distribution. ## +## * Neither the name of the author nor the names of its contributors may be ## +## used to endorse or promote products derived from this software without ## +## specific prior written permission. ## +## ## +## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ## +## ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ## +## WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.## +## IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, ## +## INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, ## +## BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, ## +## DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF ## +## LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE ## +## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED ## +## OF THE POSSIBILITY OF SUCH DAMAGE. 
##
+##                                                                                   ##
+#######################################################################################
+
+import logging # we are not using the newseman logging to make this library independent
+import new
+import copy
+import pickle
+import sys
+
+DEBUG = False
+LOGGING_LEVEL = logging.INFO
+LOG = None
+
+class WorkflowTransition(Exception): pass # base class for WFE
+class WorkflowError(Exception): pass # general error
+
+class StopProcessing(WorkflowTransition): pass #stops current workflow
+class HaltProcessing(WorkflowTransition): pass #halts the workflow (can be used for nested wf engines)
+class ContinueNextToken(WorkflowTransition): pass # can be called many levels deep, jumps up to next token
+class JumpTokenForward(WorkflowTransition): pass
+class JumpTokenBack(WorkflowTransition): pass
+
+class JumpCallForward(WorkflowTransition): pass #in one loop [call, call...] jumps x steps forward
+class JumpCallBack(WorkflowTransition): pass #in one loop [call, call...] jumps x steps back
+class BreakFromThisLoop(WorkflowTransition): pass #break from this loop, but do not stop processing
+
+class WorkflowMissingKey(WorkflowError): pass # when trying to use unregistered workflow key
+
+
+class GenericWorkflowEngine(object):
+    """Workflow engine is a Finite State Machine with memory.
+    It is used to execute a set of methods in a specified order.
+
+    example:
+
+    from merkur.workflows.parts import load_annie, load_seman
+    from newseman.general.workflow import patterns as p
+
+    workflow = [
+        load_seman_components.workflow,
+        p.IF(p.OBJ_GET(['path', 'text'], cond='any'),
+            [ p.TRY(g.get_annotations(), retry=1,
+                    onfailure=p.ERROR('Error in the annotation workflow'),
+                    verbose=True),
+              p.IF(p.OBJ_GET('xml'),
+                   translate_document.workflow)
+            ])
+    ]
+
+    This workflow is then used as:
+    wfe = GenericWorkflowEngine()
+    wfe.setWorkflow(workflow)
+    wfe.process([{'foo': 'bar'}, {'foo': ...}])
+
+    This workflow engine instance can be frozen and restarted, it remembers
+    its internal state and will pick up processing after the last finished
+    task.
+
+    import pickle
+    s = pickle.dumps(wfe)
+
+    However, when restarting the workflow, you must initialize the workflow
+    tasks manually using their original definition
+
+    wfe = pickle.loads(s)
+    wfe.setWorkflow(workflow)
+
+    It is also not possible to serialize a WFE when custom factory
+    tasks were provided. If you attempt to serialize such a WFE instance,
+    it will raise an exception. If you want to serialize a
+    WFE including its factory hooks and workflow callbacks, use the
+    PhoenixWorkflowEngine class instead.
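+
+    A minimal freeze/thaw sketch (assuming the workflow definition from
+    above is available when thawing):
+
+    s = pickle.dumps(wfe)
+    wfe2 = pickle.loads(s)
+    wfe2.setWorkflow(workflow)
+    wfe2.restart('current', 'next')  # current object, next task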
+ + + """ + + def __init__(self, + processing_factory=None, + callback_chooser=None, + before_processing=None, + after_processing=None): + + self._picklable_safe = True + for name, x in [('processing_factory', processing_factory), + ('callback_chooser', callback_chooser), + ('before_processing', before_processing), + ('after_processing', after_processing)]: + if x: + if not callable(x): + raise WorkflowError('Callback "%s" must be a callable object' % name) + else: + setattr(self, name, x) + self._picklable_safe = False + + self._callbacks = {} + self._store = {} + self._objects = [] # tmp storage of processed objects + self._i = [-1, [0]] # holds ids of the currently processed object and task + self._unpickled = False + self.log = logging.getLogger("workflow.%s" % self.__class__) # default logging + + def __getstate__(self): + if not self._picklable_safe: + raise pickle.PickleError("The instance of the workflow engine cannot be serialized, " + "because it was constructed with custom, user-supplied callbacks. Either use" + "PickableWorkflowEngine or provide your own __getstate__ method.") + return {'_store':self._store, '_objects': self._objects, + '_i': self._i, '_callbacks': {}, 'log': self.log} + + + def __setstate__(self, state): + self._store = state['_store'] + self._objects = state['_objects'] + self._i = state['_i'] + self._callbacks = state['_callbacks'] + self.log = state['log'] + if len(self._objects) < self._i[0]: + raise pickle.PickleError("The workflow instance inconsistent state, too few objects") + self._unpickled = True + + + def setLogger(self, logger): + """The logger instance must be pickable if the serialization should work""" + self.log = logger + + + def continueNextToken(self): + """Continue with the next token""" + raise ContinueNextToken + + def stopProcessing(self): + """Break out, stops everything (in the current wfe)""" + raise StopProcessing + + def haltProcessing(self): + """Halt the workflow (stop also any parent wfe)""" + raise HaltProcessing + + def jumpTokenForward(self, offset): + """Jumps to xth token""" + raise JumpTokenForward(offset) + + def jumpTokenBack(self, offset): + """Returns x tokens back - be careful with circular loops""" + raise JumpTokenBack(offset) + + def jumpCallForward(self, offset): + """Jumps to xth call in this loop""" + raise JumpCallForward(offset) + + def jumpCallBack(self, offset): + """Returns x calls back in the current loop - be careful with circular loop""" + raise JumpCallBack(offset) + + def breakFromThisLoop(self): + """Stops in the current loop but continues in those above""" + raise BreakFromThisLoop + + def configure(self, **kwargs): + """Method to set attributes of the workflow engine - use with extreme care + (well, you can set up the attrs directly, I am not protecting them, but + that is not nice) + Used mainly if you want to change the engine's callbacks - if processing factory + before_processing, after_processing + + @var **kwargs: dictionary of values + """ + for (key, value) in kwargs.items(): + if hasattr(self, key): + setattr(self, key, value) + else: + raise WorkflowError("Object %s does not have attr %s - it is not allowed to set nonexisting attribute (and you don't circumvent interface, do you?)" % (str(self), key)) + + def process(self, objects): + """Start processing + @param objects: either a list of object or + instance of TokenizedDocument + @return: You never know what will be returned from the workflow ;-) + But many exceptions can be raised, so watch out for them, + if there happened an 
+    def configure(self, **kwargs):
+        """Method to set attributes of the workflow engine - use with extreme care
+        (well, you can set the attrs directly, I am not protecting them, but
+        that is not nice).
+        Used mainly if you want to change the engine's callbacks - i.e. processing_factory,
+        before_processing, after_processing
+
+        @var **kwargs: dictionary of values
+        """
+        for (key, value) in kwargs.items():
+            if hasattr(self, key):
+                setattr(self, key, value)
+            else:
+                raise WorkflowError("Object %s does not have attr %s - it is not allowed to set a nonexisting attribute (and you don't circumvent the interface, do you?)" % (str(self), key))
+
+    def process(self, objects):
+        """Start processing
+        @param objects: either a list of objects or
+            an instance of TokenizedDocument
+        @return: You never know what will be returned from the workflow ;-)
+            But many exceptions can be raised, so watch out for them;
+            if an exception was raised, you can be sure something went
+            wrong (something that your workflow should handle and
+            didn't). The workflow engine does not interfere with the
+            processing chain and is not catching exceptions for you.
+        """
+        if isinstance(objects, list):
+            if not len(objects):
+                self.log.warning('List of objects is empty. Running workflow on an empty set has no effect.')
+            return self.processing_factory(objects, self)
+        elif hasattr(objects, 'TokenizedDocument') and objects.TokenizedDocument:
+            if not len(objects.tokens()):
+                self.log.warning('Running workflow on an empty TokenizedDocument set has no effect.')
+            return self.processing_factory(objects.tokens(), self)
+        else:
+            raise WorkflowError('Passed in object %s is neither a list nor a TokenizedDocument' % (objects.__class__))
+
+
+
+    @staticmethod
+    def before_processing(objects, self):
+        """Standard pre-processing callback - saves a pointer to the processed objects"""
+        #self.reset()
+        self._objects = objects
+
+
+    @staticmethod
+    def after_processing(objects, self):
+        """Standard post-processing callback, basic cleaning"""
+        self._objects = []
+        self._i = [-1, [0]]
+
+    @staticmethod
+    def callback_chooser(obj, self):
+        """There are possibly many workflows inside this workflow engine
+        and they are meant for different types of objects; this method
+        should choose and return the callbacks appropriate for the currently
+        processed object
+        @var obj: currently processed object
+        @var self: the workflow engine object
+        @return: set of callbacks to run
+        """
+        if hasattr(obj, 'getFeature'):
+            t = obj.getFeature('type')
+            if t:
+                return self.getCallbacks(t)
+        else:
+            return self.getCallbacks('*') # for the non-token types return default workflows
+
+    @staticmethod
+    def processing_factory(objects, self):
+        """Default processing factory, will process objects in order
+
+        @var objects: list of objects (passed in by self.process())
+        @keyword cls: engine object itself; because this method may
+            be implemented by a standalone function, we pass the
+            self also as a cls argument
+
+        As the WFE proceeds, it increments the internal counter - the
+        first position is the index of the currently processed object.
+        This pointer increases before the object is taken.
+
+        The 2nd pos is reserved for the array that points to the task
+        position. The number there points to the task that is currently
+        executed; when an error happens, it stays there unchanged. The
+        pointer is updated after the task has finished running.
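+
+        As an illustration (a sketch - the exact values depend on the
+        shape of the workflow): while the engine runs the 2nd task of
+        the 6th branch on the 3rd object, the internal pointer reads
+
+            self._i == [2, [5, 1]]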
+ """ + + self.before_processing(objects, self) + + i = self._i + + while i[0] < len(objects)-1 and i[0] >= -1: # negative index not allowed, -1 is special + i[0] += 1 + obj = objects[i[0]] + callbacks = self.callback_chooser(obj, self) + if callbacks: + try: + self.run_callbacks(callbacks, objects, obj) + i[1] = [0] #reset the callbacks pointer + except StopProcessing: + if DEBUG: + self.log.debug("Processing was stopped: '%s' (object: %s)" % (str(callbacks), repr(obj))) + break + except JumpTokenBack, step: + if step.args[0] > 0: + raise WorkflowError("JumpTokenBack cannot be positive number") + if DEBUG: + self.log.debug('Warning, we go back [%s] objects' % step.args[0]) + i[0] = max(-1, i[0] - 1 + step.args[0]) + i[1] = [0] #reset the callbacks pointer + except JumpTokenForward, step: + if step.args[0] < 0: + raise WorkflowError("JumpTokenForward cannot be negative number") + if DEBUG: + self.log.debug('We skip [%s] objects' % step.args[0]) + i[0] = min(len(objects), i[0] - 1 + step.args[0]) + i[1] = [0] #reset the callbacks pointer + except ContinueNextToken: + if DEBUG: + self.log.debug('Stop processing for this object, continue with next') + i[1] = [0] #reset the callbacks pointer + continue + except HaltProcessing: + if DEBUG: + self.log.debug('Processing was halted at step: %s' % i) + # reraise the exception, this is the only case when a WFE can be completely + # stopped + raise + + self.after_processing(objects, self) + + + + def run_callbacks(self, callbacks, objects, obj, indent=0): + """This method will execute callbacks in the workflow + @var callbacks: list of callables (may be deep nested) + @var objects: list of processed objects + @var obj: currently processed object + @keyword indent: int, indendation level - the counter + at the indent level is increases after the task has + finished processing; on error it will point to the + last executed task position. + The position adjusting also happens after the + task has finished. + """ + c = 0 #Just a counter for debugging + y = self._i[1] #Position of the task + while y[indent] < len(callbacks): + # jump to the appropriate place if we were restarted + if len(y)-1 > indent: + self.log.debug('Fast-forwarding to the position:callback = %s:%s' % (indent, y[indent])) + #print 'indent=%s, y=%s, y=%s, \nbefore=%s\nafter=%s' % (indent, y, y[indent], callbacks, callbacks[y[indent]]) + self.run_callbacks(callbacks[y[indent]], objects, obj, indent+1) + y.pop(-1) + y[indent] += 1 + continue + f = callbacks[y[indent]] + try: + c += 1 + if isinstance(f, list) or isinstance(f, tuple): + y.append(0) + self.run_callbacks(f, objects, obj, indent+1) + y.pop(-1) + y[indent] += 1 + continue + if DEBUG: + self.log.debug("Running (%s%s.) 
callback '%s' for obj: %s" % (indent * '-', c, f.__name__, repr(obj))) + self.execute_callback(f, obj) + if DEBUG: + self.log.debug('+ok') + except BreakFromThisLoop: + if DEBUG: + self.log.debug('Break from this loop') + return + except JumpCallBack, step: + if DEBUG: + self.log.debug('Warning, we go [%s] calls back' % step.args[0]) + if step.args[0] > 0: + raise WorkflowError("JumpCallBack cannot be positive number") + y[indent] = max(-1, y[indent] + step.args[0]-1) + except JumpCallForward, step: + if DEBUG: + self.log.debug('We skip [%s] calls' % step.args[0]) + if step.args[0] < 0: + raise WorkflowError("JumpCallForward cannot be negative number") + y[indent] = min(len(callbacks), y[indent] + step.args[0]-1) + y[indent] += 1 + #y[indent] -= 1 # adjust the counter so that it always points to the last executed task + + def setPosition(self, obj_pos, task_pos): + """Sets the internal pointers (of current state/obj) + @var obj_pos: (int) index of the currently processed object + After invocation, the engine will grab the next obj + from the list + @var task_pos: (list) multidimensional one-element list + that says at which level the task should restart. Example: + 6th branch, 2nd task = [5, 1] + """ + #TODO: check that positions are not out-of-bounds + self._i[0] = obj_pos + self._i[1] = task_pos + + def execute_callback(self, callback, obj): + """Executes the callback - override this method to implement logging""" + + callback(obj, self) + #print self._i + + + def getCallbacks(self, key='*'): + """Returns callbacks for the given workflow + @keyword key: name of the workflow (default: *) + if you want to get all configured workflows + pass None object as a key + @return: list of callbacks + """ + if key: + try: + return self._callbacks[key] + except KeyError, e: + raise WorkflowMissingKey('No workflow is registered for the key: %s. Perhaps you forgot to load workflows or the workflow definition for the given key was empty?' 
% key)
+        else:
+            return self._callbacks
+
+    def addCallback(self, key, func, before=None, after=None, relative_weight=None):
+        '''Inserts one callable into the stack of callables'''
+        try:
+            if func: # can be None
+                self.getCallbacks(key).append(func)
+        except WorkflowMissingKey:
+            self._callbacks[key] = []
+            return self._callbacks[key].append(func)
+        except Exception, e:
+            self.log.debug('Impossible to add callback %s for key: %s' % (str(func), key))
+            self.log.debug(e)
+
+    def addManyCallbacks(self, key, list_or_tuple):
+        list_or_tuple = list(self._cleanUpCallables(list_or_tuple))
+        for f in list_or_tuple:
+            self.addCallback(key, f)
+
+    @classmethod
+    def _cleanUpCallables(cls, callbacks):
+        """Helper method to remove non-callables from the passed-in callbacks"""
+        if callable(callbacks):
+            yield callbacks
+        for x in callbacks:
+            if isinstance(x, list):
+                yield list(cls._cleanUpCallables(x))
+            elif isinstance(x, tuple):
+                # tuples are simply converted to normal members
+                for fc in cls._cleanUpCallables(x):
+                    yield fc
+            elif x is not None:
+                yield x
+
+    def removeAllCallbacks(self):
+        """Removes all the tasks from the workflow engine instance"""
+        self._callbacks = {}
+
+    def removeCallbacks(self, key):
+        """For the given key, remove callbacks"""
+        try:
+            del(self._callbacks[key])
+        except KeyError:
+            pass
+
+    def reset(self):
+        """Empties the stack memory"""
+        self._i = [-1, [0]]
+        self._store = {}
+
+    def replaceCallbacks(self, key, funcs):
+        """Replace the processing workflow with a new workflow"""
+        list_or_tuple = list(self._cleanUpCallables(funcs))
+        self.removeCallbacks(key)
+        for f in list_or_tuple:
+            self.addCallback(key, f)
+
+    def setWorkflow(self, list_or_tuple):
+        """Sets the (default) workflow which will be run when
+        you call process()
+        @var list_or_tuple: workflow configuration
+        """
+        if not isinstance(list_or_tuple, list) and not isinstance(list_or_tuple, tuple):
+            list_or_tuple = (list_or_tuple,)
+        self.replaceCallbacks('*', list_or_tuple)
+
+    def setVar(self, key, what):
+        """Stores the obj in the internal stack"""
+        self._store[key] = what
+
+    def getVar(self, key, default=None):
+        """Returns named obj from the internal stack. If not found, returns None.
+        @param key: name of the object to return
+        @keyword default: if not found, what to return instead (if this arg
+            is present, the stack will be initialized with the same value)
+        @return: anything or None"""
+        try:
+            return self._store[key]
+        except:
+            if default is not None:
+                self.setVar(key, default)
+            return default
+
+    def hasVar(self, key):
+        """Returns True if a parameter of this name is stored"""
+        if key in self._store:
+            return True
+
+    def delVar(self, key):
+        """Deletes the parameter from the internal storage"""
+        if key in self._store:
+            del self._store[key]
+
+    def getCurrObjId(self):
+        """Returns the id of the currently processed object"""
+        return self._i[0]
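+    # A sketch of how the two pointers relate (the workflow shape is
+    # illustrative): for a workflow [a, [b, c], d], being at the 2nd task
+    # of the 2nd branch while processing the 3rd object corresponds to
+    # self._i == [2, [1, 1]] - the same shape that setPosition() expects.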
+    def getCurrTaskId(self):
+        """Returns the id of the currently processed task.
+        Note that the return value of this method is not thread-safe."""
+        return self._i[1]
+
+    def getObjects(self):
+        """Returns an iterator for walking through the objects"""
+        i = 0
+        for obj in self._objects:
+            yield (i, obj)
+            i += 1
+
+    def restart(self, obj, task, objects=None):
+        """Restart the workflow engine after it was deserialized
+
+        """
+
+        if self._unpickled is not True:
+            raise Exception("You can call this method only after loading a serialized engine")
+        if len(self.getCallbacks(key=None)) == 0:
+            raise Exception("The callbacks are empty, did you set workflows?")
+
+        # set the point from which to start processing
+        if obj == 'prev': # start with the previous object
+            self._i[0] -= 2 # TODO: check if there is any object there
+        elif obj == 'current': # continue with the current object
+            self._i[0] -= 1
+        elif obj == 'next':
+            pass
+        else:
+            raise Exception('Unknown start point for object: %s' % obj)
+
+        # set the task that will be executed first
+        if task == 'prev': # the previous
+            self._i[1][-1] -= 1
+        elif task == 'current': # restart the task again
+            self._i[1][-1] -= 0
+        elif task == 'next': # continue with the next task
+            self._i[1][-1] += 1
+        else:
+            raise Exception('Unknown start point for task: %s' % task)
+
+        if objects:
+            self.process(objects)
+        else:
+            self.process(self._objects)
+
+        self._unpickled = False
+
+
+class PhoenixWorkflowEngine(GenericWorkflowEngine):
+    """Implementation of the GenericWorkflowEngine which is able to be
+    *serialized* and re-executed also with its workflow tasks - without knowing
+    their original definition. This implementation depends on the
+    picloud module - http://www.picloud.com/. The module must be
+    installed in the standard location.
+
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(PhoenixWorkflowEngine, self).__init__(*args, **kwargs)
+        from cloud import serialization
+        self._picloud_serializer = serialization
+
+
+    def __getstate__(self):
+        out = super(PhoenixWorkflowEngine, self).__getstate__()
+        cbs = self.getCallbacks(key=None)
+        out['_callbacks'] = self._picloud_serializer.serialize(cbs, needsPyCloudSerializer=True)
+        factory_calls = {}
+        for name in ('processing_factory', 'callback_chooser', 'before_processing', 'after_processing'):
+            c = getattr(self, name)
+            if c.__class__ != 'PhoenixWorkflowEngine':
+                factory_calls[name] = c
+        out['factory_calls'] = self._picloud_serializer.serialize(factory_calls, needsPyCloudSerializer=True)
+        return out
+
+    def __setstate__(self, state):
+        from cloud import serialization
+        self._picloud_serializer = serialization
+
+        state['_callbacks'] = self._picloud_serializer.deserialize(state['_callbacks'])
+        super(PhoenixWorkflowEngine, self).__setstate__(state)
+        factory_calls = self._picloud_serializer.deserialize(state['factory_calls'])
+        for k, v in factory_calls.items():
+            setattr(self, k, v)
+
+
+
+
+
+
+
+# ------------------------------------------------------------- #
+#                  helper methods/classes                        #
+# ------------------------------------------------------------- #
+
+def duplicate_engine_instance(eng):
+    """Creates a new instance of the workflow engine based on an existing instance"""
+    #new_eng = copy.deepcopy(eng)
+    #new_eng.removeAllCallbacks()
+    #new_eng.reset()
+
+
+    new_eng = eng.__class__(processing_factory=eng.processing_factory,
+                            callback_chooser=eng.callback_chooser,
+                            before_processing=eng.before_processing,
+                            after_processing=eng.after_processing)
+    return new_eng
+
+
+def get_logger(name):
+    """Creates a logger for you - with the parent logger and
+    common configuration"""
+    if
name[0:8] != 'workflow' and len(name) > 8: + sys.stderr.write("Warning: you are creating a logger without 'workflow' as a root (%s)," + "this means that it will not share workflow settings and cannot be administered from one place" % name) + if LOG: + logger = LOG.manager.getLogger(name) + else: + logger = logging.getLogger(name) + hdlr = logging.StreamHandler(sys.stderr) + formatter = logging.Formatter('%(levelname)s %(asctime)s %(name)s:%(lineno)d %(message)s') + hdlr.setFormatter(formatter) + logger.addHandler(hdlr) + logger.setLevel(LOGGING_LEVEL) + logger.propagate = 0 + if logger not in _loggers: + _loggers.append(logger) + return logger + +def reset_all_loggers(level): + """Set logging level for every active logger - beware, if the global + manager level is higher, then still nothing will be see. Manager + level has precedence - use set_global_level + """ + for l in _loggers: + l.setLevel(LOGGING_LEVEL) + +def set_global_level(level): + """Sets the global level to the manager, the parent manager of all + the newseman loggers. With this one call, you can switch off all + loggers at once. But you can't enable them using this call, because + every logger may have a specific log level + """ + global LOGGING_LEVEL + LOGGING_LEVEL = int(level) + LOG.manager.disable = LOGGING_LEVEL - 1 + + +_loggers = [] +LOG = get_logger('workflow') +set_global_level(LOGGING_LEVEL) + + + +__version__ = '1.2.0.dev20140812' diff --git a/workflow/patterns/__init__.py b/workflow/patterns/__init__.py index 5dc780c..f9935d2 100644 --- a/workflow/patterns/__init__.py +++ b/workflow/patterns/__init__.py @@ -1,21 +1,21 @@ -#basic flow commands -from controlflow import STOP, BREAK, \ - OBJ_JUMP_BWD, OBJ_JUMP_FWD, OBJ_NEXT, \ - TASK_JUMP_BWD, TASK_JUMP_FWD, TASK_JUMP_IF - - -# conditions -from controlflow import IF, IF_NOT, IF_ELSE, WHILE - -# basic patterns -from controlflow import PARALLEL_SPLIT, SYNCHRONIZE, SIMPLE_MERGE, CHOICE - - -# helper functions -from utils import EMPTY_CALL, \ - ENG_GET, ENG_SET, \ - OBJ_SET, OBJ_GET, \ - ERROR, TRY, RUN_WF, \ - CALLFUNC, DEBUG_CYCLE, \ - PROFILE - +#basic flow commands +from controlflow import STOP, BREAK, \ + OBJ_JUMP_BWD, OBJ_JUMP_FWD, OBJ_NEXT, \ + TASK_JUMP_BWD, TASK_JUMP_FWD, TASK_JUMP_IF + + +# conditions +from controlflow import IF, IF_NOT, IF_ELSE, WHILE + +# basic patterns +from controlflow import PARALLEL_SPLIT, SYNCHRONIZE, SIMPLE_MERGE, CHOICE + + +# helper functions +from utils import EMPTY_CALL, \ + ENG_GET, ENG_SET, \ + OBJ_SET, OBJ_GET, \ + ERROR, TRY, RUN_WF, \ + CALLFUNC, DEBUG_CYCLE, \ + PROFILE + diff --git a/workflow/patterns/controlflow.py b/workflow/patterns/controlflow.py index 80d7aef..962c34b 100644 --- a/workflow/patterns/controlflow.py +++ b/workflow/patterns/controlflow.py @@ -1,361 +1,361 @@ - -####################################################################################### -## Copyright (c) 2010-2011, CERN ## -## All rights reserved. ## -## ## -## Redistribution and use in source and binary forms, with or without modification, ## -## are permitted provided that the following conditions are met: ## -## ## -## * Redistributions of source code must retain the above copyright notice, ## -## this list of conditions and the following disclaimer. ## -## * Redistributions in binary form must reproduce the above copyright notice, ## -## this list of conditions and the following disclaimer in the documentation ## -## and/or other materials provided with the distribution. 
## -## * Neither the name of the author nor the names of its contributors may be ## -## used to endorse or promote products derived from this software without ## -## specific prior written permission. ## -## ## -## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ## -## ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ## -## WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.## -## IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, ## -## INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, ## -## BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, ## -## DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF ## -## LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE ## -## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED ## -## OF THE POSSIBILITY OF SUCH DAMAGE. ## -####################################################################################### - -import threading -import thread -import Queue -import time -import copy - - -MAX_TIMEOUT = 30000 - - -from workflow.engine import GenericWorkflowEngine as engine -from workflow.engine import duplicate_engine_instance - -# ----------------------- helper calls -------------------------------- # - - - -def TASK_JUMP_BWD(step=-1): - """Jumps to the previous task - eng.jumpCallBack - example: A, B, TASK_JUMP_FWD(-2), C, D, ... - will produce: A, B, A, B, A, B, ... (recursion!) - @var step: int, must not be positive number - """ - def _move_back(obj, eng): - eng.jumpCallBack(step) - _move_back.__name__ = 'TASK_JUMP_BWD' - return _move_back - -def TASK_JUMP_FWD(step=1): - """Jumps to the next task - eng.jumpCallForward() - example: A, B, TASK_JUMP_FWD(2), C, D, ... 
- will produce: A, B, D - @var step: int - """ - def _x(obj, eng): - eng.jumpCallForward(step) - _x.__name__ = 'TASK_JUMP_FWD' - return _x - -def TASK_JUMP_IF(cond, step): - """Jumps in the specified direction if the condition - evaluates to True, the difference from other IF conditions - is that this one does not insert the code inside a [] block - @var cond: function - @var step: int, negative jumps back, positive forward - """ - def minus(obj, eng): - return cond(obj, eng) and eng.jumpCallBack(step) - def plus(obj, eng): - return cond(obj, eng) and eng.jumpCallForward(step) - if int(step) < 0: - return minus - else: - return plus - -def BREAK(): - """Stops execution of the current block, but keeps running - in the workflow - eng.breakFromThisLoop() - """ - def x(obj, eng): - eng.breakFromThisLoop() - x.__name__ = 'BREAK' - return x - -def STOP(): - """Unconditional stop of the workflow execution""" - def x(obj, eng): - eng.stopProcessing() - x.__name__ = 'STOP' - return x - -def OBJ_NEXT(): - """Stops the workflow execution for the current object and start - the same worfklow for the next object - eng.continueNextToken()""" - def x(obj, eng): - eng.continueNextToken() - x.__name__ = 'OBJ_NEXT' - return x - -def OBJ_JUMP_FWD(step=1): - """Stops the workflow execution, jumps to xth consecutive object - and starts executing the workflow on it - eng.jumpTokenForward() - @var step: int, relative jump from the current obj, must not be - negative number - """ - def x(obj, eng): - eng.jumpTokenForward(step) - x.__name__ = 'OBJ_JUMP_FWD' - return x - -def OBJ_JUMP_BWD(step=-1): - """Stops the workflow execution, jumps to xth antecedent object - and starts executing the workflow on it - eng.jumpTokenForward() - @var step: int, relative jump from the current obj, must not be - negative number - """ - def _x(obj, eng): - eng.jumpTokenBackward(step) - _x.__name__ = 'OBJ_JUMP_BWD' - return _x - -# -------------------------- some conditions -------------------------------------- # - -def IF(cond, branch): - """Implements condition, if cond evaluates to True - branch is executed - @var cond: callable, function that decides - @var branch: block of functions to run - - @attention: the branch is inserted inside [] block, therefore jumping is limited - only inside the branch - """ - x = lambda obj, eng: cond(obj,eng) and eng.jumpCallForward(1) or eng.breakFromThisLoop() - x.__name__ = 'IF' - return [x, branch] - -def IF_NOT(cond, branch): - """Implements condition, if cond evaluates to False - branch is executed - @var cond: callable, function that decides - @var branch: block of functions to run - - @attention: the branch is inserted inside [] block, therefore jumping is limited - only inside the branch - """ - def x(obj, eng): - if cond(obj, eng): - eng.breakFromThisLoop() - return 1 - x.__name__ = 'IF_NOT' - return [x, branch] - -def IF_ELSE(cond, branch1, branch2): - """Implements condition, if cond evaluates to True - branch1 is executed, otherwise branch2 - @var cond: callable, function that decides - @var branch1: block of functions to run [if=true] - @var branch2: block of functions to run [else] - - @attention: the branch is inserted inside [] block, therefore jumping is limited - only inside the branch - """ - if branch1 is None or branch2 is None: - raise Exception ("Neither of the branches can be None/empty") - x = lambda obj, eng: cond(obj, eng) and eng.jumpCallForward(1) or eng.jumpCallForward(3) - x.__name__ = 'IF_ELSE' - return [x, branch1, BREAK(), branch2] - -def WHILE(cond, branch): - 
"""Keeps executing branch as long as the condition cond is True - @var cond: callable, function that decides - @var branch: block of functions to run [if=true] - """ - # quite often i passed a function, which results in errors - if callable(branch): - branch = (branch,) - # we don't know what is hiding inside branch - branch = tuple(engine._cleanUpCallables(branch)) - def x(obj, eng): - if not cond(obj, eng): - eng.breakFromThisLoop() - x.__name__ = 'WHILE' - return [x, branch, TASK_JUMP_BWD(-(len(branch)+1))] - -# -------------------- basic control flow patterns -------------------------------- # -# -------- http://www.yawlfoundation.org/resources/patterns.html#basic ------------ # - -def PARALLEL_SPLIT(*args): - """ - Tasks A,B,C,D... are all started in parallel - @attention: tasks A,B,C,D... are not addressable, you can't - you can't use jumping to them (they are invisible to - the workflow engine). Though you can jump inside the - branches - @attention: tasks B,C,D... will be running on their own - once you have started them, and we are not waiting for - them to finish. Workflow will continue executing other - tasks while B,C,D... might be still running. - @attention: a new engine is spawned for each branch or code, - all operations works as expected, but mind that the branches - know about themselves, they don't see other tasks outside. - They are passed the object, but not the old workflow - engine object - @postcondition: eng object will contain lock (to be used - by threads) - """ - - def _parallel_split(obj, eng, calls): - lock=thread.allocate_lock() - i = 0 - eng.setVar('lock', lock) - for func in calls: - new_eng = duplicate_engine_instance(eng) - new_eng.setWorkflow([lambda o,e: e.setVar('lock', lock), func]) - thread.start_new_thread(new_eng.process, ([obj], )) - #new_eng.process([obj]) - return lambda o, e: _parallel_split(o, e, args) - - -def SYNCHRONIZE(*args, **kwargs): - """ - After the execution of task B, task C, and task D, task E can be executed. - @var *args: args can be a mix of callables and list of callables - the simplest situation comes when you pass a list of callables - they will be simply executed in parallel. - But if you pass a list of callables (branch of callables) - which is potentionally a new workflow, we will first create a - workflow engine with the workflows, and execute the branch in it - @attention: you should never jump out of the synchronized branches - """ - timeout = MAX_TIMEOUT - if 'timeout' in kwargs: - timeout = kwargs['timeout'] - - if len(args) < 2: - raise Exception('You must pass at least two callables') - - def _synchronize(obj, eng): - queue = MyTimeoutQueue() - #spawn a pool of threads, and pass them queue instance - for i in range(len(args)-1): - t = MySpecialThread(queue) - t.setDaemon(True) - t.start() - - for func in args[0:-1]: - if isinstance(func, list) or isinstance(func, tuple): - new_eng = duplicate_engine_instance(eng) - new_eng.setWorkflow(func) - queue.put(lambda: new_eng.process([obj])) - else: - queue.put(lambda: func(obj, eng)) - - #wait on the queue until everything has been processed - queue.join_with_timeout(timeout) - - #run the last func - args[-1](obj, eng) - _synchronize.__name__ = 'SYNCHRONIZE' - return _synchronize - -def CHOICE(arbiter, *predicates, **kwpredicates): - """ - A choice is made to execute either task B, task C or task D - after execution of task A. 
- @var arbiter: a function which returns some value (the value - must be inside the predicates dictionary) - @var predicates: list of callables, the first item must be the - value returned by the arbiter, example: - ('submit', task_a), - ('upload' : task_a, [task_b, task_c]...) - @keyword **kwpredicates: you can supply predicates also as a - keywords, example - CHOICE(arbiter, one=lambda...., two=[lambda o,e:...., ...]) - @postcondition: all tasks are 'jumpable' - - """ - workflow = [] - mapping = {} - for branch in predicates: - workflow.append(branch[1:]) - mapping[branch[0]] = len(workflow) - workflow.append(BREAK()) - - for k, v in kwpredicates.items(): - workflow.append(v) - mapping[k] = len(workflow) - workflow.append(BREAK()) - - def _exclusive_choice(obj, eng): - val = arbiter(obj, eng) - i = mapping[val] # die on error - eng.jumpCallForward(i) - c = _exclusive_choice - c.__name__ = arbiter.__name__ - workflow.insert(0, c) - return workflow - - -def SIMPLE_MERGE(*args): - """ - Task E will be started when any one of the tasks B, C or D completes. - This pattern though makes a context assumption: there is no - parallelism preceding task E. - """ - - if len(args) < 2: - raise Exception("You must suply at least 2 callables") - - final_task = args[-1] - workflow = [] - mapping = {} - total = ((len(args)-1) * 2) + 1 - for branch in args[0:-1]: - total -= 2 - workflow.append(branch) - workflow.append(TASK_JUMP_FWD(total)) - - workflow.append(final_task) - return workflow - - -# ------------------------------------------------------------- # -# helper methods/classes # -# ------------------------------------------------------------- # - - -class MyTimeoutQueue(Queue.Queue): - def __init__(self, *args): - Queue.Queue.__init__(self, *args) - - def join_with_timeout(self, timeout): - self.all_tasks_done.acquire() - try: - endtime = time.time() + timeout - while self.unfinished_tasks: - remaining = endtime - time.time() - if remaining <= 0.0: - raise threading.ThreadError('NotFinished') - time.sleep(.05) - self.all_tasks_done.wait(remaining) - finally: - self.all_tasks_done.release() - -class MySpecialThread(threading.Thread): - def __init__(self, itemq, *args, **kwargs): - threading.Thread.__init__(self, *args, **kwargs) - self.itemq = itemq - - def run(self): - call = self.itemq.get() - call() + +####################################################################################### +## Copyright (c) 2010-2011, CERN ## +## All rights reserved. ## +## ## +## Redistribution and use in source and binary forms, with or without modification, ## +## are permitted provided that the following conditions are met: ## +## ## +## * Redistributions of source code must retain the above copyright notice, ## +## this list of conditions and the following disclaimer. ## +## * Redistributions in binary form must reproduce the above copyright notice, ## +## this list of conditions and the following disclaimer in the documentation ## +## and/or other materials provided with the distribution. ## +## * Neither the name of the author nor the names of its contributors may be ## +## used to endorse or promote products derived from this software without ## +## specific prior written permission. 
## +## ## +## THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ## +## ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED ## +## WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.## +## IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, ## +## INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, ## +## BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, ## +## DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF ## +## LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE ## +## OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED ## +## OF THE POSSIBILITY OF SUCH DAMAGE. ## +####################################################################################### + +import threading +import thread +import Queue +import time +import copy + + +MAX_TIMEOUT = 30000 + + +from workflow.engine import GenericWorkflowEngine as engine +from workflow.engine import duplicate_engine_instance + +# ----------------------- helper calls -------------------------------- # + + + +def TASK_JUMP_BWD(step=-1): + """Jumps to the previous task - eng.jumpCallBack + example: A, B, TASK_JUMP_FWD(-2), C, D, ... + will produce: A, B, A, B, A, B, ... (recursion!) + @var step: int, must not be positive number + """ + def _move_back(obj, eng): + eng.jumpCallBack(step) + _move_back.__name__ = 'TASK_JUMP_BWD' + return _move_back + +def TASK_JUMP_FWD(step=1): + """Jumps to the next task - eng.jumpCallForward() + example: A, B, TASK_JUMP_FWD(2), C, D, ... + will produce: A, B, D + @var step: int + """ + def _x(obj, eng): + eng.jumpCallForward(step) + _x.__name__ = 'TASK_JUMP_FWD' + return _x + +def TASK_JUMP_IF(cond, step): + """Jumps in the specified direction if the condition + evaluates to True, the difference from other IF conditions + is that this one does not insert the code inside a [] block + @var cond: function + @var step: int, negative jumps back, positive forward + """ + def minus(obj, eng): + return cond(obj, eng) and eng.jumpCallBack(step) + def plus(obj, eng): + return cond(obj, eng) and eng.jumpCallForward(step) + if int(step) < 0: + return minus + else: + return plus + +def BREAK(): + """Stops execution of the current block, but keeps running + in the workflow - eng.breakFromThisLoop() + """ + def x(obj, eng): + eng.breakFromThisLoop() + x.__name__ = 'BREAK' + return x + +def STOP(): + """Unconditional stop of the workflow execution""" + def x(obj, eng): + eng.stopProcessing() + x.__name__ = 'STOP' + return x + +def OBJ_NEXT(): + """Stops the workflow execution for the current object and start + the same worfklow for the next object - eng.continueNextToken()""" + def x(obj, eng): + eng.continueNextToken() + x.__name__ = 'OBJ_NEXT' + return x + +def OBJ_JUMP_FWD(step=1): + """Stops the workflow execution, jumps to xth consecutive object + and starts executing the workflow on it - eng.jumpTokenForward() + @var step: int, relative jump from the current obj, must not be + negative number + """ + def x(obj, eng): + eng.jumpTokenForward(step) + x.__name__ = 'OBJ_JUMP_FWD' + return x + +def OBJ_JUMP_BWD(step=-1): + """Stops the workflow execution, jumps to xth antecedent object + and starts executing the workflow on it - eng.jumpTokenForward() + @var step: int, relative jump from the current obj, must not be + negative number + """ + def _x(obj, eng): + 
eng.jumpTokenBackward(step) + _x.__name__ = 'OBJ_JUMP_BWD' + return _x + +# -------------------------- some conditions -------------------------------------- # + +def IF(cond, branch): + """Implements condition, if cond evaluates to True + branch is executed + @var cond: callable, function that decides + @var branch: block of functions to run + + @attention: the branch is inserted inside [] block, therefore jumping is limited + only inside the branch + """ + x = lambda obj, eng: cond(obj,eng) and eng.jumpCallForward(1) or eng.breakFromThisLoop() + x.__name__ = 'IF' + return [x, branch] + +def IF_NOT(cond, branch): + """Implements condition, if cond evaluates to False + branch is executed + @var cond: callable, function that decides + @var branch: block of functions to run + + @attention: the branch is inserted inside [] block, therefore jumping is limited + only inside the branch + """ + def x(obj, eng): + if cond(obj, eng): + eng.breakFromThisLoop() + return 1 + x.__name__ = 'IF_NOT' + return [x, branch] + +def IF_ELSE(cond, branch1, branch2): + """Implements condition, if cond evaluates to True + branch1 is executed, otherwise branch2 + @var cond: callable, function that decides + @var branch1: block of functions to run [if=true] + @var branch2: block of functions to run [else] + + @attention: the branch is inserted inside [] block, therefore jumping is limited + only inside the branch + """ + if branch1 is None or branch2 is None: + raise Exception ("Neither of the branches can be None/empty") + x = lambda obj, eng: cond(obj, eng) and eng.jumpCallForward(1) or eng.jumpCallForward(3) + x.__name__ = 'IF_ELSE' + return [x, branch1, BREAK(), branch2] + +def WHILE(cond, branch): + """Keeps executing branch as long as the condition cond is True + @var cond: callable, function that decides + @var branch: block of functions to run [if=true] + """ + # quite often i passed a function, which results in errors + if callable(branch): + branch = (branch,) + # we don't know what is hiding inside branch + branch = tuple(engine._cleanUpCallables(branch)) + def x(obj, eng): + if not cond(obj, eng): + eng.breakFromThisLoop() + x.__name__ = 'WHILE' + return [x, branch, TASK_JUMP_BWD(-(len(branch)+1))] + +# -------------------- basic control flow patterns -------------------------------- # +# -------- http://www.yawlfoundation.org/resources/patterns.html#basic ------------ # + +def PARALLEL_SPLIT(*args): + """ + Tasks A,B,C,D... are all started in parallel + @attention: tasks A,B,C,D... are not addressable, you can't + you can't use jumping to them (they are invisible to + the workflow engine). Though you can jump inside the + branches + @attention: tasks B,C,D... will be running on their own + once you have started them, and we are not waiting for + them to finish. Workflow will continue executing other + tasks while B,C,D... might be still running. + @attention: a new engine is spawned for each branch or code, + all operations works as expected, but mind that the branches + know about themselves, they don't see other tasks outside. 
+ They are passed the object, but not the old workflow + engine object + @postcondition: eng object will contain lock (to be used + by threads) + """ + + def _parallel_split(obj, eng, calls): + lock=thread.allocate_lock() + i = 0 + eng.setVar('lock', lock) + for func in calls: + new_eng = duplicate_engine_instance(eng) + new_eng.setWorkflow([lambda o,e: e.setVar('lock', lock), func]) + thread.start_new_thread(new_eng.process, ([obj], )) + #new_eng.process([obj]) + return lambda o, e: _parallel_split(o, e, args) + + +def SYNCHRONIZE(*args, **kwargs): + """ + After the execution of task B, task C, and task D, task E can be executed. + @var *args: args can be a mix of callables and list of callables + the simplest situation comes when you pass a list of callables + they will be simply executed in parallel. + But if you pass a list of callables (branch of callables) + which is potentionally a new workflow, we will first create a + workflow engine with the workflows, and execute the branch in it + @attention: you should never jump out of the synchronized branches + """ + timeout = MAX_TIMEOUT + if 'timeout' in kwargs: + timeout = kwargs['timeout'] + + if len(args) < 2: + raise Exception('You must pass at least two callables') + + def _synchronize(obj, eng): + queue = MyTimeoutQueue() + #spawn a pool of threads, and pass them queue instance + for i in range(len(args)-1): + t = MySpecialThread(queue) + t.setDaemon(True) + t.start() + + for func in args[0:-1]: + if isinstance(func, list) or isinstance(func, tuple): + new_eng = duplicate_engine_instance(eng) + new_eng.setWorkflow(func) + queue.put(lambda: new_eng.process([obj])) + else: + queue.put(lambda: func(obj, eng)) + + #wait on the queue until everything has been processed + queue.join_with_timeout(timeout) + + #run the last func + args[-1](obj, eng) + _synchronize.__name__ = 'SYNCHRONIZE' + return _synchronize + +def CHOICE(arbiter, *predicates, **kwpredicates): + """ + A choice is made to execute either task B, task C or task D + after execution of task A. + @var arbiter: a function which returns some value (the value + must be inside the predicates dictionary) + @var predicates: list of callables, the first item must be the + value returned by the arbiter, example: + ('submit', task_a), + ('upload' : task_a, [task_b, task_c]...) + @keyword **kwpredicates: you can supply predicates also as a + keywords, example + CHOICE(arbiter, one=lambda...., two=[lambda o,e:...., ...]) + @postcondition: all tasks are 'jumpable' + + """ + workflow = [] + mapping = {} + for branch in predicates: + workflow.append(branch[1:]) + mapping[branch[0]] = len(workflow) + workflow.append(BREAK()) + + for k, v in kwpredicates.items(): + workflow.append(v) + mapping[k] = len(workflow) + workflow.append(BREAK()) + + def _exclusive_choice(obj, eng): + val = arbiter(obj, eng) + i = mapping[val] # die on error + eng.jumpCallForward(i) + c = _exclusive_choice + c.__name__ = arbiter.__name__ + workflow.insert(0, c) + return workflow + + +def SIMPLE_MERGE(*args): + """ + Task E will be started when any one of the tasks B, C or D completes. + This pattern though makes a context assumption: there is no + parallelism preceding task E. 
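+
+    An illustrative expansion (the task names are hypothetical):
+
+        SIMPLE_MERGE(task_b, task_c, task_d, task_e)
+
+    produces [task_b, JUMP, task_c, JUMP, task_d, JUMP, task_e], where
+    every JUMP is a TASK_JUMP_FWD landing on task_e, so whichever of
+    task_b/task_c/task_d runs is immediately followed by task_e.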
+ """ + + if len(args) < 2: + raise Exception("You must suply at least 2 callables") + + final_task = args[-1] + workflow = [] + mapping = {} + total = ((len(args)-1) * 2) + 1 + for branch in args[0:-1]: + total -= 2 + workflow.append(branch) + workflow.append(TASK_JUMP_FWD(total)) + + workflow.append(final_task) + return workflow + + +# ------------------------------------------------------------- # +# helper methods/classes # +# ------------------------------------------------------------- # + + +class MyTimeoutQueue(Queue.Queue): + def __init__(self, *args): + Queue.Queue.__init__(self, *args) + + def join_with_timeout(self, timeout): + self.all_tasks_done.acquire() + try: + endtime = time.time() + timeout + while self.unfinished_tasks: + remaining = endtime - time.time() + if remaining <= 0.0: + raise threading.ThreadError('NotFinished') + time.sleep(.05) + self.all_tasks_done.wait(remaining) + finally: + self.all_tasks_done.release() + +class MySpecialThread(threading.Thread): + def __init__(self, itemq, *args, **kwargs): + threading.Thread.__init__(self, *args, **kwargs) + self.itemq = itemq + + def run(self): + call = self.itemq.get() + call() diff --git a/workflow/patterns/utils.py b/workflow/patterns/utils.py index d5cc669..e24ddf0 100644 --- a/workflow/patterns/utils.py +++ b/workflow/patterns/utils.py @@ -1,532 +1,532 @@ - -import inspect -import traceback -import sys -import pstats -import timeit -from workflow.engine import duplicate_engine_instance, WorkflowTransition -try: - import cProfile -except ImportError: - import profile as cProfile - - -def RUN_WF(workflow, engine=None, - processing_factory = None, - callback_chooser = None, - before_processing = None, - after_processing = None, - data_connector = None, - pass_eng = [], - pass_always = None, - outkey = 'RUN_WF', - reinit=False): - """Task for running other workflow - ie. new workflow engine will - be created and the workflow run. The workflow engine is garbage - collected together with the function. Therefore you can run the - function many times and it will reuse the already-loaded WE. In fact - this WE has an empty before_processing callback. - - @see before_processing callback for more information. - - @var workflow: normal workflow tasks definition - @keyword engine: class of the engine to create WE, if None, the new - WFE instance will be of the same class as the calling WFE. - Attention, changes in the classes of the WFE instances may have - many consequences, so be careful. For example, if you use - serialiazable WFE instance, but you create another instance of WFE - which is not serializable, then you will be in problems. - @keyword processing_factory: WE callback - @keyword callback_chooser: WE callback - @keyword before_processing: WE callback - @keyword after_processing: WE callback - --- - @keyword data_connector: callback which will prepare data and pass - the corrent objects into the workflow engine (from the calling - engine into the called WE), if not present, the current obj is - passed (possibly wrapped into a list) - @keyword pass_eng: list of keys corresponding to the values, that should - be passed from the calling engine to the called engine. This is - called only once, during initialization. - @keyword outkey: if outkey is present, the initialized new - workflow engine is stored into the calling workflow engine - so that you can get access to it from outside. 
This instance - will be available at runtime - @keyword reinit: if True, wfe will be re-instantiated always - for every invocation of the function - """ - - store = [] - - def x(obj, eng=None): - - # decorate the basic callback to make sure the objects of the calling - # engine are always there. But since the default callback no longer - # calls reset(), this is no longer necessary - #if not before_processing: - # old = eng.before_processing - # def _before_processing(obj, eng): - # old(obj, eng) - # setattr(eng, '_objects', obj) - #else: - # _before_processing = None - - if engine: #user supplied class - engine_cls = engine - else: - engine_cls = eng.__class__ - - # a lot of typing, but let's make it explicit what happens... - _processing_factory = processing_factory or engine_cls.processing_factory - _callback_chooser = callback_chooser or engine_cls.callback_chooser - _before_processing = before_processing or engine_cls.before_processing - _after_processing = after_processing or engine_cls.after_processing - - if not store: - store.append(engine_cls(processing_factory=_processing_factory, - callback_chooser=_callback_chooser, - before_processing=_before_processing, - after_processing=_after_processing)) - store[0].setWorkflow(workflow) - - if reinit: # re-init wfe to have a clean plate - store[0] = engine_cls(processing_factory=_processing_factory, - callback_chooser=_callback_chooser, - before_processing=_before_processing, - after_processing=_after_processing) - store[0].setWorkflow(workflow) - - wfe = store[0] - - if outkey: - eng.setVar(outkey, wfe) - - # pass data from the old wf engine to the new one - to_remove = [] - for k in pass_eng: - wfe.setVar(k, eng.getVar(k)) - if not pass_always and not reinit: - to_remove.append(k) - if to_remove: - for k in to_remove: - pass_eng.remove(k) - - - if data_connector: - data = data_connector(obj, eng) - wfe.process(data) - else: - if not isinstance(obj, list): - wfe.process([obj]) - else: - wfe.process(obj) - x.__name__ = 'RUN_WF' - return x - -# -------------------------- useful structures------------------------------------- # - -def EMPTY_CALL(obj, eng): - """Empty call that does nothing""" - pass - -def ENG_GET(something): - """this is the same as lambda obj, eng: eng.getVar('something') - @var something: str, key of the object to retrieve - @return: value of the key from eng object - """ - def x(obj, eng): - return eng.getVar(something) - x.__name__ = 'ENG_GET' - return x - -def ENG_SET(key, value): - """this is the same as lambda obj, eng: eng.setVar('key', value) - @var key: str, key of the object to retrieve - @var value: anything - @attention: this call is executed when the workflow is created - therefore, the key and value must exist at the time - (obj and eng don't exist yet) - """ - def _eng_set(obj, eng): - return eng.setVar(key, value) - _eng_set.__name__ = 'ENG_SET' - return _eng_set - -def OBJ_GET(something, cond='all'): - """this is the same as lambda obj, eng: something in obj and obj[something] - @var something: str, key of the object to retrieve or list of strings - @keyword cond: how to evaluate several keys, all|any|many - @return: value of the key from obj object, if you are looking at several - keys, then a list is returned. Watch for empty and None returns! 
- - """ - def x(obj, eng): - if isinstance(something, basestring): - return something in obj and obj[something] - else: - if cond.lower() == 'any': - for o in something: - if o in obj and obj[o]: - return obj[o] - elif cond.lower() == 'many': - r = {} - for o in something: - if o in obj and obj[o]: - r[o] = obj[o] - return r - else: - r = {} - for o in something: - if o in obj and obj[o]: - r[o] = obj[o] - else: - return False - return r - - - x.__name__ = 'OBJ_GET' - return x - -def OBJ_SET(key, value): - """this is the same as lambda obj, eng: obj.__setitem__(key, value) - @var key: str, key of the object to retrieve - @var value: anything - @attention: this call is executed when the workflow is created - therefore, the key and value must exist at the time - (obj and eng don't exist yet) - """ - def x(obj, eng): - obj[key] = value - x.__name__ = 'OBJ_SET' - return x - -# ----------------------- error handlling ------------------------------- - -def ERROR(msg='Error in the workflow'): - """Throws uncatchable error stopping execution and printing the message""" - caller = inspect.getmodule(inspect.currentframe().f_back) - if caller : - caller = caller.__file__ - else: - caller = '' - def x(obj, eng): - raise Exception('in %s : %s' % (caller, msg)) - x.__name__ = 'ERROR' - return x - -def TRY(onecall, retry=1, onfailure=Exception, verbose=True): - """Wrap the call in try...except statement and eventually - retries when failure happens - @keyword attempts: how many times to retry - @keyword onfailure: exception to raise or callable to call on failure, - if callable, then it will receive standard obj, eng arguments - """ - - if not callable(onecall): - raise Exception('You can wrap only one callable with TRY') - - def x(obj, eng): - tries = 1 + retry - i = 0 - while i < tries: - try: - onecall(obj, eng) - break # success - except WorkflowTransition, msg: - raise # just let it propagate - except: - if verbose: - eng.log.error('Error reported from the call') - traceback.print_exc() - i += 1 - if i >= tries: - if isinstance(onfailure, Exception): - raise onfailure - elif callable(onfailure): - onfailure(obj, eng) - else: - raise Exception('Error after attempting to run: %s' % onecall) - - x.__name__ = 'TRY' - return x - - - - - -def PROFILE(call, output=None, stats=['time', 'calls', 'cumulative', 'pcalls']): - """Run the call(s) inside profiler - @var call: either function or list of functions - - if it is a single callable, it will be executed - - if it is a list of callables, a new workflow engine (a duplicate) - will be created, the workflow will be set with the calls, and - calls executed; thus by providing list of functions, you are - actually profiling also the workflow engine! 
- @keyword output: where to save the stats, if empty, it will be printed - to stdout - @keyword stats: list of statistical outputs, - default is: time, calls, cumulative, pcalls - @see pstats module for explanation - """ - - def x(obj, eng): - if isinstance(call, list) or isinstance(call, tuple): - new_eng = duplicate_engine_instance(eng) - new_eng.setWorkflow(call) - profileit = lambda: new_eng.process([obj]) - else: - profileit = lambda: call(obj, eng) - if output: - cProfile.runctx('profileit()', globals(), locals(), output) - else: - cProfile.runctx('profileit()', globals(), locals()) - - if output and stats: - for st in stats: - fname = '%s.stats-%s' % (output, st) - fo = open(fname, 'w') - - p = pstats.Stats(output, stream = fo) - p.strip_dirs() - p.sort_stats(st) - p.print_stats() - - fo.close() - x.__name__ = 'PROFILE' - return x - - -def DEBUG_CYCLE(stmt, setup=None, - onerror=None, - debug_stopper=None, - **kwargs): - """Workflow task DEBUG_CYCLE used to repeatedly execute - certain call - you can effectively reload modules and - hotplug the new code, debug at runtime. The call is - taking advantage of the internal python timeit module. - The parameters are the same as for timeit module - ie. - - @param stmt: string to evaluate (ie. "print sys") - @keyword setup: initialization (ie. "import sys") - - The debug_stopper is a callback that receives (eng, obj) - *after* execution of the main call. If debug_stopper - returns True, it means 'stop', don't continue. - - @keyword onerror: string (like the setup) which will - be appended to setup in case of error. Ie. if execution - failed, you can reload the module and try again. This - gets fired only after an exception! - - You can also pass any number of arguments as keywords, - they will be available to your function at the runtime. - - Here is example of testing engine task: - - >>> from merkur.box.code import debug_cycle - >>> def debug_stopper(obj, eng): - ... if obj: - ... return True - ... - >>> def engtask(config, something): - ... def x(obj, eng): - ... print config - ... print something - ... return x - ... - >>> config = {'some': 'value'} - >>> debug_cycle = testpass.debug_cycle - >>> c = DEBUG_CYCLE("engtask(config, something)(obj,eng)", - ... "from __main__ import engtask", - ... config=config, - ... something='hi!', - ... ) - >>> c('eng', 'obj') - {'some': 'value'} - hi! - >>> - - You can of course test any other python calls, not only - engine tasks with this function. If you want to reload - code, use the setup argument: - - c = DEBUG_CYCLE("mm.engtask(config, something)(obj,eng)", - ... "import mm;reload(mm)", - ... 
config=config) - - """ - - - if not callable(debug_stopper): - debug_stopper = lambda obj, eng: False - to_pass = {} - if kwargs: - to_pass.update(kwargs) - - def x(obj, eng): - - storage = [0, debug_stopper, True] #counter, callback, flag - - - def _timer(): - if storage[0] == 0: - storage[0] = 1 - return timeit.default_timer() - else: - storage[0] = 0 - try: - if storage[1](obj, eng): - storage[2] = False - except: - traceback.print_exc() - storage[2] = False - return timeit.default_timer() - - class Timer(timeit.Timer): - def timeit(self): - # i am taking advantage of the timeit template - # and passing in the self object inside the array - timing = self.inner([self], self.timer) - return timing - - error_caught = False - while storage[2]: - try: - # include passed in arguments and also obj, eng - _setup = ';'.join(['%s=_it[0].topass[\'%s\']' % (k, k) for k, v in to_pass.items()]) - _setup += '\nobj=_it[0].obj\neng=_it[0].eng' - _setup += '\n%s' % setup - if error_caught and onerror: - _setup += '\n%s' % onerror - try: - t = Timer(stmt, _setup, _timer) - except: - traceback.print_exc() - break - - # set reference to the passed in values - t.topass = to_pass - t.obj = obj - t.eng = eng - - #print t.src - print 'Execution time: %.3s' % (t.timeit()) - except: - traceback.print_exc() - lasterr = traceback.format_exc().splitlines() - if '' in lasterr[-2]: - sys.stderr.write('Error most likely in compilation, printing the source code:\n%s%s\n%s\n' % - ('=' *60, t.src, '=' * 60)) - break - - - x.__name__ = 'DEBUG_CYCLE' - return x - - - -def CALLFUNC(func, outkey=None, debug=False, stopper=None, - args=[], oeargs=[], ekeys={}, okeys={}, **kwargs): - """Workflow task CALLFUNC - This wf task allows you to call any function - @param func: identification of the function, it can be either - string (fully qualified function name) or the callable - itself - @keyword outkey: results of the call will be stored inside - eng.setVar(outkey) if outkey != None - @keyword debug: boolean, if True, we will run the call in a - loop, reloading the module after each error - @keyword stopper: a callable which will receive obj, eng - after each run. If the callable returns True, we will - stop running the func (only applicable when debug=True) - @keyword args: params passed on to the function - @keyword ekeys: dictionary of key:value pairs, we will take - 'value' from the engine, and pass it to the function under - the 'key' name. - @keyword okeys: the same as ekeys, only that values are taken - from the obj - @keyword oeargs: definition of arguments that should be put - inside the *args; you can use syntactic sugar to instruct - system where to take the value, for example Eseman - will - take eng.getVar('seman') -- 'O' [capital letter Oooo] means - take the value from obj - @keyword **kwargs: other kwargs passed on to the function - @return: nothing, value is stored inside obj[outkey] - """ - mod, new_func = _get_mod_func(func) - args = list(args) - def x(obj, eng): - try: - for key in oeargs: - first_key, rest_key = key[0], key[1:] - if first_key == 'O': - args.append(obj[rest_key]) - elif first_key == 'E' and eng.hasVar(rest_key): - args.append(eng.getVar(rest_key)) - else: - if key in obj: - args.append(obj[key]) - elif eng.hasVar(key): - args.append(eng.getVar(key)) - else: - raise Exception("%s is not inside obj nor eng, try specifying Okey or Ekey" % key) - except Exception, msg: - eng.log.error(traceback.format_exc()) - eng.log.error('Check your "oeargs" configuration. 
Key "%s" not available' % key) - sys.exit(1) - - for k,v in ekeys.items(): - kwargs[k] = eng.getVar(v) - for k,v in okeys.items(): - kwargs[k] = obj[v] - - if debug: - universal_repeater(mod, new_func, stopper, *args, **kwargs) - else: - if outkey: - obj[outkey] = new_func(*args, **kwargs) - else: - new_func(*args, **kwargs) - x.__name__ = 'CALLFUNC' - return x - -# ----------------- not wf tasks ----------------------------- - -def _get_mod_func(func): - """for a given callable finds its module - imports it - and returns module, call -- module can be reloaded""" - # find out module of this call - def get_mod(modid, func_name): - mod = __import__(modid) - components = modid.split('.') - for comp in components[1:]: - mod = getattr(mod, comp) - return getattr(mod, func_name), mod - - if callable(func): - m = func.__module__ - n = func.__name__ - new_func, mod = get_mod(m,n) - else: - m, n = str(func).rsplit('.', 1) - new_func, mod = get_mod(m,n) - return mod, new_func - -def debug_simple(func, *args, **kwargs): - """Runs func with *args, **kwargs and reloads it - after each failure - this is not a wfe task""" - mod, new_func = _get_mod_func(func) - universal_repeater(mod, new_func) - - -def universal_repeater(mod, call, stopper=None, *args, **kwargs): - fname = call.__name__ - while True: - if callable(stopper) and stopper(*args, **kwargs): - break - try: - call(*args, **kwargs) - except: - traceback.print_exc() - reload(mod) - call = getattr(mod, fname) + +import inspect +import traceback +import sys +import pstats +import timeit +from workflow.engine import duplicate_engine_instance, WorkflowTransition +try: + import cProfile +except ImportError: + import profile as cProfile + + +def RUN_WF(workflow, engine=None, + processing_factory = None, + callback_chooser = None, + before_processing = None, + after_processing = None, + data_connector = None, + pass_eng = [], + pass_always = None, + outkey = 'RUN_WF', + reinit=False): + """Task for running other workflow - ie. new workflow engine will + be created and the workflow run. The workflow engine is garbage + collected together with the function. Therefore you can run the + function many times and it will reuse the already-loaded WE. In fact + this WE has an empty before_processing callback. + + @see before_processing callback for more information. + + @var workflow: normal workflow tasks definition + @keyword engine: class of the engine to create WE, if None, the new + WFE instance will be of the same class as the calling WFE. + Attention, changes in the classes of the WFE instances may have + many consequences, so be careful. For example, if you use + serialiazable WFE instance, but you create another instance of WFE + which is not serializable, then you will be in problems. + @keyword processing_factory: WE callback + @keyword callback_chooser: WE callback + @keyword before_processing: WE callback + @keyword after_processing: WE callback + --- + @keyword data_connector: callback which will prepare data and pass + the corrent objects into the workflow engine (from the calling + engine into the called WE), if not present, the current obj is + passed (possibly wrapped into a list) + @keyword pass_eng: list of keys corresponding to the values, that should + be passed from the calling engine to the called engine. This is + called only once, during initialization. + @keyword outkey: if outkey is present, the initialized new + workflow engine is stored into the calling workflow engine + so that you can get access to it from outside. 
+
+# -------------------------- useful structures ------------------------------------- #
+
+def EMPTY_CALL(obj, eng):
+    """Empty call that does nothing"""
+    pass
+
+def ENG_GET(something):
+    """this is the same as lambda obj, eng: eng.getVar('something')
+    @var something: str, key of the object to retrieve
+    @return: value of the key from the eng object
+    """
+    def x(obj, eng):
+        return eng.getVar(something)
+    x.__name__ = 'ENG_GET'
+    return x
+
+def ENG_SET(key, value):
+    """this is the same as lambda obj, eng: eng.setVar('key', value)
+    @var key: str, key of the object to set
+    @var value: anything
+    @attention: this call is executed when the workflow is created;
+        therefore, the key and value must exist at that time
+        (obj and eng don't exist yet)
+    """
+    def _eng_set(obj, eng):
+        return eng.setVar(key, value)
+    _eng_set.__name__ = 'ENG_SET'
+    return _eng_set
+
+def OBJ_GET(something, cond='all'):
+    """this is the same as lambda obj, eng: something in obj and obj[something]
+    @var something: str, key of the object to retrieve, or a list of strings
+    @keyword cond: how to evaluate several keys, all|any|many
+    @return: value of the key from the obj object; if you are looking at
+        several keys, then a dictionary is returned. Watch for empty and
+        None returns!
+    """
+    def x(obj, eng):
+        if isinstance(something, basestring):
+            return something in obj and obj[something]
+        else:
+            if cond.lower() == 'any':
+                for o in something:
+                    if o in obj and obj[o]:
+                        return obj[o]
+            elif cond.lower() == 'many':
+                r = {}
+                for o in something:
+                    if o in obj and obj[o]:
+                        r[o] = obj[o]
+                return r
+            else:
+                r = {}
+                for o in something:
+                    if o in obj and obj[o]:
+                        r[o] = obj[o]
+                    else:
+                        return False
+                return r
+
+    x.__name__ = 'OBJ_GET'
+    return x
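A sketch of how these one-liners read in a workflow definition (the keys are hypothetical; OBJ_GET's return value matters when a consumer, e.g. a controlflow pattern, inspects it):

    from workflow.patterns.utils import ENG_SET, OBJ_SET, OBJ_GET

    workflow = [
        ENG_SET('run_id', 'batch-001'),   # the value must exist now, at definition time
        OBJ_SET('status', 'new'),
        # with cond='all', returns {'status': ..., 'payload': ...} or False
        # as soon as one of the keys is missing/empty
        OBJ_GET(['status', 'payload'], cond='all'),
    ]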
+
+def OBJ_SET(key, value):
+    """this is the same as lambda obj, eng: obj.__setitem__(key, value)
+    @var key: str, key of the object to set
+    @var value: anything
+    @attention: this call is executed when the workflow is created;
+        therefore, the key and value must exist at that time
+        (obj and eng don't exist yet)
+    """
+    def x(obj, eng):
+        obj[key] = value
+    x.__name__ = 'OBJ_SET'
+    return x
+
+# ----------------------- error handling -------------------------------
+
+def ERROR(msg='Error in the workflow'):
+    """Throws an uncatchable error stopping execution and printing the message"""
+    caller = inspect.getmodule(inspect.currentframe().f_back)
+    if caller:
+        caller = caller.__file__
+    else:
+        caller = ''
+    def x(obj, eng):
+        raise Exception('in %s : %s' % (caller, msg))
+    x.__name__ = 'ERROR'
+    return x
+
+def TRY(onecall, retry=1, onfailure=Exception, verbose=True):
+    """Wrap the call in a try...except statement and optionally
+    retry when a failure happens
+    @keyword retry: how many times to retry
+    @keyword onfailure: exception to raise or callable to call on failure;
+        if callable, it will receive the standard obj, eng arguments
+    """
+
+    if not callable(onecall):
+        raise Exception('You can wrap only one callable with TRY')
+
+    def x(obj, eng):
+        tries = 1 + retry
+        i = 0
+        while i < tries:
+            try:
+                onecall(obj, eng)
+                break  # success
+            except WorkflowTransition, msg:
+                raise  # just let it propagate
+            except:
+                if verbose:
+                    eng.log.error('Error reported from the call')
+                    traceback.print_exc()
+                i += 1
+                if i >= tries:
+                    # onfailure may be an exception instance, an exception
+                    # class, or any other callable
+                    if isinstance(onfailure, Exception):
+                        raise onfailure
+                    elif inspect.isclass(onfailure) and issubclass(onfailure, Exception):
+                        raise onfailure
+                    elif callable(onfailure):
+                        onfailure(obj, eng)
+                    else:
+                        raise Exception('Error after attempting to run: %s' % onecall)
+
+    x.__name__ = 'TRY'
+    return x
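A usage sketch (illustration only; `download` is a hypothetical helper that may raise):

    from workflow.patterns.utils import TRY

    def flaky_fetch(obj, eng):
        obj['data'] = download(obj['url'])     # hypothetical, may raise

    def give_up(obj, eng):
        eng.log.error('giving up on %s' % obj.get('url'))

    workflow = [
        TRY(flaky_fetch, retry=3, onfailure=give_up),  # 1 try + 3 retries, then fallback
    ]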
+
+def PROFILE(call, output=None, stats=['time', 'calls', 'cumulative', 'pcalls']):
+    """Run the call(s) inside the profiler
+    @var call: either a function or a list of functions
+        - if it is a single callable, it will be executed
+        - if it is a list of callables, a new workflow engine (a duplicate)
+          will be created, the workflow will be set with the calls, and the
+          calls executed; thus by providing a list of functions, you are
+          actually profiling the workflow engine as well!
+    @keyword output: where to save the stats; if empty, they will be
+        printed to stdout
+    @keyword stats: list of statistical outputs,
+        default is: time, calls, cumulative, pcalls
+    @see pstats module for explanation
+    """
+
+    def x(obj, eng):
+        if isinstance(call, list) or isinstance(call, tuple):
+            new_eng = duplicate_engine_instance(eng)
+            new_eng.setWorkflow(call)
+            profileit = lambda: new_eng.process([obj])
+        else:
+            profileit = lambda: call(obj, eng)
+        if output:
+            cProfile.runctx('profileit()', globals(), locals(), output)
+        else:
+            cProfile.runctx('profileit()', globals(), locals())
+
+        if output and stats:
+            for st in stats:
+                fname = '%s.stats-%s' % (output, st)
+                fo = open(fname, 'w')
+
+                p = pstats.Stats(output, stream=fo)
+                p.strip_dirs()
+                p.sort_stats(st)
+                p.print_stats()
+
+                fo.close()
+
+    x.__name__ = 'PROFILE'
+    return x
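A usage sketch (the tasks and the output path are made up):

    from workflow.patterns.utils import PROFILE

    workflow = [
        # profiles the two tasks *and* the engine driving them; raw profile
        # data goes to /tmp/wf.prof, plus one readable report per sort key
        # (/tmp/wf.prof.stats-time, /tmp/wf.prof.stats-calls, ...)
        PROFILE([parse_record, enrich_record], output='/tmp/wf.prof'),
    ]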
+
+def DEBUG_CYCLE(stmt, setup=None,
+                onerror=None,
+                debug_stopper=None,
+                **kwargs):
+    """Workflow task DEBUG_CYCLE used to repeatedly execute
+    a certain call - you can effectively reload modules and
+    hotplug new code, debugging at runtime. The call takes
+    advantage of the internal python timeit module.
+    The parameters are the same as for the timeit module - i.e.
+
+    @param stmt: string to evaluate (ie. "print sys")
+    @keyword setup: initialization (ie. "import sys")
+
+    The debug_stopper is a callback that receives (obj, eng)
+    *after* execution of the main call. If debug_stopper
+    returns True, it means 'stop', don't continue.
+
+    @keyword onerror: string (like the setup) which will
+    be appended to the setup in case of error. I.e. if execution
+    failed, you can reload the module and try again. This
+    gets fired only after an exception!
+
+    You can also pass any number of arguments as keywords,
+    they will be available to your function at runtime.
+
+    Here is an example of testing an engine task:
+
+    >>> def debug_stopper(obj, eng):
+    ...     if obj:
+    ...         return True
+    ...
+    >>> def engtask(config, something):
+    ...     def x(obj, eng):
+    ...         print config
+    ...         print something
+    ...     return x
+    ...
+    >>> config = {'some': 'value'}
+    >>> c = DEBUG_CYCLE("engtask(config, something)(obj,eng)",
+    ...                 "from __main__ import engtask",
+    ...                 debug_stopper=debug_stopper,
+    ...                 config=config,
+    ...                 something='hi!',
+    ...                 )
+    >>> c('eng', 'obj')
+    {'some': 'value'}
+    hi!
+    >>>
+
+    You can of course test any other python calls, not only
+    engine tasks, with this function. If you want to reload
+    code, use the setup argument:
+
+    c = DEBUG_CYCLE("mm.engtask(config, something)(obj,eng)",
+    ...             "import mm;reload(mm)",
+    ...             config=config)
+
+    """
+
+    if not callable(debug_stopper):
+        debug_stopper = lambda obj, eng: False
+    to_pass = {}
+    if kwargs:
+        to_pass.update(kwargs)
+
+    def x(obj, eng):
+
+        storage = [0, debug_stopper, True]  # counter, callback, flag
+
+        def _timer():
+            if storage[0] == 0:
+                storage[0] = 1
+                return timeit.default_timer()
+            else:
+                storage[0] = 0
+                try:
+                    if storage[1](obj, eng):
+                        storage[2] = False
+                except:
+                    traceback.print_exc()
+                    storage[2] = False
+                return timeit.default_timer()
+
+        class Timer(timeit.Timer):
+            def timeit(self):
+                # take advantage of the timeit template and smuggle
+                # the self object in inside the iterable
+                timing = self.inner([self], self.timer)
+                return timing
+
+        error_caught = False
+        while storage[2]:
+            try:
+                # include the passed-in arguments and also obj, eng
+                _setup = ';'.join(['%s=_it[0].topass[\'%s\']' % (k, k) for k in to_pass])
+                _setup += '\nobj=_it[0].obj\neng=_it[0].eng'
+                if setup:
+                    _setup += '\n%s' % setup
+                if error_caught and onerror:
+                    _setup += '\n%s' % onerror
+                try:
+                    t = Timer(stmt, _setup, _timer)
+                except:
+                    traceback.print_exc()
+                    break
+
+                # set references to the passed-in values
+                t.topass = to_pass
+                t.obj = obj
+                t.eng = eng
+
+                #print t.src
+                print 'Execution time: %.3f' % (t.timeit())
+            except:
+                error_caught = True
+                traceback.print_exc()
+                lasterr = traceback.format_exc().splitlines()
+                if '<timeit-src>' in lasterr[-2]:
+                    sys.stderr.write('Error most likely in compilation, printing the source code:\n%s%s\n%s\n' %
+                                     ('=' * 60, t.src, '=' * 60))
+                    break
+
+    x.__name__ = 'DEBUG_CYCLE'
+    return x
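For reference, the setup source that DEBUG_CYCLE synthesizes is easiest to see on an example (a sketch; `mm` is a hypothetical module):

    # DEBUG_CYCLE("mm.engtask(cfg)(obj,eng)", "import mm;reload(mm)", cfg=cfg)
    # generates a _setup roughly equal to:
    #
    #   cfg=_it[0].topass['cfg']
    #   obj=_it[0].obj
    #   eng=_it[0].eng
    #   import mm;reload(mm)
    #
    # _it[0] is the Timer instance itself, smuggled in via
    # self.inner([self], self.timer), which is why t.topass, t.obj and
    # t.eng are attached right after the Timer is constructed.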
+
+
+def CALLFUNC(func, outkey=None, debug=False, stopper=None,
+             args=[], oeargs=[], ekeys={}, okeys={}, **kwargs):
+    """Workflow task CALLFUNC
+    This wf task allows you to call any function
+    @param func: identification of the function; it can be either
+        a string (fully qualified function name) or the callable
+        itself
+    @keyword outkey: results of the call will be stored inside
+        obj[outkey] if outkey != None
+    @keyword debug: boolean, if True, we will run the call in a
+        loop, reloading the module after each error
+    @keyword stopper: a callable which will receive obj, eng
+        after each run. If the callable returns True, we will
+        stop running the func (only applicable when debug=True)
+    @keyword args: params passed on to the function
+    @keyword ekeys: dictionary of key:value pairs; we will take
+        'value' from the engine and pass it to the function under
+        the 'key' name.
+    @keyword okeys: the same as ekeys, only that values are taken
+        from the obj
+    @keyword oeargs: definition of arguments that should be put
+        inside the *args; you can use syntactic sugar to instruct
+        the system where to take the value from, for example 'Eseman'
+        will take eng.getVar('seman') -- 'O' (capital letter O) means
+        take the value from obj
+    @keyword **kwargs: other kwargs passed on to the function
+    @return: nothing, the value is stored inside obj[outkey]
+    """
+    mod, new_func = _get_mod_func(func)
+    args = list(args)
+    def x(obj, eng):
+        try:
+            for key in oeargs:
+                first_key, rest_key = key[0], key[1:]
+                if first_key == 'O':
+                    args.append(obj[rest_key])
+                elif first_key == 'E' and eng.hasVar(rest_key):
+                    args.append(eng.getVar(rest_key))
+                else:
+                    if key in obj:
+                        args.append(obj[key])
+                    elif eng.hasVar(key):
+                        args.append(eng.getVar(key))
+                    else:
+                        raise Exception("%s is not inside obj nor eng, try specifying Okey or Ekey" % key)
+        except Exception, msg:
+            eng.log.error(traceback.format_exc())
+            eng.log.error('Check your "oeargs" configuration. Key "%s" not available' % key)
+            sys.exit(1)
+
+        for k, v in ekeys.items():
+            kwargs[k] = eng.getVar(v)
+        for k, v in okeys.items():
+            kwargs[k] = obj[v]
+
+        if debug:
+            universal_repeater(mod, new_func, stopper, *args, **kwargs)
+        else:
+            if outkey:
+                obj[outkey] = new_func(*args, **kwargs)
+            else:
+                new_func(*args, **kwargs)
+    x.__name__ = 'CALLFUNC'
+    return x
+
+# ----------------- not wf tasks -----------------------------
+
+def _get_mod_func(func):
+    """For a given callable (or fully qualified name) finds its
+    module - imports it and returns (module, call) -- so that the
+    module can be reloaded"""
+    # find out the module of this call
+    def get_mod(modid, func_name):
+        mod = __import__(modid)
+        components = modid.split('.')
+        for comp in components[1:]:
+            mod = getattr(mod, comp)
+        return getattr(mod, func_name), mod
+
+    if callable(func):
+        m = func.__module__
+        n = func.__name__
+        new_func, mod = get_mod(m, n)
+    else:
+        m, n = str(func).rsplit('.', 1)
+        new_func, mod = get_mod(m, n)
+    return mod, new_func
+
+def debug_simple(func, *args, **kwargs):
+    """Runs func with *args, **kwargs and reloads it
+    after each failure - this is not a wfe task"""
+    mod, new_func = _get_mod_func(func)
+    universal_repeater(mod, new_func, None, *args, **kwargs)
+
+
+def universal_repeater(mod, call, stopper=None, *args, **kwargs):
+    fname = call.__name__
+    while True:
+        if callable(stopper) and stopper(*args, **kwargs):
+            break
+        try:
+            call(*args, **kwargs)
+        except:
+            traceback.print_exc()
+            reload(mod)
+            call = getattr(mod, fname)
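A usage sketch ('mypkg.cleaners.normalize' and the keys are made up):

    from workflow.patterns.utils import CALLFUNC

    # calls mypkg.cleaners.normalize(record, lang, encoding='utf-8'),
    # taking 'record' from obj and 'lang' from the engine
    workflow = [
        CALLFUNC('mypkg.cleaners.normalize',
                 outkey='normalized',          # result lands in obj['normalized']
                 oeargs=['Orecord', 'Elang'],  # 'O...' -> obj key, 'E...' -> eng var
                 encoding='utf-8'),
    ]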