diff --git a/docs/CHANGELOG.md b/docs/CHANGELOG.md index 970ea363..fa3ec1da 100644 --- a/docs/CHANGELOG.md +++ b/docs/CHANGELOG.md @@ -1,3 +1,11 @@ +# 3.2.1 (2020.5.28) +- Allow pyppl_postrun hook to stop the exception raising +- Make proc_prerun hook access proc props and initialized jobs +- Fix tag set in runtime not updated to proc in log +- Get progress bar length according to the terminal width +- Deprecate setup hook +- Fix job input loses key of empty list + # 3.2.0 (2020.5.11) - Add plugin names in logging messages diff --git a/docs/api.md b/docs/api.md index 75a8b8a5..dbd96b2a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -336,6 +336,42 @@ The signature +!!! abstract "method: `log_msg_len(procid, joblen, ret, fallback, maxlen, minlen)`" + + Get the progress bar size (length) according to the terminal width + + - **params** + + - `procid (str)`: The process id + + - `joblen (int)`: The number of jobs + + - `fallback (int)`: The value to return if we fail + + - `adjust (int)`: Adjust the length to get the length of message with + + or without proc id and/or job index indicators + + - `with-pbar`: return the progress bar size with everything + + - `with-nothing`: return the length without process id nor job + + index indicator + + - `with-proc`: return the length of message with proc id only + + - `with-job`: return the length of message with proc id and + + job index indicator + + - `maxlen (int)`: The maximum length of returned length + + - `minlen (int)`: The minimum length of returned length + + - **returns** + + - `(int)`: The progress bar size + !!! abstract "method: `name2filename(name)`" Convert any name to a valid filename @@ -1405,8 +1441,6 @@ - `PBAR_LEVEL (dict)`: The levels for different states - - `PBAR_SIZE (int)`: the size of the progress bar - !!! example "class: `Jobmgr`" Job manager @@ -1641,7 +1675,7 @@ - `proc (Proc)`: The Proc instance - - `status (str)`: succeeded/failed + - `status (str)`: succeeded/failed/cached !!! 
abstract "method: `proc_prerun(proc)`" @@ -1668,8 +1702,9 @@ PLUGIN API After the pipeline is done - If the pipeline fails, this won't run. + If you want to do something after a process fails Use proc_postrun(proc = proc, status = 'failed') instead. + Return False will stop exception being raised - **params** @@ -1691,6 +1726,8 @@ PLUGIN API Add default configs + Note that this is not running for runtime plugins, which are registered later + after the module is loaded. - **params** diff --git a/pyppl/__init__.py b/pyppl/__init__.py index 71aac5a7..6bb2e05e 100644 --- a/pyppl/__init__.py +++ b/pyppl/__init__.py @@ -1,6 +1,6 @@ """PyPPL - A Python PiPeLine framework.""" -__version__ = "3.2.0" +__version__ = "3.2.1" from .config import load_config from .plugin import config_plugins diff --git a/pyppl/_job.py b/pyppl/_job.py index 81087cba..0b101902 100644 --- a/pyppl/_job.py +++ b/pyppl/_job.py @@ -105,6 +105,7 @@ def job_input(this, value): # pylint: disable=too-many-branches (key, intype), slevel='INFILE_EMPTY', level="warning") + this.data.i[key] = [] continue if not isinstance(indata, list): diff --git a/pyppl/config.py b/pyppl/config.py index f6ba1d90..06e94269 100644 --- a/pyppl/config.py +++ b/pyppl/config.py @@ -85,4 +85,5 @@ def load_config(default_config, *config_files): # save runners in memory register_runner(runner) config_plugins(*config.plugins) +# deprecate this, since at this point, no plugins are loaded yet pluginmgr.hook.setup(config=config) diff --git a/pyppl/jobmgr.py b/pyppl/jobmgr.py index 2a442a9c..8e57d86e 100644 --- a/pyppl/jobmgr.py +++ b/pyppl/jobmgr.py @@ -3,14 +3,13 @@ STATES (dict): Possible states for the job PBAR_MARKS (dict): The marks on progress bar for different states PBAR_LEVEL (dict): The levels for different states - PBAR_SIZE (int): the size of the progress bar """ import sys from time import sleep from threading import Lock from queue import Queue from diot import Diot -from .utils import StateMachine, PQueue, ThreadPool +from 
.utils import StateMachine, PQueue, ThreadPool, log_msg_len from .plugin import pluginmgr from .logger import logger from .exception import JobBuildingError, JobFailError @@ -68,14 +67,12 @@ STATES.KILLFAILED: 'KILLING', } -PBAR_SIZE = 50 - class Jobmgr: """@API Job manager""" - __slots__ = ('jobs', 'proc', 'stop', 'queue', 'nslots', 'lock') + __slots__ = ('jobs', 'proc', 'stop', 'queue', 'nslots', 'lock', 'barsize') def __init__(self, jobs): """@API @@ -101,6 +98,9 @@ def __init__(self, jobs): self.queue = PQueue(batch_len=len(jobs)) self.nslots = min(self.queue.batch_len, int(self.proc.nthread)) + self.barsize = log_msg_len(procid=self.proc.id, + joblen=len(jobs)) + for job in jobs: self.queue.put(job.index) @@ -178,7 +178,9 @@ def start(self): Start the queue. """ # no jobs - if not hasattr(self, 'lock'): + try: + self.jobs + except AttributeError: return pool = ThreadPool(self.nslots, initializer=self.worker) @@ -191,16 +193,16 @@ def _get_jobs_by_states(self, *states): def _distribute_jobs_to_pbar(self): joblen = len(self.jobs) index_bjobs = [] - if joblen <= PBAR_SIZE: - div, mod = divmod(PBAR_SIZE, joblen) + if joblen <= self.barsize: + div, mod = divmod(self.barsize, joblen) for j in range(joblen): step = div + 1 if j < mod else div for _ in range(step): index_bjobs.append([j]) else: jobx = 0 - div, mod = divmod(joblen, PBAR_SIZE) - for i in range(PBAR_SIZE): + div, mod = divmod(joblen, self.barsize) + for i in range(self.barsize): step = div + 1 if i < mod else div index_bjobs.append([jobx + jobstep for jobstep in range(step)]) jobx += step diff --git a/pyppl/plugin.py b/pyppl/plugin.py index 10ec51c9..b39ae679 100644 --- a/pyppl/plugin.py +++ b/pyppl/plugin.py @@ -19,16 +19,21 @@ hookspec = pluggy.HookspecMarker(PMNAME) -@hookspec +@hookspec( + warn_on_impl=DeprecationWarning("This hook is deprecated. 
" "Please initialize configs inside " + "your plugin") +) def setup(config): """@API PLUGIN API Add default configs + Note that this is not running for runtime plugins, which are registered later + after the module is loaded. @params: config (Config): The default configurations """ - @hookspec def proc_init(proc): """@API @@ -51,7 +56,6 @@ def proc_prerun(proc): proc (Proc): The Proc instance """ - @hookspec def proc_postrun(proc, status): """@API @@ -59,7 +63,7 @@ After a process has done @params: proc (Proc): The Proc instance - status (str): succeeded/failed + status (str): succeeded/failed/cached """ @@ -86,13 +90,14 @@ def pyppl_prerun(ppl): """ -@hookspec +@hookspec(firstresult=True) def pyppl_postrun(ppl): """@API PLUGIN API After the pipeline is done - If the pipeline fails, this won't run. + If you want to do something after a process fails Use proc_postrun(proc = proc, status = 'failed') instead. + Return False will stop exception being raised @params: ppl (PyPPL): The PyPPL instance """ diff --git a/pyppl/proc.py b/pyppl/proc.py index 3c23b267..a695a1a1 100644 --- a/pyppl/proc.py +++ b/pyppl/proc.py @@ -226,11 +226,8 @@ def run(self, runtime_config): self.output # pylint: disable=pointless-statement self.suffix # pylint: disable=pointless-statement logger.workdir(self.workdir, proc=self.id) - ret = pluginmgr.hook.proc_prerun(proc=self) - # plugins can stop process from running - if ret is not False: - self._save_settings() - self._run_jobs() + self._save_settings() + self._run_jobs() self._post_run() def _post_run(self): @@ -273,6 +270,7 @@ def _post_run(self): pluginmgr.hook.proc_postrun(proc=self, status='failed') sys.exit(1) + logger.debug("Running hook.proc_postrun ...", proc=self.id) pluginmgr.hook.proc_postrun( proc=self, status='cached' @@ -280,6 +278,7 @@ 'succeeded') if self.jobs: + # pylint: disable=unsupported-delete-operation del self.jobs[:] def _save_settings(self): @@ -303,7 +302,18 @@ 
def stringify(conf): def _run_jobs(self): logger.debug('Queue starts ...', proc=self.id) + # jobs are constructed here, hook.job_init called jobmgr = Jobmgr(self.jobs) + + logger.debug("Running hook.proc_prerun ...", proc=self.id) + ret = pluginmgr.hook.proc_prerun(proc=self) + # plugins can stop process from running + # let's put it here since jobs have been init'ed + # we can then access proc.size, input, etc.. + if ret is False: + # avoid error in _post_run + self.jobs = [] + return # we need to jobs to be initialized, as during initialization # use_runner called, and we only initialize that runner runnermgr.hook.runner_init(proc=self) diff --git a/pyppl/pyppl.py b/pyppl/pyppl.py index 75f04b26..7daf2b67 100644 --- a/pyppl/pyppl.py +++ b/pyppl/pyppl.py @@ -22,10 +22,10 @@ ProcessAlreadyRegistered, PyPPLWrongPositionMethod, PyPPLMethodExists) -from .utils import try_deepcopy, name2filename, fs +from .utils import try_deepcopy, name2filename, fs, log_msg_len # length of separators in log -SEPARATOR_LEN = 80 +SEPARATOR_LEN = log_msg_len(ret='with-nothing', minlen=45) # Regiester processes PROCESSES = set() # tips @@ -239,16 +239,13 @@ def __init__(self, config=None, name=None, config_files=None, **kwconfigs): self.props = Diot() # for plugins pluginmgr.hook.pyppl_init(ppl=self) - def run(self, profile='default'): + def run(self, profile='default'): # pylint: disable=too-many-branches """@API Run the pipeline with certain profile @params: profile (str): The profile name """ - # plugins can stop pipeling being running - if pluginmgr.hook.pyppl_prerun(ppl=self) is False: - pluginmgr.hook.pyppl_postrun(ppl=self) - return self + # This function needs to be simplified later # for default profile, we shall not load anything from default_config # as part/base of runtime config, since they have alread been used as @@ -277,59 +274,79 @@ def run(self, profile='default'): defconfig.pop('logger', None) defconfig.pop('plugins', None) - for proc in self.procs: - # echo the 
process name and description - name = '%s%s: ' % ( - proc.name, - ' (%s)' % proc.origin - if proc.origin and proc.origin != proc.id - else '' - ) - - logger.process('-' * SEPARATOR_LEN) - if len(name + proc.desc) > SEPARATOR_LEN: - logger.process(name) - for i in textwrap.wrap(proc.desc, - SEPARATOR_LEN, - initial_indent=' ', - subsequent_indent=' '): - logger.process(i) - else: - logger.process(name + proc.desc) - - logger.process('-' * SEPARATOR_LEN) - - # echo the dependencies - depends = ([dproc.name for dproc in proc.depends] - if proc.depends - else ['START']) - nexts = ([nproc.name for nproc in proc.nexts] - if proc.nexts - else ['END']) - depmaxlen = max([len(dep) for dep in depends]) - nxtmaxlen = max([len(nxt) for nxt in nexts]) - lendiff = len(depends) - len(nexts) - lessprocs = depends if lendiff < 0 else nexts - lendiff = abs(lendiff) - lessprocs.extend([''] * (lendiff - int(lendiff / 2))) - for i in range(int(lendiff/2)): - lessprocs.insert(0, '') - - for i in range(len(lessprocs)): - if i == int((len(lessprocs) - 1) / 2): - logger.depends('| %s | => %s => | %s |', - depends[i].ljust(depmaxlen), - proc.name, - nexts[i].ljust(nxtmaxlen)) + # update tags, and maybe some other props that are important in prerun + if 'tag' in defconfig: + for proc in self.procs: + if proc._setcounter.get('tag', 0) > 0: + continue + proc.tag = defconfig.tag + + # plugins can stop the pipeline from running + if pluginmgr.hook.pyppl_prerun(ppl=self) is False: + pluginmgr.hook.pyppl_postrun(ppl=self) + return self + + try: + for proc in self.procs: + # echo the process name and description + name = '%s%s: ' % ( + proc.name, + ' (%s)' % proc.origin + if proc.origin and proc.origin != proc.id + else '' + ) + + logger.process('-' * SEPARATOR_LEN) + if len(name + proc.desc) > SEPARATOR_LEN: + logger.process(name) + for i in textwrap.wrap(proc.desc, + SEPARATOR_LEN, + initial_indent=' ', + subsequent_indent=' '): + logger.process(i) else: - logger.depends('| %s | %s | %s |', - 
depends[i].ljust(depmaxlen), - ' ' * len(proc.name), - nexts[i].ljust(nxtmaxlen)) + logger.process(name + proc.desc) + + logger.process('-' * SEPARATOR_LEN) + + # echo the dependencies + depends = ([dproc.name for dproc in proc.depends] + if proc.depends + else ['START']) + nexts = ([nproc.name for nproc in proc.nexts] + if proc.nexts + else ['END']) + depmaxlen = max([len(dep) for dep in depends]) + nxtmaxlen = max([len(nxt) for nxt in nexts]) + lendiff = len(depends) - len(nexts) + lessprocs = depends if lendiff < 0 else nexts + lendiff = abs(lendiff) + lessprocs.extend([''] * (lendiff - int(lendiff / 2))) + for i in range(int(lendiff/2)): + lessprocs.insert(0, '') + + for i in range(len(lessprocs)): + middle = (proc.name if i == int((len(lessprocs) - 1) / 2) + else ' ' * len(proc.name)) + logger.depends( + '| {0} | {1} {2} {1} | {3} |'.format( + depends[i].ljust(depmaxlen), + '=>' if i == int((len(lessprocs)-1)/2) else ' ', + middle, + nexts[i].ljust(nxtmaxlen) + ) + ) + + proc.run(defconfig) - proc.run(defconfig) + except BaseException as ex: + logger.error(f"{type(ex).__name__}:") + logger.error(str(ex)) + if pluginmgr.hook.pyppl_postrun(ppl=self) is not False: + raise + else: + pluginmgr.hook.pyppl_postrun(ppl=self) - pluginmgr.hook.pyppl_postrun(ppl=self) return self def start(self, *anything): diff --git a/pyppl/utils.py b/pyppl/utils.py index 2b4fea05..ba428791 100644 --- a/pyppl/utils.py +++ b/pyppl/utils.py @@ -4,6 +4,7 @@ from copy import deepcopy from queue import PriorityQueue from threading import Thread +import shutil import cmdy from transitions import Transition, Machine from liquid.stream import safe_split @@ -22,6 +23,64 @@ def name2filename(name): name = re.sub(r'_+', '_', name) return name.strip('_') +def log_msg_len(procid='', # pylint: disable=too-many-arguments + joblen=0, + ret='with-pbar', + fallback=50, + maxlen=80, + minlen=20): + """@API + Get the progress bar size (length) according to the terminal width + @params: + procid (str): The 
process id + joblen (int): The number of jobs + fallback (int): The value to return if we fail + ret (str): Adjust the length to get the length of message with + or without proc id and/or job index indicators + - `with-pbar`: return the progress bar size with everything + - `with-nothing`: return the length without process id nor job + index indicator + - `with-proc`: return the length of message with proc id only + - `with-job`: return the length of message with proc id and + job index indicator + maxlen (int): The maximum length of returned length + minlen (int): The minimum length of returned length + @returns: + (int): The progress bar size + """ + # pylint: disable=line-too-long + # calculate # chars other than the pbar + # typically: + # [05-27 17:59:58 MAIN.JOBDONE] pProcess: [3/3] [ ...pbar... ] Done: 100.0% | Running: 0 + # ------------------------------------j-j---............--------------------------j + # pylint: enable=line-too-long + proclen = len(procid) + jobidlen = len(str(joblen)) + + if ret == 'with-pbar': + nonpbar_len = (33 + # logformat + proclen+2 + # proc id + 1+jobidlen+1+jobidlen+2 + # job index indicator + 1 + # [ + 2 + # ] + 24+jobidlen) # Done ... 
+ elif ret == 'with-nothing': + nonpbar_len = 33 + elif ret == 'with-proc': + nonpbar_len = 33 + proclen+2 + elif ret == 'with-job': + nonpbar_len = 33 + proclen+2 + 2*jobidlen+4 + + total_fallback = nonpbar_len + fallback + + try: + total_width = shutil.get_terminal_size().columns - 1 + except (OSError, AttributeError): # pragma: no cover + total_width = total_fallback + + if not isinstance(total_width, int) or total_width <= nonpbar_len: + total_width = total_fallback + return max(minlen, min(maxlen, total_width - nonpbar_len)) def format_secs(seconds): """@API diff --git a/pyproject.toml b/pyproject.toml index e1ac64ce..1cb8e6fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.masonry.api" [tool.poetry] name = "PyPPL" -version = "3.2.0" +version = "3.2.1" description = "A Python PiPeLine framework" authors = [ "pwwang ",] license = "MIT" diff --git a/setup.py b/setup.py index 6d166bc6..109dc95b 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ setup( long_description=readme, name='PyPPL', - version='3.2.0', + version='3.2.1', description='A Python PiPeLine framework', python_requires='==3.*,>=3.6.0', project_urls={ diff --git a/tests/test_jobmgr.py b/tests/test_jobmgr.py index 9a495707..d70a6e95 100644 --- a/tests/test_jobmgr.py +++ b/tests/test_jobmgr.py @@ -179,8 +179,9 @@ def test_start_all_running(job_done, jobindex_reset, forks, caplog): jobindex_reset(jobs) jobs[0].proc.forks = forks jm = Jobmgr(jobs) + jm.barsize = 10 jm.start() - assert '[==================================================]' in caplog.text + assert '[==========]' in caplog.text @pytest.mark.parametrize('pbarsize, expect', [ @@ -190,6 +191,7 @@ def test_start_all_running(job_done, jobindex_reset, forks, caplog): ]) # len(jobs_all) == 8 def test_distributejobstopbar(jobs_all, pbarsize, expect): from pyppl import jobmgr - jobmgr.PBAR_SIZE = pbarsize + #jobmgr.PBAR_SIZE = pbarsize jm = Jobmgr(jobs_all) + jm.barsize = pbarsize assert 
jm._distribute_jobs_to_pbar() == expect diff --git a/tox.ini b/tox.ini index bc2dfefa..f3e9cfc9 100644 --- a/tox.ini +++ b/tox.ini @@ -1,3 +1,4 @@ [pytest] addopts = -vv --cov-config=.coveragerc --cov=pyppl --cov-report xml:.coverage.xml --cov-report term-missing console_output_style = progress +junit_family = xunit1