[FrontendSpec] Add internal_labels to frontend_spec (#5542)
roei3000b committed May 26, 2024
1 parent c58f85d commit 48485ec
Showing 66 changed files with 693 additions and 365 deletions.
7 changes: 5 additions & 2 deletions mlrun/__main__.py
@@ -31,6 +31,7 @@
from tabulate import tabulate

import mlrun
+import mlrun.common.constants as mlrun_constants
import mlrun.common.schemas
from mlrun.common.helpers import parse_versioned_object_uri

@@ -256,8 +257,10 @@ def run(
            runobj.metadata.labels[k] = v

    if workflow:
-        runobj.metadata.labels["workflow"] = workflow
-        runobj.metadata.labels["mlrun/runner-pod"] = socket.gethostname()
+        runobj.metadata.labels[mlrun_constants.MLRunInternalLabels.workflow] = workflow
+        runobj.metadata.labels[mlrun_constants.MLRunInternalLabels.runner_pod] = (
+            socket.gethostname()
+        )

    if db:
        mlconf.dbpath = db
62 changes: 61 additions & 1 deletion mlrun/common/constants.py
@@ -12,12 +12,72 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

IMAGE_NAME_ENRICH_REGISTRY_PREFIX = "." # prefix for image name to enrich with registry
MLRUN_CREATED_LABEL = "mlrun-created"
MLRUN_MODEL_CONF = "model-conf"
MLRUN_SERVING_SPEC_MOUNT_PATH = f"/tmp/mlrun/{MLRUN_MODEL_CONF}"
MLRUN_SERVING_SPEC_FILENAME = "serving_spec.json"
MLRUN_SERVING_SPEC_PATH = (
    f"{MLRUN_SERVING_SPEC_MOUNT_PATH}/{MLRUN_SERVING_SPEC_FILENAME}"
)
MYSQL_MEDIUMBLOB_SIZE_BYTES = 16 * 1024 * 1024
MLRUN_LABEL_PREFIX = "mlrun/"
DASK_LABEL_PREFIX = "dask.org/"
NUCLIO_LABEL_PREFIX = "nuclio.io/"


class MLRunInternalLabels:
    ### dask
    dask_cluster_name = f"{DASK_LABEL_PREFIX}cluster-name"
    dask_component = f"{DASK_LABEL_PREFIX}component"

    ### spark
    spark_role = "spark-role"

    ### mpi
    mpi_job_name = "mpi-job-name"
    mpi_job_role = "mpi-job-role"
    mpi_role_type = "mpi_role_type"

    ### nuclio
    nuclio_project_name = f"{NUCLIO_LABEL_PREFIX}project-name"
    nuclio_class = f"{NUCLIO_LABEL_PREFIX}class"

    ### mlrun
    mlrun_auth_key = "mlrun-auth-key"
    mlrun_class = f"{MLRUN_LABEL_PREFIX}class"
    client_python_version = f"{MLRUN_LABEL_PREFIX}client_python_version"
    client_version = f"{MLRUN_LABEL_PREFIX}client_version"
    function = f"{MLRUN_LABEL_PREFIX}function"
    job = f"{MLRUN_LABEL_PREFIX}job"
    name = f"{MLRUN_LABEL_PREFIX}name"
    mlrun_owner = f"{MLRUN_LABEL_PREFIX}owner"
    owner_domain = f"{MLRUN_LABEL_PREFIX}owner_domain"
    project = f"{MLRUN_LABEL_PREFIX}project"
    runner_pod = f"{MLRUN_LABEL_PREFIX}runner-pod"
    schedule_name = f"{MLRUN_LABEL_PREFIX}schedule-name"
    scrape_metrics = f"{MLRUN_LABEL_PREFIX}scrape-metrics"
    tag = f"{MLRUN_LABEL_PREFIX}tag"
    uid = f"{MLRUN_LABEL_PREFIX}uid"
    username = f"{MLRUN_LABEL_PREFIX}username"
    username_domain = f"{MLRUN_LABEL_PREFIX}username_domain"
    task_name = f"{MLRUN_LABEL_PREFIX}task-name"
    host = "host"
    job_type = "job-type"
    kind = "kind"
    component = "component"
    resource_name = "resource_name"
    created = "mlrun-created"

    owner = "owner"
    v3io_user = "v3io_user"
    workflow = "workflow"
    feature_vector = "feature-vector"

    @classmethod
    def all(cls):
        return [
            value
            for key, value in cls.__dict__.items()
            if not key.startswith("__") and isinstance(value, str)
        ]
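Note: the `all()` classmethod is what makes the registry enumerable for the frontend spec below. A minimal sketch of downstream use, assuming only the class as added in this commit (the `split_labels` helper is hypothetical, not part of the change):

import mlrun.common.constants as mlrun_constants


def split_labels(labels: dict) -> tuple[dict, dict]:
    # Hypothetical helper: partition a run's labels into MLRun-internal
    # and user-defined keys, using the registry introduced above.
    internal_keys = set(mlrun_constants.MLRunInternalLabels.all())
    internal = {k: v for k, v in labels.items() if k in internal_keys}
    user = {k: v for k, v in labels.items() if k not in internal_keys}
    return internal, user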
11 changes: 7 additions & 4 deletions mlrun/common/runtimes/constants.py
@@ -15,6 +15,8 @@
import enum
import typing

+import mlrun.common.constants as mlrun_constants


class PodPhases:
    """
@@ -122,8 +124,8 @@ def default():
    @staticmethod
    def role_label_by_version(version):
        return {
-            MPIJobCRDVersions.v1alpha1: "mpi_role_type",
-            MPIJobCRDVersions.v1: "mpi-job-role",
+            MPIJobCRDVersions.v1alpha1: mlrun_constants.MLRunInternalLabels.mpi_role_type,
+            MPIJobCRDVersions.v1: mlrun_constants.MLRunInternalLabels.mpi_job_role,
        }[version]


@@ -192,9 +194,10 @@ def not_allowed_for_deletion_states():
        ]


+# TODO: remove this class in 1.9.0 - use only MLRunInternalLabels
class RunLabels(enum.Enum):
-    owner = "owner"
-    v3io_user = "v3io_user"
+    owner = mlrun_constants.MLRunInternalLabels.owner
+    v3io_user = mlrun_constants.MLRunInternalLabels.v3io_user

    @staticmethod
    def all():
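For context, `role_label_by_version` resolves which role-label key the MPI operator stamps on pods for a given CRD version; a usage sketch, assuming the method sits on `MPIJobCRDVersions` as the hunk context suggests:

import mlrun.common.constants as mlrun_constants
from mlrun.common.runtimes.constants import MPIJobCRDVersions

# v1 CRDs label pods with "mpi-job-role"; v1alpha1 used "mpi_role_type".
key = MPIJobCRDVersions.role_label_by_version(MPIJobCRDVersions.v1)
assert key == mlrun_constants.MLRunInternalLabels.mpi_job_role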
1 change: 1 addition & 0 deletions mlrun/common/schemas/frontend_spec.py
@@ -70,3 +70,4 @@ class FrontendSpec(pydantic.BaseModel):
    feature_store_data_prefixes: typing.Optional[dict[str, str]]
    allowed_artifact_path_prefixes_list: list[str]
    ce: typing.Optional[dict]
+    internal_labels: list[str] = []
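Because `internal_labels` defaults to an empty list, clients talking to older servers still deserialize the spec cleanly. A hedged sketch of client-side consumption (the REST endpoint path and base URL are assumptions, not confirmed by this diff):

import requests

# Hypothetical deployment URL; the frontend-spec route is assumed.
response = requests.get("http://mlrun-api:8080/api/v1/frontend-spec")
response.raise_for_status()
internal_labels = response.json().get("internal_labels", [])
# A UI can now treat these label keys as read-only instead of hard-coding them.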
5 changes: 5 additions & 0 deletions mlrun/config.py
@@ -37,6 +37,7 @@
import semver
import yaml

+import mlrun.common.constants
import mlrun.common.schemas
import mlrun.errors

@@ -967,6 +968,10 @@ def resolve_chief_api_url(self) -> str:
            self.httpdb.clusterization.chief.url = chief_api_url
        return self.httpdb.clusterization.chief.url

+    @staticmethod
+    def internal_labels():
+        return mlrun.common.constants.MLRunInternalLabels.all()
+
    @staticmethod
    def get_storage_auto_mount_params():
        auto_mount_params = {}
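The new staticmethod lets API code surface the registry through the config object rather than importing the constants module directly. A quick sketch, assuming the usual `mlrun.mlconf` singleton:

import mlrun

# Returns every string collected by MLRunInternalLabels.all(),
# e.g. "mlrun/uid", "mlrun/project", "host", "workflow", ...
print(mlrun.mlconf.internal_labels())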
22 changes: 16 additions & 6 deletions mlrun/execution.py
@@ -22,6 +22,7 @@
from dateutil import parser

import mlrun
+import mlrun.common.constants as mlrun_constants
from mlrun.artifacts import ModelArtifact
from mlrun.datastore.store_resources import get_store_resource
from mlrun.errors import MLRunInvalidArgumentError
@@ -129,7 +130,9 @@ def uid(self):
    @property
    def tag(self):
        """Run tag (uid or workflow id if exists)"""
-        return self._labels.get("workflow") or self._uid
+        return (
+            self._labels.get(mlrun_constants.MLRunInternalLabels.workflow) or self._uid
+        )

    @property
    def state(self):
@@ -329,8 +332,10 @@ def get_meta(self) -> dict:
"uri": uri,
"owner": get_in(self._labels, "owner"),
}
if "workflow" in self._labels:
resp["workflow"] = self._labels["workflow"]
if mlrun_constants.MLRunInternalLabels.workflow in self._labels:
resp[mlrun_constants.MLRunInternalLabels.workflow] = self._labels[
mlrun_constants.MLRunInternalLabels.workflow
]
return resp

@classmethod
@@ -396,7 +401,7 @@ def from_dict(
                self._set_input(k, v)

        if host and not is_api:
-            self.set_label("host", host)
+            self.set_label(mlrun_constants.MLRunInternalLabels.host, host)

        start = get_in(attrs, "status.start_time")
        if start:
@@ -990,10 +995,15 @@ def is_logging_worker(self):
        # If it's an OpenMPI job, get the global rank and compare to the logging rank (worker) set in MLRun's
        # configuration:
        labels = self.labels
-        if "host" in labels and labels.get("kind", "job") == "mpijob":
+        if (
+            mlrun_constants.MLRunInternalLabels.host in labels
+            and labels.get(mlrun_constants.MLRunInternalLabels.kind, "job") == "mpijob"
+        ):
            # The host (pod name) of each worker is created by k8s, and by default it uses the rank number as the id in
            # the following template: ...-worker-<rank>
-            rank = int(labels["host"].rsplit("-", 1)[1])
+            rank = int(
+                labels[mlrun_constants.MLRunInternalLabels.host].rsplit("-", 1)[1]
+            )
            return rank == mlrun.mlconf.packagers.logging_worker

        # Single worker is always the logging worker:
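The rank extraction above relies on the default OpenMPI pod-naming template `<job>-worker-<rank>`; a standalone sketch of the parsing (the pod name is hypothetical):

# k8s names OpenMPI worker pods "<job-name>-worker-<rank>" by default,
# so the global rank is the numeric suffix after the last dash.
host = "my-train-job-worker-3"  # hypothetical pod name
rank = int(host.rsplit("-", 1)[1])
assert rank == 3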
13 changes: 7 additions & 6 deletions mlrun/feature_store/ingestion.py
@@ -17,6 +17,7 @@
import pandas as pd

import mlrun
+import mlrun.common.constants as mlrun_constants
from mlrun.datastore.sources import get_source_from_dict, get_source_step
from mlrun.datastore.targets import (
    add_target_steps,
@@ -263,13 +264,13 @@ def run_ingestion_job(name, featureset, run_config, schedule=None, spark_service
        out_path=featureset.spec.output_path,
    )
    task.spec.secret_sources = run_config.secret_sources
-    task.set_label("job-type", "feature-ingest").set_label(
-        "feature-set", featureset.uri
-    )
+    task.set_label(
+        mlrun_constants.MLRunInternalLabels.job_type, "feature-ingest"
+    ).set_label("feature-set", featureset.uri)
    if run_config.owner:
-        task.set_label("owner", run_config.owner).set_label(
-            "v3io_user", run_config.owner
-        )
+        task.set_label(
+            mlrun_constants.MLRunInternalLabels.owner, run_config.owner
+        ).set_label(mlrun_constants.MLRunInternalLabels.v3io_user, run_config.owner)

    # set run UID and save in the feature set status (linking the feature set to the job)
    task.metadata.uid = uuid.uuid4().hex
5 changes: 4 additions & 1 deletion mlrun/feature_store/retrieval/job.py
@@ -15,6 +15,7 @@
import uuid

import mlrun
+import mlrun.common.constants as mlrun_constants
from mlrun.config import config as mlconf
from mlrun.model import DataTargetBase, new_task
from mlrun.runtimes.function_reference import FunctionReference
@@ -122,7 +123,9 @@ def set_default_resources(resources, setter_function):
inputs={"entity_rows": entity_rows} if entity_rows is not None else {},
)
task.spec.secret_sources = run_config.secret_sources
task.set_label("job-type", "feature-merge").set_label("feature-vector", vector.uri)
task.set_label(
mlrun_constants.MLRunInternalLabels.job_type, "feature-merge"
).set_label(mlrun_constants.MLRunInternalLabels.feature_vector, vector.uri)
task.metadata.uid = uuid.uuid4().hex
vector.status.run_uri = task.metadata.uid
vector.save()
5 changes: 4 additions & 1 deletion mlrun/frameworks/tf_keras/__init__.py
@@ -18,6 +18,7 @@
from tensorflow import keras

import mlrun
+import mlrun.common.constants as mlrun_constants

from .callbacks import MLRunLoggingCallback, TensorboardLoggingCallback
from .mlrun_interface import TFKerasMLRunInterface
@@ -126,7 +127,9 @@ def apply_mlrun(
    # # Use horovod:
    if use_horovod is None:
        use_horovod = (
-            context.labels.get("kind", "") == "mpijob" if context is not None else False
+            context.labels.get(mlrun_constants.MLRunInternalLabels.kind, "") == "mpijob"
+            if context is not None
+            else False
        )

    # Create a model handler:
6 changes: 4 additions & 2 deletions mlrun/launcher/client.py
@@ -16,6 +16,7 @@

import IPython

+import mlrun.common.constants as mlrun_constants
import mlrun.errors
import mlrun.launcher.base as launcher
import mlrun.lists
@@ -69,13 +70,14 @@ def prepare_image_for_deploy(runtime: "mlrun.runtimes.BaseRuntime"):
    def _store_function(
        runtime: "mlrun.runtimes.BaseRuntime", run: "mlrun.run.RunObject"
    ):
-        run.metadata.labels["kind"] = runtime.kind
+        run.metadata.labels[mlrun_constants.MLRunInternalLabels.kind] = runtime.kind
        mlrun.runtimes.utils.enrich_run_labels(
            run.metadata.labels, [mlrun.common.runtimes.constants.RunLabels.owner]
        )
        if run.spec.output_path:
            run.spec.output_path = run.spec.output_path.replace(
-                "{{run.user}}", run.metadata.labels["owner"]
+                "{{run.user}}",
+                run.metadata.labels[mlrun_constants.MLRunInternalLabels.owner],
            )
        db = runtime._get_db()
        if db and runtime.kind != "handler":
10 changes: 8 additions & 2 deletions mlrun/launcher/local.py
@@ -15,6 +15,7 @@
import pathlib
from typing import Callable, Optional, Union

+import mlrun.common.constants as mlrun_constants
import mlrun.common.schemas.schedule
import mlrun.errors
import mlrun.launcher.client as launcher
@@ -132,8 +133,13 @@ def _execute(
runtime: "mlrun.runtimes.BaseRuntime",
run: Optional[Union["mlrun.run.RunTemplate", "mlrun.run.RunObject"]] = None,
):
if "V3IO_USERNAME" in os.environ and "v3io_user" not in run.metadata.labels:
run.metadata.labels["v3io_user"] = os.environ.get("V3IO_USERNAME")
if (
"V3IO_USERNAME" in os.environ
and mlrun_constants.MLRunInternalLabels.v3io_user not in run.metadata.labels
):
run.metadata.labels[mlrun_constants.MLRunInternalLabels.v3io_user] = (
os.environ.get("V3IO_USERNAME")
)

# store function object in db unless running from within a run pod
if not runtime.is_child:
10 changes: 8 additions & 2 deletions mlrun/launcher/remote.py
@@ -17,6 +17,7 @@
import pandas as pd
import requests

+import mlrun.common.constants as mlrun_constants
import mlrun.common.schemas.schedule
import mlrun.db
import mlrun.errors
@@ -100,8 +101,13 @@ def launch(
        if runtime.verbose:
            logger.info(f"runspec:\n{run.to_yaml()}")

-        if "V3IO_USERNAME" in os.environ and "v3io_user" not in run.metadata.labels:
-            run.metadata.labels["v3io_user"] = os.environ.get("V3IO_USERNAME")
+        if (
+            "V3IO_USERNAME" in os.environ
+            and mlrun_constants.MLRunInternalLabels.v3io_user not in run.metadata.labels
+        ):
+            run.metadata.labels[mlrun_constants.MLRunInternalLabels.v3io_user] = (
+                os.environ.get("V3IO_USERNAME")
+            )

        logger.info(
            "Storing function",
6 changes: 5 additions & 1 deletion mlrun/model.py
@@ -27,6 +27,7 @@
import pydantic.error_wrappers

import mlrun
+import mlrun.common.constants as mlrun_constants
import mlrun.common.schemas.notification

from .utils import (
@@ -770,7 +771,10 @@ def iteration(self, iteration):
    def is_workflow_runner(self):
        if not self.labels:
            return False
-        return self.labels.get("job-type", "") == "workflow-runner"
+        return (
+            self.labels.get(mlrun_constants.MLRunInternalLabels.job_type, "")
+            == "workflow-runner"
+        )


class HyperParamStrategies:
5 changes: 4 additions & 1 deletion mlrun/projects/operations.py
@@ -18,6 +18,7 @@
from mlrun_pipelines.models import PipelineNodeWrapper

import mlrun
+import mlrun.common.constants as mlrun_constants
from mlrun.utils import hub_prefix

from .pipelines import enrich_function_object, pipeline_context
@@ -190,7 +191,9 @@ def my_pipe(url=""):
    local = pipeline_context.is_run_local(local)
    task.metadata.labels = task.metadata.labels or labels or {}
    if pipeline_context.workflow_id:
-        task.metadata.labels["workflow"] = pipeline_context.workflow_id
+        task.metadata.labels[mlrun_constants.MLRunInternalLabels.workflow] = (
+            pipeline_context.workflow_id
+        )
    if function.kind == "local":
        command, function = mlrun.run.load_func_code(function)
        function.spec.command = command
(Diff truncated: the remaining changed files, 52 of 66, were not loaded.)