Skip to content

Commit

Permalink
Add v2 model serving (and refactor serving as dir) (#463)
Browse files Browse the repository at this point in the history
  • Loading branch information
yaronha committed Oct 12, 2020
1 parent 94a094c commit 0d1f04d
Show file tree
Hide file tree
Showing 22 changed files with 2,397 additions and 429 deletions.
95 changes: 64 additions & 31 deletions mlrun/__main__.py
Expand Up @@ -34,7 +34,7 @@
from .model import RunTemplate
from .projects import load_project
from .run import new_function, import_function_to_dict, import_function, get_object
from .runtimes import RemoteRuntime, RunError, RuntimeKinds
from .runtimes import RemoteRuntime, RunError, RuntimeKinds, ServingRuntime
from .secrets import SecretsStore
from .utils import (
list2dict,
Expand Down Expand Up @@ -185,23 +185,9 @@ def run(
mlconf.dbpath = db

if func_url:
try:
if func_url.startswith("db://"):
func_url = func_url[5:]
project, name, tag, hash_key = parse_function_uri(func_url)
mldb = get_run_db(mlconf.dbpath).connect()
runtime = mldb.get_function(name, project, tag, hash_key)
else:
func_url = "function.yaml" if func_url == "." else func_url
runtime = import_function_to_dict(func_url, {})
except Exception as e:
print("function {} not found, {}".format(func_url, e))
exit(1)

if not runtime:
print("function {} not found or is null".format(func_url))
runtime = func_url_to_runtime(func_url)
if runtime is None:
exit(1)

kind = get_in(runtime, "kind", "")
if kind not in ["", "local", "dask"] and url:
if path.isfile(url) and url.endswith(".py"):
Expand Down Expand Up @@ -413,38 +399,64 @@ def build(
@main.command(context_settings=dict(ignore_unknown_options=True))
@click.argument("spec", type=str, required=False)
@click.option("--source", "-s", default="", help="location/url of the source")
@click.option(
"--func-url",
"-f",
default="",
help="path/url of function yaml or function " "yaml or db://<project>/<name>[:tag]",
)
@click.option("--dashboard", "-d", default="", help="nuclio dashboard url")
@click.option("--project", "-p", default="", help="project name")
@click.option("--model", "-m", multiple=True, help="model name and path (name=path)")
@click.option("--kind", "-k", default=None, help="runtime sub kind")
@click.option("--tag", default="", help="version tag")
@click.option("--env", "-e", multiple=True, help="environment variables")
@click.option("--verbose", is_flag=True, help="verbose log")
def deploy(spec, source, dashboard, project, model, tag, kind, env, verbose):
def deploy(spec, source, func_url, dashboard, project, model, tag, kind, env, verbose):
"""Deploy model or function"""
if spec:
if func_url:
runtime = func_url_to_runtime(func_url)
if runtime is None:
exit(1)
elif spec:
runtime = py_eval(spec)
else:
runtime = {}
if not isinstance(runtime, dict):
print("runtime parameter must be a dict, not {}".format(type(runtime)))
exit(1)

f = RemoteRuntime.from_dict(runtime)
f.spec.source = source
if kind:
f.spec.function_kind = kind
if verbose:
pprint(runtime)
pprint(model)

# support both v1 & v2+ model struct for backwards compatibility
if runtime and runtime["kind"] == RuntimeKinds.serving:
print("Deploying V2 model server")
function = ServingRuntime.from_dict(runtime)
if model:
# v2+ model struct (list of json obj)
for _model in model:
args = json.loads(_model)
function.add_model(**args)
else:
function = RemoteRuntime.from_dict(runtime)
if kind:
function.spec.function_kind = kind
if model:
# v1 model struct (list of k=v)
models = list2dict(model)
for k, v in models.items():
function.add_model(k, v)

function.spec.source = source
if env:
for k, v in list2dict(env).items():
f.set_env(k, v)
f.verbose = verbose
if model:
models = list2dict(model)
for k, v in models.items():
f.add_model(k, v)
function.set_env(k, v)
function.verbose = verbose

try:
addr = f.deploy(dashboard=dashboard, project=project, tag=tag, kind=kind)
addr = function.deploy(dashboard=dashboard, project=project, tag=tag)
except Exception as err:
print("deploy error: {}".format(err))
exit(1)
Expand All @@ -453,7 +465,7 @@ def deploy(spec, source, dashboard, project, model, tag, kind, env, verbose):
with open("/tmp/output", "w") as fp:
fp.write(addr)
with open("/tmp/name", "w") as fp:
fp.write(f.status.nuclio_name)
fp.write(function.status.nuclio_name)


@main.command(context_settings=dict(ignore_unknown_options=True))
Expand Down Expand Up @@ -883,5 +895,26 @@ def dict_to_str(struct: dict):
return ",".join(["{}={}".format(k, v) for k, v in struct.items()])


def func_url_to_runtime(func_url):
try:
if func_url.startswith("db://"):
func_url = func_url[5:]
project, name, tag, hash_key = parse_function_uri(func_url)
mldb = get_run_db(mlconf.dbpath).connect()
runtime = mldb.get_function(name, project, tag, hash_key)
else:
func_url = "function.yaml" if func_url == "." else func_url
runtime = import_function_to_dict(func_url, {})
except Exception as e:
logger.error("function {} not found, {}".format(func_url, e))
return None

if not runtime:
logger.error("function {} not found or is null".format(func_url))
return None

return runtime


if __name__ == "__main__":
main()
28 changes: 22 additions & 6 deletions mlrun/kfpops.py
Expand Up @@ -16,6 +16,7 @@
from copy import deepcopy
from os import environ

import mlrun
from .db import get_or_set_dburl
from .utils import run_keys, dict_to_yaml, logger, gen_md_table, get_artifact_target
from .config import config
Expand Down Expand Up @@ -409,19 +410,18 @@ def mlrun_pipeline(
def deploy_op(
name,
function,
func_url=None,
source="",
dashboard="",
project="",
models: dict = None,
models: list = None,
env: dict = None,
tag="",
verbose=False,
):
from kfp import dsl

models = {} if models is None else models
runtime = "{}".format(function.to_dict())
cmd = ["python", "-m", "mlrun", "deploy", runtime]
cmd = ["python", "-m", "mlrun", "deploy"]
if source:
cmd += ["-s", source]
if dashboard:
Expand All @@ -432,11 +432,27 @@ def deploy_op(
cmd += ["--verbose"]
if project:
cmd += ["-p", project]
for m, val in models.items():
cmd += ["-m", "{}={}".format(m, val)]

if models:
for m in models:
for key in ["model_path", "model_url", "class_name"]:
if key in m:
m[key] = str(m[key]) # verify we stringify pipeline params
if function.kind == mlrun.runtimes.RuntimeKinds.serving:
cmd += ["-m", json.dumps(m)]
else:
cmd += ["-m", "{}={}".format(m["key"], m["model_path"])]

if env:
for key, val in env.items():
cmd += ["--env", "{}={}".format(key, val)]

if func_url:
cmd += ["-f", func_url]
else:
runtime = f"{function.to_dict()}"
cmd += [runtime]

cop = dsl.ContainerOp(
name=name,
image=config.kfp_image,
Expand Down
5 changes: 3 additions & 2 deletions mlrun/mlutils/__init__.py
Expand Up @@ -2,13 +2,14 @@

from .models import (
get_class_fit,
create_class,
create_function,
gen_sklearn_model,
eval_class_model,
eval_model_v2,
)

# for backwards compatibility
from ..utils.helpers import create_class, create_function

from .plots import (
gcf_clear,
feature_importances,
Expand Down
27 changes: 0 additions & 27 deletions mlrun/mlutils/models.py
Expand Up @@ -48,33 +48,6 @@ def get_class_fit(module_pkg_class: str):
}


def create_class(pkg_class: str):
"""Create a class from a package.module.class string
:param pkg_class: full class location,
e.g. "sklearn.model_selection.GroupKFold"
"""
splits = pkg_class.split(".")
clfclass = splits[-1]
pkg_module = splits[:-1]
class_ = getattr(import_module(".".join(pkg_module)), clfclass)
return class_


def create_function(pkg_func: list):
"""Create a function from a package.module.function string
:param pkg_func: full function location,
e.g. "sklearn.feature_selection.f_classif"
"""
splits = pkg_func.split(".")
pkg_module = ".".join(splits[:-1])
cb_fname = splits[-1]
pkg_module = __import__(pkg_module, fromlist=[cb_fname])
function_ = getattr(pkg_module, cb_fname)
return function_


def gen_sklearn_model(model_pkg, skparams):
"""generate an sklearn model configuration
Expand Down
78 changes: 78 additions & 0 deletions mlrun/model.py
Expand Up @@ -88,6 +88,84 @@ def copy(self):
return deepcopy(self)


# model class for building ModelObj dictionaries
class ObjectDict:
def __init__(self, classes_map, default_kind=""):
self._children = {}
self._default_kind = default_kind
self._classes_map = classes_map

def values(self):
return self._children.values()

def keys(self):
return self._children.keys()

def items(self):
return self._children.items()

def __len__(self):
return len(self._children)

def __iter__(self):
yield from self._children.keys()

def __getitem__(self, name):
return self._children[name]

def __setitem__(self, key, item):
self._children[key] = self._get_child_object(item, key)

def __delitem__(self, key):
del self._children[key]

def to_dict(self):
return {k: v.to_dict() for k, v in self._children.items()}

@classmethod
def from_dict(cls, classes_map: dict, children=None, default_kind=""):
if children is None:
return cls(classes_map, default_kind)
if not isinstance(children, dict):
raise ValueError("children must be a dict")

new_obj = cls(classes_map, default_kind)
for name, child in children.items():
child_obj = new_obj._get_child_object(child, name)
new_obj._children[name] = child_obj

return new_obj

def _get_child_object(self, child, name):
if hasattr(child, "kind") and child.kind in self._classes_map.keys():
child.name = name
return child
elif isinstance(child, dict):
kind = child.get("kind", self._default_kind)
if kind not in self._classes_map.keys():
raise ValueError(f"illegal object kind {kind}")
child_obj = self._classes_map[kind].from_dict(child)
child_obj.name = name
return child_obj
else:
raise ValueError(f"illegal child (should be dict or child kind), {child}")

def to_yaml(self):
return dict_to_yaml(self.to_dict())

def to_json(self):
return dict_to_json(self.to_dict())

def to_str(self):
return "{}".format(self.to_dict())

def __str__(self):
return str(self.to_dict())

def copy(self):
return deepcopy(self)


class BaseMetadata(ModelObj):
def __init__(
self,
Expand Down

0 comments on commit 0d1f04d

Please sign in to comment.