Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Serving] Add flow topology support #621

Merged
merged 13 commits into from
Dec 29, 2020
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ pytest-alembic~=0.2
requests-mock~=1.8
# needed for system tests
matplotlib~=3.0
storey @ git+https://github.com/mlrun/storey.git
191 changes: 153 additions & 38 deletions mlrun/runtimes/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,22 @@
# limitations under the License.

import json
from typing import List, Union
import mlrun
import nuclio

from ..model import ObjectList
from .function import RemoteRuntime, NuclioSpec
from ..utils import logger
from ..serving.server import create_mock_server
from .function_reference import FunctionReference
from ..utils import logger, get_caller_globals
from ..serving.server import create_graph_server, GraphServer
from ..serving.states import (
ServingRouterState,
RouterState,
StateKinds,
RootFlowState,
graph_root_setter,
new_remote_endpoint,
new_model_endpoint,
StateKinds,
)

serving_subkind = "serving_v2"
Expand Down Expand Up @@ -91,6 +97,10 @@ def __init__(
parameters=None,
default_class=None,
load_mode=None,
build=None,
function_refs=None,
graph_initializer=None,
error_stream=None,
):

super().__init__(
Expand All @@ -115,22 +125,37 @@ def __init__(
function_kind=serving_subkind,
service_account=service_account,
readiness_timeout=readiness_timeout,
build=build,
)

self.models = models or {}
self._graph = None
self.graph: ServingRouterState = graph
self.graph: Union[RouterState, RootFlowState] = graph
self.parameters = parameters or {}
self.default_class = default_class
self.load_mode = load_mode
self._function_refs: ObjectList = None
self.function_refs = function_refs or []
self.graph_initializer = graph_initializer
self.error_stream = error_stream

@property
def graph(self) -> ServingRouterState:
def graph(self) -> Union[RouterState, RootFlowState]:
"""states graph, holding the serving workflow/DAG topology"""
return self._graph

@graph.setter
def graph(self, graph):
self._graph = self._verify_dict(graph, "graph", ServingRouterState)
graph_root_setter(self, graph)

@property
def function_refs(self) -> List[FunctionReference]:
""""function references, list of optional child function refs"""
yaronha marked this conversation as resolved.
Show resolved Hide resolved
return self._function_refs

@function_refs.setter
def function_refs(self, function_refs: List[FunctionReference]):
self._function_refs = ObjectList.from_list(FunctionReference, function_refs)


class ServingRuntime(RemoteRuntime):
Expand All @@ -145,19 +170,43 @@ def spec(self, spec):
self._spec = self._verify_dict(spec, "spec", ServingSpec)

def set_topology(
self, topology=None, class_name=None, exist_ok=False, **class_args
):
"""set the serving graph topology (router/flow/endpoint) and root class"""
self, topology=None, class_name=None, engine=None, exist_ok=False, **class_args,
) -> Union[RootFlowState, RouterState]:
"""set the serving graph topology (router/flow) and root class or params

e.g.:
graph = fn.set_topology("flow", engine="async")
graph.to("MyClass").to(name="to_json", handler="json.dumps").respond()

topology can be:
router - root router + multiple child route states/models
route is usually determined by the path (route key/name)
can specify special router class and router arguments

flow - workflow (DAG) with a chain of states
flow supports "sync" and "async" engines; branches are not allowed in sync mode
yaronha marked this conversation as resolved.
Show resolved Hide resolved
when using async mode .respond() can indicate the state which will
yaronha marked this conversation as resolved.
Show resolved Hide resolved
generate the (REST) call response

:param topology: - graph topology, router or flow
:param class_name: - optional for router, router class name/path
:param engine: - optional for flow, sync or async engine (default to async)
:param exist_ok: - allow overriding existing topology
:param class_args: - optional, router/flow class init args

:return graph object (fn.spec.graph)
"""
topology = topology or StateKinds.router
if self.spec.graph and not exist_ok:
raise ValueError("graph topology is already set")
raise ValueError("graph topology is already set, cannot be overwritten")
yaronha marked this conversation as resolved.
Show resolved Hide resolved

# currently we only support router topology
if topology != StateKinds.router:
raise NotImplementedError("currently only supporting router topology")
self.spec.graph = ServingRouterState(
class_name=class_name, class_args=class_args
)
if topology == StateKinds.router:
self.spec.graph = RouterState(class_name=class_name, class_args=class_args)
elif topology == StateKinds.flow:
self.spec.graph = RootFlowState(engine=engine)
else:
raise ValueError(f"unsupported topology {topology}, use 'router' or 'flow'")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
raise ValueError(f"unsupported topology {topology}, use 'router' or 'flow'")
raise mlrun.errors.MLRunInvalidArgumentError(f"unsupported topology {topology}, use 'router' or 'flow'")

This is repeating, please replace in other places as well

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return self.spec.graph

def set_tracking(self, stream_path, batch=None, sample=None):
"""set tracking log stream parameters"""
Expand All @@ -176,14 +225,17 @@ def add_model(
handler=None,
**class_args,
):
"""add ml model and/or route to the function
"""add ml model and/or route to the function.

Example, create a function (from the notebook), add a model class, and deploy:

fn = code_to_function(kind='serving')
fn.add_model('boost', model_path, model_class='MyClass', my_arg=5)
fn.deploy()

this only works with the router topology; for nested topologies (a model under a router under a flow)
add the router to the flow and use router.add_route()

:param key: model api key (or name:version), will determine the relative url/path
:param model_path: path to mlrun model artifact or model directory file/object path
:param class_name: V2 Model python class name
Expand All @@ -193,10 +245,17 @@ def add_model(
:param class_args: extra kwargs to pass to the model serving class __init__
(can be read in the model using .get_param(key) method)
"""
graph = self.spec.graph
if not graph:
self.set_topology()

if graph.kind != StateKinds.router:
raise ValueError("models can only be added under router state")

if not model_path and not model_url:
raise ValueError("model_path or model_url must be provided")
class_name = class_name or self.spec.default_class
if not isinstance(class_name, str):
if class_name and not isinstance(class_name, str):
raise ValueError(
"class name must be a string (name ot module.submodule.name)"
)
Expand All @@ -205,58 +264,114 @@ def add_model(
if model_path:
model_path = str(model_path)

if not self.spec.graph:
self.set_topology()

if model_url:
route = new_remote_endpoint(model_url, **class_args)
state = new_remote_endpoint(model_url, **class_args)
else:
route = new_model_endpoint(class_name, model_path, handler, **class_args)
self.spec.graph.add_route(key, route)
state = new_model_endpoint(class_name, model_path, handler, **class_args)

def remove_models(self, keys: list):
"""remove one, multiple, or all models from the spec (blank list for all)"""
return graph.add_route(key, state)

def add_child_function(
self, name, url=None, image=None, requirements=None, kind=None
Hedingber marked this conversation as resolved.
Show resolved Hide resolved
):
"""in a multi-function pipeline add child function"""
function_reference = FunctionReference(
url, image, requirements=requirements, kind=kind or "serving"
)
self._spec.function_refs.update(function_reference, name)
func = function_reference.to_function(self.kind)
func.set_env("SERVING_CURRENT_FUNCTION", function_reference.name)
return func

def _add_ref_triggers(self):
"""add stream trigger to downstream child functions"""
for function, stream in self.spec.graph.get_queue_links().items():
yaronha marked this conversation as resolved.
Show resolved Hide resolved
if stream.path:
if function not in self._spec.function_refs.keys():
raise ValueError(f"function reference {function} not present")
group = stream.options.get("group", "serving")

child_function = self._spec.function_refs[function]
child_function.function_object().add_stream_trigger(
stream.path, group=group, shards=stream.shards
)

def _deploy_function_refs(self):
"""set metadata and deploy child functions"""
for function in self._spec.function_refs.values():
yaronha marked this conversation as resolved.
Show resolved Hide resolved
logger.info(f"deploy child function {function.name} ...")
function_object = function.function_object
function_object.metadata.name = function.fullname(self)
function_object.metadata.project = self.metadata.project
function_object.metadata.tag = self.metadata.tag
function_object.spec.graph = self.spec.graph
# todo: may want to copy parent volumes to child functions
function_object.apply(mlrun.v3io_cred())
function.db_uri = function_object._function_uri()
function_object.verbose = self.verbose
function_object.deploy()

def remove_states(self, keys: list):
"""remove one, multiple, or all states/models from the spec (blank list for all)"""
if self.spec.graph:
self.spec.graph.clear_routes(keys)
self.spec.graph.clear_children(keys)

def deploy(self, dashboard="", project="", tag=""):
def deploy(self, dashboard="", project="", tag="", verbose=False):
"""deploy model serving function to a local/remote cluster

:param dashboard: remote nuclio dashboard url (blank for local or auto detection)
:param project: optional, override the function-specified project name
:param tag: specify unique function tag (a different function service is created for every tag)
:param verbose: verbose logging
"""
load_mode = self.spec.load_mode
if load_mode and load_mode not in ["sync", "async"]:
raise ValueError(f"illegal model loading mode {load_mode}")
if not self.spec.graph:
raise ValueError("nothing to deploy, .spec.graph is none, use .add_model()")
return super().deploy(dashboard, project, tag)

if self.spec.graph.kind != StateKinds.router:
# initialize or create required streams/queues
self.spec.graph.check_and_process_graph()
self.spec.graph.init_queues()
if self._spec.function_refs:
# deploy child functions
self._add_ref_triggers()
self._deploy_function_refs()
logger.info(f"deploy root function {self.metadata.name} ...")
return super().deploy(dashboard, project, tag, verbose=verbose)

def _get_runtime_env(self):
# we currently support a minimal topology of one router + multiple child routes/models
# in the future we will extend the support to a full graph, the spec is already built accordingly

functions = {f.name: f.uri(self) for f in self.spec.function_refs}
yaronha marked this conversation as resolved.
Show resolved Hide resolved
serving_spec = {
"function_uri": self._function_uri(),
"version": "v2",
"parameters": self.spec.parameters,
"graph": self.spec.graph.to_dict(),
"load_mode": self.spec.load_mode,
"verbose": self.verbose,
"functions": functions,
"graph_initializer": self.spec.graph_initializer,
"error_stream": self.spec.error_stream,
}
return {"SERVING_SPEC_ENV": json.dumps(serving_spec)}

def to_mock_server(self, namespace=None, log_level="debug"):
def to_mock_server(
self, namespace=None, current_function=None, **kwargs
) -> GraphServer:
"""create mock server object for local testing/emulation

:param namespace: classes search namespace, use globals() for current notebook
:param log_level: log level (error | info | debug)
:param current_function: specify if you want to simulate a child function
"""
return create_mock_server(
server = create_graph_server(
parameters=self.spec.parameters,
load_mode=self.spec.load_mode,
graph=self.spec.graph,
namespace=namespace,
logger=logger,
level=log_level,
verbose=self.verbose,
current_function=current_function,
**kwargs,
)
server.init(None, namespace or get_caller_globals(), logger=logger)
return server