Skip to content

Commit

Permalink
[Runtimes] Support configuring auto mount type (#1191)
Browse files Browse the repository at this point in the history
  • Loading branch information
theSaarco committed Aug 11, 2021
1 parent d577871 commit 36bc5ce
Show file tree
Hide file tree
Showing 20 changed files with 452 additions and 31 deletions.
5 changes: 5 additions & 0 deletions mlrun/api/api/endpoints/healthz.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ def health():
"default_function_node_selector": _get_config_value_if_not_default(
"default_function_node_selector"
),
"igz_version": _get_config_value_if_not_default("igz_version"),
"auto_mount_type": _get_config_value_if_not_default("storage.auto_mount_type"),
"auto_mount_params": _get_config_value_if_not_default(
"storage.auto_mount_params"
),
}


Expand Down
36 changes: 35 additions & 1 deletion mlrun/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"""

import base64
import binascii
import copy
import json
import os
Expand All @@ -34,7 +35,7 @@
import yaml

env_prefix = "MLRUN_"
env_file_key = f"{env_prefix}CONIFG_FILE"
env_file_key = f"{env_prefix}CONFIG_FILE"
_load_lock = Lock()
_none_type = type(None)

Expand Down Expand Up @@ -242,6 +243,16 @@
"channel": "master",
},
},
"storage": {
# What type of auto-mount to use for functions. Can be one of: none, auto, v3io_credentials, v3io_fuse, pvc.
# Default is auto - which resolves to v3io_credentials when running on Iguazio. If not on Iguazio: pvc if the
# MLRUN_PVC_MOUNT env is configured or auto_mount_params contains "pvc_name". Otherwise it does nothing (none).
"auto_mount_type": "auto",
# Extra parameters to pass to the mount call (will be passed as kwargs). Parameters can be either:
# 1. A string of comma-separated parameters, using this format: "param1=value1,param2=value2"
# 2. A base-64 encoded json dictionary containing the list of parameters
"auto_mount_params": "",
},
}


Expand Down Expand Up @@ -316,6 +327,29 @@ def get_default_function_node_selector():

return default_function_node_selector

@staticmethod
def get_storage_auto_mount_params():
    """Return ``storage.auto_mount_params`` parsed into a dictionary.

    The configured value may be either a base64-encoded JSON dictionary or a
    plain ``"param1=value1,param2=value2"`` string. An empty/unset value
    yields an empty dictionary.

    :return: the mount parameters as a dict (empty when nothing is configured)
    :raises TypeError: if the parsed value is not a dictionary
    :raises ValueError: if the value is base64-encoded text that is not valid
        JSON (``json.loads`` errors are deliberately not swallowed)
    """
    raw_params = config.storage.auto_mount_params
    if not raw_params:
        return {}

    try:
        decoded = base64.b64decode(raw_params, validate=True).decode()
    except (binascii.Error, UnicodeDecodeError):
        # Importing here to avoid circular dependencies
        from .utils import list2dict

        # The string is not base64-encoded text. binascii.Error covers values
        # that are not valid base64 at all; UnicodeDecodeError covers
        # 'p1=v1,p2=v2' strings that happen to look like base64 (e.g. "key=")
        # but decode to bytes that are not UTF-8. Parse both as key=value pairs.
        auto_mount_params = list2dict(raw_params.split(","))
    else:
        auto_mount_params = json.loads(decoded)

    if not isinstance(auto_mount_params, dict):
        raise TypeError(
            f"data in storage.auto_mount_params does not resolve to a dictionary: {auto_mount_params}"
        )

    return auto_mount_params

def to_dict(self):
return copy.copy(self._cfg)

Expand Down
8 changes: 8 additions & 0 deletions mlrun/db/httpdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,14 @@ def connect(self, secrets=None):
server_cfg.get("default_function_node_selector")
or config.default_function_node_selector
)
config.igz_version = server_cfg.get("igz_version") or config.igz_version
config.storage.auto_mount_type = (
server_cfg.get("auto_mount_type") or config.storage.auto_mount_type
)
config.storage.auto_mount_params = (
server_cfg.get("auto_mount_params") or config.storage.auto_mount_params
)

except Exception:
pass
return self
Expand Down
4 changes: 4 additions & 0 deletions mlrun/platforms/iguazio.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,13 +301,17 @@ def _use_v3io_cred(task):
web_api = api or environ.get("V3IO_API") or mlconf.v3io_api
_user = user or environ.get("V3IO_USERNAME")
_access_key = access_key or environ.get("V3IO_ACCESS_KEY")
v3io_framesd = mlconf.v3io_framesd or environ.get("V3IO_FRAMESD")

return (
task.add_env_variable(k8s_client.V1EnvVar(name="V3IO_API", value=web_api))
.add_env_variable(k8s_client.V1EnvVar(name="V3IO_USERNAME", value=_user))
.add_env_variable(
k8s_client.V1EnvVar(name="V3IO_ACCESS_KEY", value=_access_key)
)
.add_env_variable(
k8s_client.V1EnvVar(name="V3IO_FRAMESD", value=v3io_framesd)
)
)

return _use_v3io_cred
Expand Down
28 changes: 18 additions & 10 deletions mlrun/platforms/other.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
# this file is based on the code from kubeflow pipelines git
import os

from mlrun.errors import MLRunInvalidArgumentError

from .iguazio import mount_v3io


def mount_pvc(pvc_name, volume_name="pipeline", volume_mount_path="/mnt/pipeline"):
def mount_pvc(pvc_name=None, volume_name="pipeline", volume_mount_path="/mnt/pipeline"):
"""
Modifier function to apply to a Container Op to simplify volume, volume mount addition and
enable better reuse of volumes, volume claims across container ops.
Expand All @@ -28,6 +30,20 @@ def mount_pvc(pvc_name, volume_name="pipeline", volume_mount_path="/mnt/pipeline
train = train_op(...)
train.apply(mount_pvc('claim-name', 'pipeline', '/mnt/pipeline'))
"""
if "MLRUN_PVC_MOUNT" in os.environ:
mount = os.environ.get("MLRUN_PVC_MOUNT")
items = mount.split(":")
if len(items) != 2:
raise MLRunInvalidArgumentError(
"MLRUN_PVC_MOUNT should include <pvc-name>:<mount-path>"
)
pvc_name = items[0]
volume_mount_path = items[1]

if not pvc_name:
raise MLRunInvalidArgumentError(
"No PVC name: use the pvc_name parameter or configure the MLRUN_PVC_MOUNT environment variable"
)

def _mount_pvc(task):
from kubernetes import client as k8s_client
Expand Down Expand Up @@ -57,15 +73,7 @@ def auto_mount(pvc_name="", volume_mount_path="", volume_name=None):
volume_name=volume_name or "pvc",
)
if "MLRUN_PVC_MOUNT" in os.environ:
mount = os.environ.get("MLRUN_PVC_MOUNT")
items = mount.split(":")
if len(items) != 2:
raise ValueError("MLRUN_PVC_MOUNT should include <pvc-name>:<mount-path>")
return mount_pvc(
pvc_name=items[0],
volume_mount_path=items[1],
volume_name=volume_name or "pvc",
)
return mount_pvc(volume_name=volume_name or "pvc",)
if "V3IO_ACCESS_KEY" in os.environ:
return mount_v3io(name=volume_name or "v3io")
raise ValueError("failed to auto mount, need to set env vars")
Expand Down
2 changes: 2 additions & 0 deletions mlrun/projects/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,8 @@ def _create_pipeline(project, pipeline, funcs, secrets=None):
# verify all functions are in this project
for f in functions.values():
f.metadata.project = project.metadata.name
# Now that the user has had a chance to initialize the functions (in init_functions), attempt to auto-mount them.
f.try_auto_mount_based_on_config()

if not hasattr(mod, "kfpipeline"):
raise ValueError("pipeline function (kfpipeline) not found")
Expand Down
11 changes: 11 additions & 0 deletions mlrun/runtimes/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ def __init__(
workdir=None,
default_handler=None,
pythonpath=None,
mount_applied=False,
):

self.command = command or ""
Expand All @@ -108,6 +109,7 @@ def __init__(
self.default_handler = default_handler
# TODO: type verification (FunctionEntrypoint dict)
self.entry_points = entry_points or {}
self.mount_applied = mount_applied

@property
def build(self) -> ImageBuilder:
Expand Down Expand Up @@ -218,6 +220,11 @@ def _get_db(self):
self._db_conn = get_run_db(self.spec.rundb, secrets=self._secrets)
return self._db_conn

# This function is different from the auto_mount function, as it mounts to runtimes based on the configuration.
# That's why it's named differently.
def try_auto_mount_based_on_config(self):
    """Hook for configuration-driven auto-mount; this implementation does nothing.

    Runtimes that support mounts are expected to override it.
    """

def run(
self,
runspec: RunObject = None,
Expand Down Expand Up @@ -270,6 +277,10 @@ def run(
if self.spec.mode and self.spec.mode not in run_modes:
raise ValueError(f'run mode can only be {",".join(run_modes)}')

# Perform auto-mount if necessary - make sure it only runs on client side (when using remote API)
if self._use_remote_api():
self.try_auto_mount_based_on_config()

if local:

if schedule is not None:
Expand Down
13 changes: 8 additions & 5 deletions mlrun/runtimes/function.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import json
import typing
from datetime import datetime
from os import environ, getenv
from os import getenv
from time import sleep

import nuclio
Expand All @@ -37,7 +37,7 @@
from ..kfpops import deploy_op
from ..lists import RunList
from ..model import RunObject
from ..platforms.iguazio import mount_v3io, parse_v3io_path, split_path
from ..platforms.iguazio import mount_v3io, parse_v3io_path, split_path, v3io_cred
from ..utils import enrich_image_url, get_in, logger, update_in
from .base import FunctionStatus, RunError
from .constants import NuclioIngressAddTemplatedIngressModes
Expand Down Expand Up @@ -124,6 +124,7 @@ def __init__(
node_name=None,
node_selector=None,
affinity=None,
mount_applied=False,
):

super().__init__(
Expand All @@ -145,6 +146,7 @@ def __init__(
node_name=node_name,
node_selector=node_selector,
affinity=affinity,
mount_applied=mount_applied,
)

self.base_spec = base_spec or ""
Expand Down Expand Up @@ -350,11 +352,10 @@ def with_source_archive(
return self

def with_v3io(self, local="", remote=""):
for key in ["V3IO_FRAMESD", "V3IO_USERNAME", "V3IO_ACCESS_KEY", "V3IO_API"]:
if key in environ:
self.set_env(key, environ[key])
if local and remote:
self.apply(mount_v3io(remote=remote, mount_path=local))
else:
self.apply(v3io_cred())
return self

def with_http(
Expand Down Expand Up @@ -461,6 +462,8 @@ def deploy(

save_record = False
if not dashboard:
# Attempt auto-mounting, before sending to remote build
self.try_auto_mount_based_on_config()
db = self._get_db()
logger.info("Starting remote function deploy")
data = db.remote_builder(self, False)
Expand Down
69 changes: 68 additions & 1 deletion mlrun/runtimes/pod.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import typing
import uuid
from copy import deepcopy
from enum import Enum

from kfp.dsl import ContainerOp
from kubernetes import client
Expand Down Expand Up @@ -58,6 +59,7 @@ def __init__(
node_name=None,
node_selector=None,
affinity=None,
mount_applied=False,
):
super().__init__(
command=command,
Expand All @@ -70,6 +72,7 @@ def __init__(
workdir=workdir,
default_handler=default_handler,
pythonpath=pythonpath,
mount_applied=mount_applied,
)
self._volumes = {}
self._volume_mounts = {}
Expand Down Expand Up @@ -175,6 +178,55 @@ def _set_volume_mount(self, volume_mount):
self._volume_mounts[volume_mount_key] = volume_mount


class AutoMountType(str, Enum):
    """Supported values of the ``storage.auto_mount_type`` configuration.

    Looking up an unrecognized value does not raise; it resolves to the
    default member (``auto``) through the ``_missing_`` hook.
    """

    none = "none"
    auto = "auto"
    v3io_credentials = "v3io_credentials"
    v3io_fuse = "v3io_fuse"
    pvc = "pvc"

    @classmethod
    def _missing_(cls, value):
        # Enum lookup hook: any unknown value falls back to the default type.
        return AutoMountType.default()

    @staticmethod
    def default():
        """Return the member used when no (or an unknown) type is configured."""
        return AutoMountType.auto

    # Any modifier that configures a mount on a runtime should be included here. These modifiers, if applied to the
    # runtime, will suppress the auto-mount functionality.
    @classmethod
    def all_mount_modifiers(cls):
        """Return the ``__name__`` of every modifier that configures a mount."""
        return [
            mlrun.v3io_cred.__name__,
            mlrun.mount_v3io.__name__,
            mlrun.platforms.other.mount_pvc.__name__,
            mlrun.auto_mount.__name__,
        ]

    @staticmethod
    def _get_auto_modifier():
        """Pick the modifier for the ``auto`` type from the environment/config.

        :return: ``mlrun.v3io_cred`` when ``igz_version`` is set; otherwise
            ``mount_pvc`` if a PVC is configured, else ``None``
        """
        # If we're running on Iguazio - use v3io_cred
        if mlconf.igz_version != "":
            return mlrun.v3io_cred
        # Else, either pvc mount if it's configured or do nothing otherwise
        pvc_configured = (
            "MLRUN_PVC_MOUNT" in os.environ
            or "pvc_name" in mlconf.get_storage_auto_mount_params()
        )
        return mlrun.platforms.other.mount_pvc if pvc_configured else None

    def get_modifier(self):
        """Return the mount modifier factory for this type, or ``None``.

        The ``auto`` type is resolved lazily: the environment and the mount
        parameters are only inspected when this member actually is ``auto``,
        so an explicit type (or ``none``) never fails because of unrelated,
        malformed ``auto_mount_params``.
        """
        if self is AutoMountType.none:
            return None
        if self is AutoMountType.auto:
            return self._get_auto_modifier()

        return {
            AutoMountType.v3io_credentials: mlrun.v3io_cred,
            AutoMountType.v3io_fuse: mlrun.mount_v3io,
            AutoMountType.pvc: mlrun.platforms.other.mount_pvc,
        }[self]


class KubeResource(BaseRuntime):
kind = "job"
_is_nested = True
Expand Down Expand Up @@ -442,6 +494,21 @@ def _add_vault_params_to_spec(self, runobj=None, project=None):
{"name": "MLRUN_SECRET_STORES__VAULT__URL", "value": vault_url}
)

def try_auto_mount_based_on_config(self):
    """Apply the mount modifier selected by ``mlconf.storage.auto_mount_type``.

    Does nothing when ``self.spec.mount_applied`` is already set, or when the
    configured type resolves to no modifier. Otherwise the modifier is built
    with the configured auto-mount params as keyword arguments and applied
    to this runtime.
    """
    if self.spec.mount_applied:
        logger.debug("Mount already applied - not performing auto-mount")
        return

    modifier = AutoMountType(mlconf.storage.auto_mount_type).get_modifier()
    if modifier:
        # Params are only parsed once a modifier is actually going to be used.
        self.apply(modifier(**mlconf.get_storage_auto_mount_params()))
        return

    logger.debug("Auto mount disabled due to user selection")


def kube_resource_spec_to_pod_spec(
kube_resource_spec: KubeResourceSpec, container: client.V1Container
Expand Down
2 changes: 2 additions & 0 deletions mlrun/runtimes/serving.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ def __init__(
track_models=None,
secret_sources=None,
default_content_type=None,
mount_applied=False,
):

super().__init__(
Expand All @@ -132,6 +133,7 @@ def __init__(
service_account=service_account,
readiness_timeout=readiness_timeout,
build=build,
mount_applied=mount_applied,
)

self.models = models or {}
Expand Down

0 comments on commit 36bc5ce

Please sign in to comment.