Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Model Monitoring] Add endpoint registration to v2 serving + config changes #856

Merged
merged 10 commits into from
Apr 12, 2021
14 changes: 9 additions & 5 deletions mlrun/api/api/endpoints/grafana_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@

from mlrun.api.api import deps
from mlrun.api.crud.model_endpoints import (
ENDPOINT_EVENTS_TABLE_PATH,
EVENTS,
ModelEndpoints,
get_access_key,
parse_store_prefix,
)
from mlrun.api.schemas import (
Format,
Expand Down Expand Up @@ -292,16 +293,19 @@ async def grafana_incoming_features(
)
return time_series

path = config.model_endpoint_monitoring.store_prefixes.default.format(
project=project, kind=EVENTS
)
_, container, path = parse_store_prefix(path)

client = get_frames_client(
token=access_key,
address=config.v3io_framesd,
container=config.model_endpoint_monitoring.container,
token=access_key, address=config.v3io_framesd, container=container,
)

data: pd.DataFrame = await run_in_threadpool(
client.read,
backend="tsdb",
table=f"{project}/{ENDPOINT_EVENTS_TABLE_PATH}",
table=path,
columns=feature_names,
filter=f"endpoint_id=='{endpoint_id}'",
start=start,
Expand Down
63 changes: 47 additions & 16 deletions mlrun/api/crud/model_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,20 +23,19 @@
from mlrun.utils.helpers import logger
from mlrun.utils.v3io_clients import get_frames_client, get_v3io_client

ENDPOINTS_TABLE_PATH = "model-endpoints/endpoints"
ENDPOINT_EVENTS_TABLE_PATH = "model-endpoints/events"
ENDPOINTS = "endpoints"
EVENTS = "events"


class ModelEndpoints:
@staticmethod
async def create_or_patch(access_key: str, model_endpoint: ModelEndpoint):
"""
Creates or updates a KV record with the given model_endpoint record
Creates or patches a KV record with the given model_endpoint record

:param access_key: V3IO access key for managing user permissions
:param model_endpoint: An object representing a model endpoint
"""

if model_endpoint.spec.model_uri or model_endpoint.status.feature_stats:
logger.info(
"Getting feature metadata",
Expand Down Expand Up @@ -116,10 +115,15 @@ async def delete_endpoint_record(access_key: str, project: str, endpoint_id: str
logger.info("Clearing model endpoint table", endpoint_id=endpoint_id)
client = get_v3io_client(endpoint=config.v3io_api)

path = config.model_endpoint_monitoring.store_prefixes.default.format(
project=project, kind=ENDPOINTS
)
_, container, path = parse_store_prefix(path)

await run_in_threadpool(
client.kv.delete,
container=config.model_endpoint_monitoring.container,
table_path=f"{project}/{ENDPOINTS_TABLE_PATH}",
container=container,
table_path=path,
key=endpoint_id,
access_key=access_key,
)
Expand Down Expand Up @@ -172,9 +176,15 @@ async def list_endpoints(
)

client = get_v3io_client(endpoint=config.v3io_api)

path = config.model_endpoint_monitoring.store_prefixes.default.format(
project=project, kind=ENDPOINTS
)
_, container, path = parse_store_prefix(path)

cursor = client.kv.new_cursor(
container=config.model_endpoint_monitoring.container,
table_path=f"{project}/{ENDPOINTS_TABLE_PATH}",
container=container,
table_path=path,
access_key=access_key,
filter_expression=build_kv_cursor_filter_expression(
project, function, model, labels
Expand Down Expand Up @@ -227,10 +237,16 @@ async def get_endpoint(
)

client = get_v3io_client(endpoint=config.v3io_api)

path = config.model_endpoint_monitoring.store_prefixes.default.format(
project=project, kind=ENDPOINTS
)
_, container, path = parse_store_prefix(path)

endpoint = await run_in_threadpool(
client.kv.get,
container=config.model_endpoint_monitoring.container,
table_path=f"{project}/{ENDPOINTS_TABLE_PATH}",
container=container,
table_path=path,
key=endpoint_id,
access_key=access_key,
raise_for_status=RaiseForStatus.never,
Expand Down Expand Up @@ -327,10 +343,16 @@ async def write_endpoint_to_kv(

client = get_v3io_client(endpoint=config.v3io_api)
function = client.kv.update if update else client.kv.put

path = config.model_endpoint_monitoring.store_prefixes.default.format(
project=endpoint.metadata.project, kind=ENDPOINTS
)
_, container, path = parse_store_prefix(path)

await run_in_threadpool(
function,
container=config.model_endpoint_monitoring.container,
table_path=f"{endpoint.metadata.project}/{ENDPOINTS_TABLE_PATH}",
container=container,
table_path=path,
key=endpoint.metadata.uid,
access_key=access_key,
attributes={
Expand Down Expand Up @@ -377,16 +399,19 @@ async def get_endpoint_metrics(
if not metrics:
raise MLRunInvalidArgumentError("Metric names must be provided")

path = config.model_endpoint_monitoring.store_prefixes.default.format(
project=project, kind=EVENTS
)
_, container, path = parse_store_prefix(path)

client = get_frames_client(
token=access_key,
address=config.v3io_framesd,
container=config.model_endpoint_monitoring.container,
token=access_key, address=config.v3io_framesd, container=container,
)

data = await run_in_threadpool(
client.read,
backend="tsdb",
table=f"{project}/{ENDPOINT_EVENTS_TABLE_PATH}",
table=path,
columns=["endpoint_id", *metrics],
filter=f"endpoint_id=='{endpoint_id}'",
start=start,
Expand Down Expand Up @@ -456,6 +481,12 @@ def build_kv_cursor_filter_expression(
return " AND ".join(filter_expression)


def parse_store_prefix(store_prefix: str):
    """
    Split a store prefix of the form ``<scheme>///<container>/<path>`` into its parts

    :param store_prefix: Store prefix to split, e.g. ``v3io:///projects/my-proj/model-endpoints/events``

    :return: A ``(scheme, container, path)`` tuple. ``scheme`` is everything before the first ``///`` (so it keeps
             its trailing colon, e.g. ``v3io:``), ``container`` is the first segment after the ``///`` and ``path``
             is everything after the first ``/`` that follows the container
    :raise ValueError: If the prefix has no ``///`` separator, or no ``/`` after the container
    """
    scheme, separator, remainder = store_prefix.partition("///")
    if not separator:
        raise ValueError(
            f"Invalid store prefix '{store_prefix}', expected '<scheme>///<container>/<path>'"
        )

    # Only the first "/" is consumed, the rest of the path is returned untouched
    container, separator, path = remainder.partition("/")
    if not separator:
        raise ValueError(
            f"Invalid store prefix '{store_prefix}', no path found after container '{container}'"
        )

    return scheme, container, path


def get_access_key(request_headers: Mapping):
access_key = request_headers.get("X-V3io-Session-Key")
if not access_key:
Expand Down
5 changes: 3 additions & 2 deletions mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,9 @@
"v3io_framesd": "",
},
"model_endpoint_monitoring": {
"container": "projects",
"stream_url": "v3io:///projects/{project}/model-endpoints/stream",
"store_prefixes": {
"default": "v3io:///projects/{project}/model-endpoints/{kind}"
}
},
"secret_stores": {
"vault": {
Expand Down
2 changes: 1 addition & 1 deletion mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -1904,7 +1904,7 @@ def create_or_patch(
self.api_call(
method="PUT",
path=path,
body=model_endpoint.dict(),
body=model_endpoint.json(),
headers={"X-V3io-Session-Key": access_key},
)

Expand Down
22 changes: 15 additions & 7 deletions mlrun/serving/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,22 +39,30 @@ class _StreamContext:
def __init__(self, enabled, parameters, function_uri):
self.enabled = False
self.hostname = socket.gethostname()
self.output_stream = None
self.function_uri = function_uri
self.output_stream = None
self.stream_uri = None

log_stream = parameters.get("log_stream", "")
stream_url = config.model_endpoint_monitoring.stream_url
if ((enabled and stream_url) or log_stream) and function_uri:
stream_uri = config.model_endpoint_monitoring.store_prefixes.default

if ((enabled and stream_uri) or log_stream) and function_uri:
self.enabled = True
log_stream = log_stream or stream_url

project, _, _, _ = parse_versioned_object_uri(
function_uri, config.default_project
)

if stream_uri:
Michaelliv marked this conversation as resolved.
Show resolved Hide resolved
stream_uri = stream_uri.format(project=project, kind="stream")
if log_stream:
stream_uri = log_stream.format(project=project)

stream_args = parameters.get("stream_args", {})
self.output_stream = get_stream_pusher(
log_stream.format(project=project), **stream_args
)

self.stream_uri = stream_uri

self.output_stream = get_stream_pusher(stream_uri, **stream_args)


class GraphServer(ModelObj):
Expand Down
44 changes: 43 additions & 1 deletion mlrun/serving/v2_serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@
from typing import Dict

import mlrun
from mlrun.utils import now_date
from mlrun.api.schemas import (
ModelEndpoint,
ModelEndpointMetadata,
ModelEndpointSpec,
ModelEndpointStatus,
)
from mlrun.datastore import _DummyStream
from mlrun.utils import now_date, parse_versioned_object_uri


class V2ModelServer:
Expand Down Expand Up @@ -103,6 +110,11 @@ def post_init(self, mode="sync"):
else:
self._load_and_update_state()

if self._model_logger is not None and not isinstance(
self._model_logger.output_stream, _DummyStream
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to move this condition into the init_endpoint_record function.

):
self._model_logger.init_endpoint_record()

def get_param(self, key: str, default=None):
"""get param by key (specified in the model or the function)"""
if key in self._params:
Expand Down Expand Up @@ -291,6 +303,7 @@ def __init__(self, model, context, output_stream=None):
self.verbose = context.verbose
self.hostname = context.stream.hostname
self.function_uri = context.stream.function_uri
self.stream_path = context.stream.stream_uri
self.stream_batch = int(context.get_param("log_stream_batch", 1))
self.stream_sample = int(context.get_param("log_stream_sample", 1))
self.output_stream = output_stream or context.stream.output_stream
Expand All @@ -299,6 +312,35 @@ def __init__(self, model, context, output_stream=None):
self._batch_iter = 0
self._batch = []

def init_endpoint_record(self):
    """
    Build a ModelEndpoint record for the served model and pass it to the run DB's create_or_patch

    The project is parsed from ``self.function_uri``. The model identifier is ``<name>:<version>`` when the
    model has a version, otherwise just the model name. The record is created with ``active=True`` and an
    empty status.
    """
    # Only the project part of the parsed function URI is used here
    project, _, _, _ = parse_versioned_object_uri(self.function_uri)

    if self.model.version:
        model = f"{self.model.name}:{self.model.version}"
    else:
        model = self.model.name

    model_endpoint = ModelEndpoint(
        metadata=ModelEndpointMetadata(project=project, labels=self.model.labels),
        spec=ModelEndpointSpec(
            function_uri=self.function_uri,
            model=model,
            model_class=self.model.__class__.__name__,
            model_uri=self.model.model_path,
            stream_path=self.stream_path,
            active=True,
        ),
        status=ModelEndpointStatus(),
    )

    db = mlrun.get_run_db()

    # The endpoint id sent to the DB is the uid taken from the record's own metadata
    db.create_or_patch(
        project=project,
        endpoint_id=model_endpoint.metadata.uid,
        model_endpoint=model_endpoint,
    )

def base_data(self):
base_data = {
"class": self.model.__class__.__name__,
Expand Down