Skip to content

Commit

Permalink
add verbose print and fix v3io stream (#268)
Browse files Browse the repository at this point in the history
* add verbose print and fix v3io stream
* add pythonpath env in jobs
  • Loading branch information
yaronha committed May 18, 2020
1 parent c015be3 commit 1150218
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 8 deletions.
5 changes: 3 additions & 2 deletions mlrun/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,12 @@ def main():
@click.option('--workdir', default='', help='run working directory')
@click.option('--label', multiple=True, help="run labels (key=val)")
@click.option('--watch', '-w', is_flag=True, help='watch/tail run log')
@click.option('--verbose', is_flag=True, help='verbose log')
@click.argument('run_args', nargs=-1, type=click.UNPROCESSED)
def run(url, param, inputs, outputs, in_path, out_path, secrets, uid,
name, workflow, project, db, runtime, kfp, hyperparam, param_file,
selector, func_url, task, handler, mode, schedule, from_env, dump,
image, workdir, label, watch, run_args):
image, workdir, label, watch, verbose, run_args):
"""Execute a task and inject parameters."""

out_path = out_path or environ.get('MLRUN_ARTIFACT_PATH')
Expand Down Expand Up @@ -177,7 +178,7 @@ def run(url, param, inputs, outputs, in_path, out_path, secrets, uid,
set_item(runobj.spec, outputs, run_keys.outputs, list(outputs))
set_item(runobj.spec, secrets, run_keys.secrets, line2keylist(secrets, 'kind', 'source'))

if kfp:
if kfp or runobj.spec.verbose or verbose:
print('MLRun version: {}'.format(get_version()))
print('Runtime:')
pprint(runtime)
Expand Down
7 changes: 6 additions & 1 deletion mlrun/kfpops.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ def mlrun_op(name: str = '', project: str = '', function=None, func_url=None,
hyperparams: dict = None, param_file: str = '', labels: dict = None,
selector: str = '', inputs: dict = None, outputs: list = None,
in_path: str = '', out_path: str = '', rundb: str = '',
mode: str = '', handler: str = '', more_args: list = None):
mode: str = '', handler: str = '', more_args: list = None,
verbose=None):
"""mlrun KubeFlow pipelines operator, use to form pipeline steps
when using kubeflow pipelines, each step is wrapped in an mlrun_op
Expand Down Expand Up @@ -171,6 +172,7 @@ def mlrun_op(name: str = '', project: str = '', function=None, func_url=None,
:param mode: run mode, e.g. 'noctx' for pushing params as args
:param handler code entry-point/hanfler name
:param job_image name of the image user for the job
:param verbose: add verbose prints/logs
:return: KFP step operation
Expand Down Expand Up @@ -263,6 +265,7 @@ def mlrun_pipeline(
secrets = secrets or runobj.spec.secret_sources
project = project or runobj.metadata.project
labels = runobj.metadata.labels or labels
verbose = verbose or runobj.spec.verbose

if not name:
if not function_name:
Expand Down Expand Up @@ -321,6 +324,8 @@ def mlrun_pipeline(
cmd += ['--image', job_image]
if mode:
cmd += ['--mode', mode]
if verbose:
cmd += ['--verbose']
if more_args:
cmd += more_args

Expand Down
3 changes: 2 additions & 1 deletion mlrun/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class RunSpec(ModelObj):
def __init__(self, parameters=None, hyperparams=None, param_file=None,
selector=None, handler=None, inputs=None, outputs=None,
input_path=None, output_path=None, function=None,
secret_sources=None, data_stores=None):
secret_sources=None, data_stores=None, verbose=None):

self.parameters = parameters or {}
self.hyperparams = hyperparams or {}
Expand All @@ -159,6 +159,7 @@ def __init__(self, parameters=None, hyperparams=None, param_file=None,
self.function = function
self._secret_sources = secret_sources or []
self._data_stores = data_stores
self.verbose = verbose

def to_dict(self, fields=None, exclude=None):
struct = super().to_dict(fields, exclude=['handler'])
Expand Down
3 changes: 2 additions & 1 deletion mlrun/platforms/iguazio.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ def __init__(self, stream_path, shards=1):
path=self._stream_path,
shard_count=shards,
raise_for_status=v3io.dataplane.RaiseForStatus.never)
response.raise_for_status([409, 204])
if not (response.status_code == 400 and 'ResourceInUse' in str(response.body)):
response.raise_for_status([409, 204])

def push(self, data):
if not isinstance(data, list):
Expand Down
12 changes: 9 additions & 3 deletions mlrun/runtimes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ def _get_db(self):
def run(self, runspec: RunObject = None, handler=None, name: str = '',
project: str = '', params: dict = None, inputs: dict = None,
out_path: str = '', workdir: str = '', artifact_path: str = '',
watch: bool = True, schedule: str = ''):
watch: bool = True, schedule: str = '', verbose=None):
"""Run a local or remote task.
:param runspec: run template object or dict (see RunTemplate)
Expand All @@ -191,6 +191,7 @@ def run(self, runspec: RunObject = None, handler=None, name: str = '',
:param workdir: default input artifacts path
:param watch: watch/follow run log
:param schedule: cron string for scheduled jobs
:param verbose: add verbose prints/logs
:return: run context object (dict) with run metadata, results and
status
Expand Down Expand Up @@ -222,6 +223,7 @@ def run(self, runspec: RunObject = None, handler=None, name: str = '',
or self.metadata.project
runspec.spec.parameters = params or runspec.spec.parameters
runspec.spec.inputs = inputs or runspec.spec.inputs
runspec.spec.verbose = verbose or runspec.spec.verbose
runspec.spec.output_path = out_path or artifact_path \
or runspec.spec.output_path
runspec.spec.input_path = workdir or runspec.spec.input_path \
Expand Down Expand Up @@ -388,6 +390,8 @@ def _get_cmd_args(self, runobj, with_mlrun):
extra_env = {'MLRUN_EXEC_CONFIG': runobj.to_json()}
if self.spec.rundb:
extra_env['MLRUN_DBPATH'] = self.spec.rundb or config.dbpath
if self.spec.pythonpath:
extra_env['PYTHONPATH'] = self.spec.pythonpath
args = []
command = self.spec.command
code = self.spec.build.functionSourceCode \
Expand Down Expand Up @@ -510,7 +514,7 @@ def as_step(self, runspec: RunObject = None, handler=None, name: str = '',
project: str = '', params: dict = None, hyperparams=None,
selector='', inputs: dict = None, outputs: dict = None,
workdir: str = '', artifact_path: str = '', image: str = '',
labels: dict = None, use_db=True):
labels: dict = None, use_db=True, verbose=None):
"""Run a local or remote task.
:param runspec: run template object or dict (see RunTemplate)
Expand All @@ -527,6 +531,7 @@ def as_step(self, runspec: RunObject = None, handler=None, name: str = '',
:param image: container image to use
:param labels: labels to tag the job/run with ({key:val, ..})
:param use_db: save function spec in the db (vs the workflow file)
:param verbose: add verbose prints/logs
:return: KubeFlow containerOp
"""
Expand All @@ -544,7 +549,8 @@ def as_step(self, runspec: RunObject = None, handler=None, name: str = '',
runobj=runspec, handler=handler, params=params,
hyperparams=hyperparams, selector=selector,
inputs=inputs, outputs=outputs, job_image=image,
labels=labels, out_path=artifact_path, in_path=workdir)
labels=labels, out_path=artifact_path,
in_path=workdir, verbose=verbose)

def export(self, target='', format='.yaml', secrets=None, strip=True):
"""save function spec to a local/remote path (default to
Expand Down
3 changes: 3 additions & 0 deletions mlrun/runtimes/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,9 @@ def _run(self, runobj: RunObject, execution):
environ['MLRUN_DBPATH'] = self.spec.rundb

handler = runobj.spec.handler
logger.info('starting local run: {} # {}'.format(
self.spec.command, handler or 'main'))

if handler:
if self.spec.pythonpath:
set_paths(self.spec.pythonpath)
Expand Down

0 comments on commit 1150218

Please sign in to comment.