Skip to content

Commit

Permalink
fix get pipelines (return valid dict) (#284)
Browse files Browse the repository at this point in the history
* fix get pipelines (return valid dict)
* add project get_run_status(), add label selector to cli get xx
* add log info
* change --wait to --watch for cli consistency
* add notification (to slack, git, ..) mechanism
  • Loading branch information
yaronha committed May 31, 2020
1 parent 9dd963e commit e27a657
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 14 deletions.
33 changes: 23 additions & 10 deletions mlrun/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@
from .db import get_run_db
from .k8s_utils import K8sHelper
from .model import RunTemplate
from .run import new_function, import_function_to_dict, import_function, get_object
from .run import (new_function, import_function_to_dict, import_function,
get_object)
from .runtimes import RemoteRuntime, RunError
from .utils import (list2dict, logger, run_keys, update_in, get_in,
parse_function_uri, dict_to_yaml, pr_comment)
parse_function_uri, dict_to_yaml, pr_comment, RunNotifications)


@click.group()
Expand Down Expand Up @@ -380,6 +381,10 @@ def watch(pod, namespace, timeout):
@click.argument('extra_args', nargs=-1, type=click.UNPROCESSED)
def get(kind, name, selector, namespace, uid, project, tag, db, extra_args):
"""List/get one or more object per kind/class."""

if db:
mlconf.dbpath = db

if kind.startswith('po'):
k8s = K8sHelper(namespace)
if name:
Expand All @@ -399,13 +404,13 @@ def get(kind, name, selector, namespace, uid, project, tag, db, extra_args):
start = i.status.start_time.strftime("%b %d %H:%M:%S")
print('{:10} {:16} {:8} {}'.format(state, start, task, name))
elif kind.startswith('run'):
mldb = get_run_db(db or mlconf.dbpath).connect()
mldb = get_run_db().connect()
if name:
run = mldb.read_run(name, project=project)
print(dict_to_yaml(run))
return

runs = mldb.list_runs(uid=uid, project=project)
runs = mldb.list_runs(uid=uid, project=project, labels=selector)
df = runs.to_df()[['name', 'uid', 'iter', 'start', 'state', 'parameters', 'results']]
#df['uid'] = df['uid'].apply(lambda x: '..{}'.format(x[-6:]))
df['start'] = df['start'].apply(time_str)
Expand All @@ -414,21 +419,21 @@ def get(kind, name, selector, namespace, uid, project, tag, db, extra_args):
print(tabulate(df, headers='keys'))

elif kind.startswith('art'):
mldb = get_run_db(db or mlconf.dbpath).connect()
artifacts = mldb.list_artifacts(name, project=project, tag=tag)
mldb = get_run_db().connect()
artifacts = mldb.list_artifacts(name, project=project, tag=tag, labels=selector)
df = artifacts.to_df()[['tree', 'key', 'iter', 'kind', 'path', 'hash', 'updated']]
df['tree'] = df['tree'].apply(lambda x: '..{}'.format(x[-8:]))
df['hash'] = df['hash'].apply(lambda x: '..{}'.format(x[-6:]))
print(tabulate(df, headers='keys'))

elif kind.startswith('func'):
mldb = get_run_db(db or mlconf.dbpath).connect()
mldb = get_run_db().connect()
if name:
f = mldb.get_function(name, project=project, tag=tag)
print(dict_to_yaml(f))
return

functions = mldb.list_functions(name, project=project)
functions = mldb.list_functions(name, project=project, labels=selector)
lines = []
headers = ['kind', 'state', 'name:tag', 'hash']
for f in functions:
Expand Down Expand Up @@ -503,12 +508,13 @@ def logs(uid, project, offset, db, watch):
@click.option('--init-git', is_flag=True, help='for new projects init git context')
@click.option('--clone', '-c', is_flag=True, help='force override/clone the context dir')
@click.option('--sync', is_flag=True, help='sync functions into db')
@click.option('--watch', '-w', is_flag=True, help='wait for pipeline completion (with -r flag)')
@click.option('--dirty', '-d', is_flag=True, help='allow git with uncommited changes')
@click.option('--git-repo', help='git repo (org/repo) for git comments')
@click.option('--git-issue', type=int, default=None, help='git issue number for git comments')
def project(context, name, url, run, arguments, artifact_path,
param, secrets, namespace, db, init_git, clone, sync,
dirty, git_repo, git_issue):
watch, dirty, git_repo, git_issue):
"""load and/or run a project"""
if db:
mlconf.dbpath = db
Expand Down Expand Up @@ -559,7 +565,8 @@ def project(context, name, url, run, arguments, artifact_path,
print(message)
print('run id: {}'.format(run))

if git_repo and git_issue:
gitops = git_repo and git_issue
if gitops:
if not had_error:
message = 'Pipeline started id={}'.format(run)
if proj.params and 'commit' in proj.params:
Expand All @@ -573,6 +580,12 @@ def project(context, name, url, run, arguments, artifact_path,
if had_error:
exit(1)

if watch:
n = RunNotifications(with_slack=True).print()
if gitops:
n.git_comment(git_repo, git_issue, token=proj.get_secret('GITHUB_TOKEN'))
proj.get_run_status(run, notifiers=n)

elif sync:
print('saving project functions to db ..')
proj.sync_functions(save=True)
Expand Down
2 changes: 2 additions & 0 deletions mlrun/api/api/endpoints/pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ def get_pipeline(run_id,
client = kfclient(namespace=namespace)
try:
run = client.get_run(run_id)
if run:
run = run.to_dict()
except Exception as e:
log_and_raise(HTTPStatus.INTERNAL_SERVER_ERROR, reason="get kfp error: {}".format(e))

Expand Down
42 changes: 40 additions & 2 deletions mlrun/projects/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import shutil

from tabulate import tabulate

from ..db import get_run_db
from ..artifacts import ArtifactManager, ArtifactProducer, dict_to_artifact
from ..secrets import SecretsStore
Expand All @@ -28,12 +30,12 @@
from ..datastore import StoreManager
from ..config import config
from ..run import (import_function, code_to_function, new_function,
download_object, run_pipeline, get_object)
download_object, run_pipeline, get_object, wait_for_pipeline_completion)
import importlib.util as imputil
from urllib.parse import urlparse
from kfp import compiler

from ..utils import update_in, new_pipe_meta, logger
from ..utils import update_in, new_pipe_meta, logger, dict_to_str, is_ipython, RunNotifications
from ..runtimes.utils import add_code_metadata


Expand Down Expand Up @@ -552,6 +554,12 @@ def get_secret(self, key: str):
return self._secrets.get(key)
return None

def get_param(self, key: str, default=None):
"""get project param by key"""
if self.params:
return self.params.get(key)
return None

def run(self, name=None, workflow_path=None, arguments=None,
artifact_path=None, namespace=None, sync=False, dirty=False):
"""run a workflow using kubeflow pipelines
Expand Down Expand Up @@ -656,6 +664,36 @@ def save_workflow(self, name, target, artifact_path=None, ttl=None):
if code:
remove(workflow_path)

def get_run_status(self, workflow_id, timeout=60 * 60,
expected_statuses=None,
notifiers: RunNotifications = None):
status = ''
if timeout:
logger.info('waiting for pipeline run completion')
run_info = wait_for_pipeline_completion(
workflow_id, timeout=timeout,
expected_statuses=expected_statuses)
if run_info:
status = run_info['run'].get('status')

mldb = get_run_db().connect(self._secrets)
runs = mldb.list_runs(project=self.name, labels=f'workflow={workflow_id}')

had_errors = 0
for r in runs:
if r['status'].get('state', '') == 'error':
had_errors += 1

text = f'Workflow {workflow_id} finished'
if had_errors:
text += f' with {had_errors} errors'
if status:
text += f', status={status}'

if notifiers:
notifiers.push(text, runs)
return status, had_errors, text

def clear_context(self):
"""delete all files and clear the context dir"""
if self.context and path.exists(self.context) and path.isdir(self.context):
Expand Down
8 changes: 6 additions & 2 deletions mlrun/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -643,7 +643,7 @@ def wait_for_pipeline_completion(run_id,

def get_pipeline_if_completed(run_id, namespace=namespace):
resp = mldb.get_pipeline(run_id, namespace=namespace)
status = resp['_run']['_status']
status = resp['run']['status']
if status.lower() not in RunStatuses.stable_statuses():

# TODO: think of nicer liveness indication and make it re-usable
Expand All @@ -667,8 +667,10 @@ def get_pipeline_if_completed(run_id, namespace=namespace):
else:
client = Client(namespace=namespace)
resp = client.wait_for_run_completion(run_id, timeout)
if resp:
resp = resp.to_dict()

status = resp['_run']['_status']
status = resp['run']['status'] if resp else 'unknown'
if expected_statuses:
if status not in expected_statuses:
raise RuntimeError(f"run status {status} not in expected statuses")
Expand Down Expand Up @@ -696,6 +698,8 @@ def get_pipeline(run_id, namespace=None):
else:
client = Client(namespace=namespace)
resp = client.get_run(run_id)
if resp:
resp = resp.to_dict()

return resp

Expand Down
118 changes: 118 additions & 0 deletions mlrun/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import numpy as np
import requests
import yaml
from tabulate import tabulate
from yaml.representer import RepresenterError

from .config import config
Expand Down Expand Up @@ -225,6 +226,10 @@ def dict_to_list(struct: dict):
return ['{}={}'.format(k, v) for k, v in struct.items()]


def dict_to_str(struct: dict, sep=','):
return sep.join(dict_to_list(struct))


def numpy_representer_seq(dumper, data):
return dumper.represent_list(data.tolist())

Expand Down Expand Up @@ -474,3 +479,116 @@ def retry_until_successful(interval: int, timeout: int, logger, verbose: bool, _
f" last_exception: {last_exception},"
f" function_name: {_function.__name__},"
f" timeout: {timeout}")


class RunNotifications:
def __init__(self, with_ipython=True, with_slack=False):
self._hooks = []
self._html = ''
self.with_ipython = with_ipython
if with_slack and 'SLACK_WEBHOOK' in environ:
self.slack()

def push(self, message, runs):
for h in self._hooks:
try:
h(message, runs)
except Exception as e:
logger.warning(f'failed to push notification, {e}')
if self.with_ipython and is_ipython:
import IPython
IPython.display.display(IPython.display.HTML(
self._get_html(message, runs)))

def _get_html(self, message, runs):
if self._html:
return self._html

html = '<h2>Run Results</h2>' + message
html += '<br>click the hyper links below to see detailed results<br>'
html += runs.show(display=False, short=True)
self._html = html
return html

def print(self):
def _print(message, runs):
table = []
for r in runs:
state = r['status'].get('state', '')
if state == 'error':
result = r['status'].get('error', '')
else:
result = dict_to_str(r['status'].get('results', {}))

table.append([state,
r['metadata']['name'],
'..' + r['metadata']['uid'][-6:],
result])
print(message + '\n' + tabulate(
table, headers=['status', 'name', 'uid', 'results']))

self._hooks.append(_print)
return self

def slack(self, webhook=''):
emoji = {'completed': ':smiley:',
'running': ':man-running:',
'error': ':x:'}

template = '{}/projects/{}/jobs/{}/info'

webhook = webhook or environ.get('SLACK_WEBHOOK')
if not webhook:
raise ValueError('Slack webhook is not set')

def row(text):
return {'type': 'mrkdwn', 'text': text}

def _slack(message, runs):
fields = [row('*Runs*'), row('*Results*')]
for r in runs:
meta = r['metadata']
if config.ui_url:
url = template.format(config.ui_url, meta.get('project'), meta.get('uid'))
line = f'<{url}|*{meta.get("name")}*>'
else:
line = meta.get("name")
state = r['status'].get('state', '')
line = f'{emoji.get(state, ":question:")} {line}'

fields.append(row(line))
if state == 'error':
result = '*{}*'.format(r['status'].get('error', ''))
else:
result = dict_to_str(r['status'].get('results', {}), ', ')
fields.append(row(result or 'None'))

data = {
'blocks': [
{"type": "section",
"text": {"type": "mrkdwn", "text": message}
}
]
}

for i in range(0, len(fields), 8):
data['blocks'].append({"type": "section",
"fields": fields[i:i + 8]})
response = requests.post(webhook, data=json.dumps(
data), headers={'Content-Type': 'application/json'})
response.raise_for_status()

self._hooks.append(_slack)
return self

def git_comment(self, git_repo=None, git_issue=None, token=None):
def _comment(message, runs):
pr_comment(git_repo or self._get_param('git_repo'),
git_issue or self._get_param('git_issue'),
self._get_html(message, runs),
token=token)

self._hooks.append(_comment)
return self


0 comments on commit e27a657

Please sign in to comment.