Skip to content

Commit

Permalink
fix model monitor and add function uri (#484)
Browse files Browse the repository at this point in the history
* fix model monitor and add function uri
* remove ser_router() (was stale code, not used)
* fix nuclio spec.resources
* add set_metric to v2 serving
Co-authored-by: yaron haviv <yaronh@iguazio.com>
  • Loading branch information
yaronha committed Oct 22, 2020
1 parent 111b8d6 commit 26ef653
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 13 deletions.
2 changes: 2 additions & 0 deletions mlrun/runtimes/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,8 @@ def get_fullname(config, name, project, tag):
handler = self.spec.function_handler
if self.spec.readiness_timeout:
spec.set_config("spec.readinessTimeoutSeconds", self.spec.readiness_timeout)
if self.spec.resources:
spec.set_config("spec.resources", self.spec.resources)
if self.spec.no_cache:
spec.set_config("spec.build.noCache", True)
if self.spec.replicas:
Expand Down
10 changes: 1 addition & 9 deletions mlrun/runtimes/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,7 @@ def __init__(
service_account=None,
readiness_timeout=None,
models=None,
router=None,
graph=None,
router_args=None,
parameters=None,
default_class=None,
load_mode=None,
Expand Down Expand Up @@ -119,9 +117,7 @@ def __init__(
)

self.models = models or {}
self.router = router
self.graph: ServingRouterState = graph
self.router_args = router_args
self.parameters = parameters or {}
self.default_class = default_class
self.load_mode = load_mode
Expand Down Expand Up @@ -153,11 +149,6 @@ def set_topology(
class_name=class_name, class_args=class_args
)

def set_router(self, router, **router_args):
"""set the routing class and router arguments"""
self.spec.router = router
self.spec.router_args = router_args

def set_tracking(self, stream_path, batch=None, sample=None):
"""set tracking log stream parameters"""
self.spec.parameters["log_stream"] = stream_path
Expand Down Expand Up @@ -236,6 +227,7 @@ def deploy(self, dashboard="", project="", tag=""):
# we currently support a minimal topology of one router + multiple child routes/models
# in the future we will extend the support to a full graph, the spec is already built accordingly
serving_spec = {
"function_uri": self._function_uri(),
"version": "v2",
"parameters": self.spec.parameters,
"graph": self.spec.graph.to_dict(),
Expand Down
14 changes: 11 additions & 3 deletions mlrun/serving/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@


class _StreamContext:
def __init__(self, parameters):
def __init__(self, parameters, function_uri):
self.hostname = socket.gethostname()
self.output_stream = None
self.function_uri = function_uri
out_stream = parameters.get("log_stream", "")
self.stream_sample = int(parameters.get("log_stream_sample", "1"))
self.stream_batch = int(parameters.get("log_stream_batch", "1"))
Expand All @@ -42,10 +43,17 @@ class ModelServerHost(ModelObj):
kind = "server"

def __init__(
self, graph=None, parameters=None, load_mode=None, verbose=False, version=None
self,
graph=None,
function_uri=None,
parameters=None,
load_mode=None,
verbose=False,
version=None,
):
self._graph = None
self.graph: ServingRouterState = graph
self.function_uri = function_uri
self.parameters = parameters or {}
self.verbose = verbose
self.load_mode = load_mode or "sync"
Expand Down Expand Up @@ -73,7 +81,7 @@ def init(self, context, namespace):
self.context = context
# enrich the context with classes and methods which will be used when
# initializing classes or handling the event
setattr(context, "stream", _StreamContext(self.parameters))
setattr(context, "stream", _StreamContext(self.parameters, self.function_uri))
setattr(context, "merge_root_params", self.merge_root_params)
setattr(context, "verbose", self.verbose)

Expand Down
8 changes: 7 additions & 1 deletion mlrun/serving/v2_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ def get_param(self, key: str, default=None):
"""get param by key (specified in the model or the function)"""
return self._params.get(key, default)

def set_metric(self, name: str, value):
"""set real time metric (for model monitoring)"""
self.metrics[name] = value

def get_model(self, suffix=""):
"""get the model file(s) and metadata from model store
Expand Down Expand Up @@ -254,6 +258,7 @@ class _ModelLogPusher:
def __init__(self, model, context, output_stream=None):
self.model = model
self.hostname = context.stream.hostname
self.function_uri = context.stream.function_uri
self.stream_batch = context.stream.stream_batch
self.stream_sample = context.stream.stream_sample
self.output_stream = output_stream or context.stream.output_stream
Expand All @@ -265,10 +270,11 @@ def __init__(self, model, context, output_stream=None):
def base_data(self):
base_data = {
"class": self.model.__class__.__name__,
"worker": self.worker,
"worker": self._worker,
"model": self.model.name,
"version": self.model.version,
"host": self.hostname,
"function_uri": self.function_uri,
}
if getattr(self.model, "labels", None):
base_data["labels"] = self.model.labels
Expand Down

0 comments on commit 26ef653

Please sign in to comment.