Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bump to 3.2.1 #97

Merged
merged 7 commits into from
May 28, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions docs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
# 3.2.1 (2020.5.28)
- Allow the pyppl_postrun hook to stop the exception from being raised
- Make proc_prerun hook access proc props and initialized jobs
- Fix tag set in runtime not updated to proc in log
- Get progress bar length according to the terminal width
- Deprecate setup hook
- Fix job input loses key of empty list

# 3.2.0 (2020.5.11)
- Add plugin names in logging messages

Expand Down
45 changes: 41 additions & 4 deletions docs/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,42 @@

The signature

!!! abstract "method: `log_msg_len(procid, joblen, ret, fallback, maxlen, minlen)`"

Get the progress bar size (length) according to the terminal width

- **params**

- `procid (str)`: The process id

- `joblen (int)`: The number of jobs

- `fallback (int)`: The value to return if we fail

- `adjust (int)`: Adjust the length to get the length of message with

or without proc id and/or job index indicators

- `with-pbar`: return the progress bar size with everything

- `with-nothing`: return the length without process id nor job

index indicator

- `with-proc`: return the length of message with proc id only

- `with-job`: return the length of message with proc id and

job index indicator

- `maxlen (int)`: The maximum length of returned length

- `minlen (int)`: The minimum length of returned length

- **returns**

- `(int)`: The progress bar size

!!! abstract "method: `name2filename(name)`"

Convert any name to a valid filename
Expand Down Expand Up @@ -1405,8 +1441,6 @@

- `PBAR_LEVEL (dict)`: The levels for different states

- `PBAR_SIZE (int)`: the size of the progress bar

!!! example "class: `Jobmgr`"

Job manager
Expand Down Expand Up @@ -1641,7 +1675,7 @@

- `proc (Proc)`: The Proc instance

- `status (str)`: succeeded/failed
- `status (str)`: succeeded/failed/cached

!!! abstract "method: `proc_prerun(proc)`"

Expand All @@ -1668,8 +1702,9 @@

PLUGIN API
After the pipeline is done
If the pipeline fails, this won't run.
If you want to do something after a process fails
Use proc_postrun(proc = proc, status = 'failed') instead.
Returning False will stop the exception from being raised

- **params**

Expand All @@ -1691,6 +1726,8 @@

PLUGIN API
Add default configs
Note that this hook does not run for runtime plugins, which are registered
later, after the module is loaded.

- **params**

Expand Down
2 changes: 1 addition & 1 deletion pyppl/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""PyPPL - A Python PiPeLine framework."""

__version__ = "3.2.0"
__version__ = "3.2.1"

from .config import load_config
from .plugin import config_plugins
Expand Down
1 change: 1 addition & 0 deletions pyppl/_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def job_input(this, value): # pylint: disable=too-many-branches
(key, intype),
slevel='INFILE_EMPTY',
level="warning")
this.data.i[key] = []
continue

if not isinstance(indata, list):
Expand Down
1 change: 1 addition & 0 deletions pyppl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,5 @@ def load_config(default_config, *config_files):
# save runners in memory
register_runner(runner)
config_plugins(*config.plugins)
# deprecate this, since at this point, no plugins are loaded yet
pluginmgr.hook.setup(config=config)
22 changes: 12 additions & 10 deletions pyppl/jobmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,13 @@
STATES (dict): Possible states for the job
PBAR_MARKS (dict): The marks on progress bar for different states
PBAR_LEVEL (dict): The levels for different states
PBAR_SIZE (int): the size of the progress bar
"""
import sys
from time import sleep
from threading import Lock
from queue import Queue
from diot import Diot
from .utils import StateMachine, PQueue, ThreadPool
from .utils import StateMachine, PQueue, ThreadPool, log_msg_len
from .plugin import pluginmgr
from .logger import logger
from .exception import JobBuildingError, JobFailError
Expand Down Expand Up @@ -68,14 +67,12 @@
STATES.KILLFAILED: 'KILLING',
}

PBAR_SIZE = 50


class Jobmgr:
"""@API
Job manager"""

__slots__ = ('jobs', 'proc', 'stop', 'queue', 'nslots', 'lock')
__slots__ = ('jobs', 'proc', 'stop', 'queue', 'nslots', 'lock', 'barsize')

def __init__(self, jobs):
"""@API
Expand All @@ -101,6 +98,9 @@ def __init__(self, jobs):
self.queue = PQueue(batch_len=len(jobs))
self.nslots = min(self.queue.batch_len, int(self.proc.nthread))

self.barsize = log_msg_len(procid=self.proc.id,
joblen=len(jobs))

for job in jobs:
self.queue.put(job.index)

Expand Down Expand Up @@ -178,7 +178,9 @@ def start(self):
Start the queue.
"""
# no jobs
if not hasattr(self, 'lock'):
try:
self.jobs
except AttributeError:
return

pool = ThreadPool(self.nslots, initializer=self.worker)
Expand All @@ -191,16 +193,16 @@ def _get_jobs_by_states(self, *states):
def _distribute_jobs_to_pbar(self):
joblen = len(self.jobs)
index_bjobs = []
if joblen <= PBAR_SIZE:
div, mod = divmod(PBAR_SIZE, joblen)
if joblen <= self.barsize:
div, mod = divmod(self.barsize, joblen)
for j in range(joblen):
step = div + 1 if j < mod else div
for _ in range(step):
index_bjobs.append([j])
else:
jobx = 0
div, mod = divmod(joblen, PBAR_SIZE)
for i in range(PBAR_SIZE):
div, mod = divmod(joblen, self.barsize)
for i in range(self.barsize):
step = div + 1 if i < mod else div
index_bjobs.append([jobx + jobstep for jobstep in range(step)])
jobx += step
Expand Down
17 changes: 11 additions & 6 deletions pyppl/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,21 @@
hookspec = pluggy.HookspecMarker(PMNAME)


@hookspec(
    # Implementers get a DeprecationWarning: plugin-local initialization
    # replaces this hook.
    warn_on_impl=DeprecationWarning("This hook is deprecated. "
                                    "Please initialize configs inside "
                                    "your plugin")
)
def setup(config):
    """@API
    PLUGIN API
    Add default configs.
    Note that this hook does not run for runtime plugins, which are
    registered later, after the module is loaded.
    @params:
        config (Config): The default configurations
    """


@hookspec
def proc_init(proc):
"""@API
Expand All @@ -51,15 +56,14 @@ def proc_prerun(proc):
proc (Proc): The Proc instance
"""


@hookspec
def proc_postrun(proc, status):
    """@API
    PLUGIN API
    Called after a process has finished running.
    @params:
        proc (Proc): The Proc instance
        status (str): One of succeeded/failed/cached
    """


Expand All @@ -86,13 +90,14 @@ def pyppl_prerun(ppl):
"""


# firstresult: pluggy stops at the first hook implementation returning
# a non-None value, so a single plugin can decide the outcome.
@hookspec(firstresult=True)
def pyppl_postrun(ppl):
    """@API
    PLUGIN API
    Called after the pipeline is done.
    If the pipeline fails, this won't run; if you want to do something
    after a process fails, use proc_postrun(proc = proc, status = 'failed')
    instead.
    Returning False will stop the exception from being raised.
    @params:
        ppl (PyPPL): The PyPPL instance
    """
Expand Down
20 changes: 15 additions & 5 deletions pyppl/proc.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,11 +226,8 @@ def run(self, runtime_config):
self.output # pylint: disable=pointless-statement
self.suffix # pylint: disable=pointless-statement
logger.workdir(self.workdir, proc=self.id)
ret = pluginmgr.hook.proc_prerun(proc=self)
# plugins can stop process from running
if ret is not False:
self._save_settings()
self._run_jobs()
self._save_settings()
self._run_jobs()
self._post_run()

def _post_run(self):
Expand Down Expand Up @@ -273,13 +270,15 @@ def _post_run(self):
pluginmgr.hook.proc_postrun(proc=self, status='failed')
sys.exit(1)

logger.debug("Running hook.proc_postrun ...", proc=self.id)
pluginmgr.hook.proc_postrun(
proc=self,
status='cached'
if jobs and len(jobs.get(STATES.DONECACHED, [])) == self.size else
'succeeded')

if self.jobs:
# pylint: disable=unsupported-delete-operation
del self.jobs[:]

def _save_settings(self):
Expand All @@ -303,7 +302,18 @@ def stringify(conf):

def _run_jobs(self):
logger.debug('Queue starts ...', proc=self.id)
# jobs are constructed here, hook.job_init called
jobmgr = Jobmgr(self.jobs)

logger.debug("Running hook.proc_prerun ...", proc=self.id)
ret = pluginmgr.hook.proc_prerun(proc=self)
# plugins can stop process from running
# let's put it here since jobs have been init'ed
# we can then access proc.size, input, etc..
if ret is False:
# avoid error in _post_run
self.jobs = []
return
# we need to jobs to be initialized, as during initialization
# use_runner called, and we only initialize that runner
runnermgr.hook.runner_init(proc=self)
Expand Down
Loading