Skip to content

Commit

Permalink
[Runtimes] Adding a modifier and auto-mount option to configure multi…
Browse files Browse the repository at this point in the history
…ple env variables (#2276)
  • Loading branch information
theSaarco committed Aug 29, 2022
1 parent bb0fa8d commit 68f1b4e
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 1 deletion.
2 changes: 1 addition & 1 deletion mlrun/config.py
Expand Up @@ -383,7 +383,7 @@
},
},
"storage": {
# What type of auto-mount to use for functions. Can be one of: none, auto, v3io_credentials, v3io_fuse, pvc, s3.
# What type of auto-mount to use for functions. One of: none, auto, v3io_credentials, v3io_fuse, pvc, s3, env.
# Default is auto - which is v3io_credentials when running on Iguazio. If not Iguazio: pvc if the
# MLRUN_PVC_MOUNT env is configured or auto_mount_params contain "pvc_name". Otherwise will do nothing (none).
"auto_mount_type": "auto",
Expand Down
1 change: 1 addition & 0 deletions mlrun/platforms/__init__.py
Expand Up @@ -33,6 +33,7 @@
mount_pvc,
mount_s3,
mount_secret,
set_env_variables,
)


Expand Down
35 changes: 35 additions & 0 deletions mlrun/platforms/other.py
Expand Up @@ -14,6 +14,9 @@
#
# this file is based on the code from kubeflow pipelines git
import os
from typing import Dict

import kfp.dsl

import mlrun
from mlrun.config import config
Expand Down Expand Up @@ -258,3 +261,35 @@ def _use_s3_cred(container_op):
)

return _use_s3_cred


def set_env_variables(env_vars_dict: Dict[str, str] = None, **kwargs):
    """
    Modifier function to apply a set of environment variables to a runtime. Variables may be passed
    as either a dictionary of name-value pairs, or as arguments to the function. When the same name
    appears in both, the keyword argument wins. The dictionary passed by the caller is not modified.
    See `KubeResource.apply` for more information on modifiers.

    Usage::

        function.apply(set_env_variables({"ENV1": "value1", "ENV2": "value2"}))

    or::

        function.apply(set_env_variables(ENV1=value1, ENV2=value2))

    :param env_vars_dict: dictionary of env. variables
    :param kwargs: environment variables passed as args
    :return: a modifier that accepts a `kfp.dsl.ContainerOp`, adds the variables to its container
             and returns it
    """

    # Work on a copy so the caller's dictionary is left untouched
    env_data = env_vars_dict.copy() if env_vars_dict else {}
    env_data.update(kwargs)

    def _set_env_variables(container_op: kfp.dsl.ContainerOp):
        from kubernetes import client as k8s_client

        for _key, _value in env_data.items():
            # Kubernetes env values must be strings; values coming from kwargs or from a parsed
            # JSON params dictionary may be numbers/booleans, so normalize them here.
            if _value is not None and not isinstance(_value, str):
                _value = str(_value)
            container_op.container.add_env_variable(
                k8s_client.V1EnvVar(name=_key, value=_value)
            )
        return container_op

    return _set_env_variables
12 changes: 12 additions & 0 deletions mlrun/runtimes/pod.py
Expand Up @@ -794,6 +794,7 @@ class AutoMountType(str, Enum):
v3io_fuse = "v3io_fuse"
pvc = "pvc"
s3 = "s3"
env = "env"

@classmethod
def _missing_(cls, value):
Expand All @@ -817,6 +818,7 @@ def all_mount_modifiers(cls):
mlrun.platforms.other.mount_pvc.__name__,
mlrun.auto_mount.__name__,
mlrun.platforms.mount_s3.__name__,
mlrun.platforms.set_env_variables.__name__,
]

@classmethod
Expand Down Expand Up @@ -850,6 +852,7 @@ def get_modifier(self):
AutoMountType.pvc: mlrun.platforms.other.mount_pvc,
AutoMountType.auto: self._get_auto_modifier(),
AutoMountType.s3: mlrun.platforms.mount_s3,
AutoMountType.env: mlrun.platforms.set_env_variables,
}[self]


Expand Down Expand Up @@ -892,6 +895,15 @@ def to_dict(self, fields=None, exclude=None, strip=False):
return struct

def apply(self, modify):
"""
Apply a modifier to the runtime which is used to change the runtime's k8s object's spec.
Modifiers can be either KFP modifiers or MLRun modifiers (which are compatible with KFP). All modifiers accept
a `kfp.dsl.ContainerOp` object, apply some changes on its spec and return it so modifiers can be chained
one after the other.
:param modify: a modifier runnable object
:return: the runtime (self) after the modifications
"""

# Kubeflow Pipelines has a hook that adds the component to the DAG on ContainerOp init
# we remove the hook to suppress kubeflow op registration and return it after the apply()
Expand Down
7 changes: 7 additions & 0 deletions tests/common_fixtures.py
Expand Up @@ -337,6 +337,13 @@ def assert_s3_mount_configured(self, s3_params):
expected_envs["S3_NON_ANONYMOUS"] = "true"
assert expected_envs == env_dict

def assert_env_variables(self, expected_env_dict):
    """
    Assert that every expected environment variable is set, with the expected value, on the
    function stored in `self._function`.

    Only the variables listed in `expected_env_dict` are checked; additional variables on the
    function are ignored.

    :param expected_env_dict: dictionary of expected env. variable names and their plain values
    """
    env_list = self._function["spec"]["env"]
    # Entries defined through `valueFrom` (e.g. secret references) carry no "value" key, so use
    # .get() to avoid failing on variables that are unrelated to the expectation.
    env_dict = {item["name"]: item.get("value") for item in env_list}

    for key, value in expected_env_dict.items():
        assert key in env_dict, f"env variable '{key}' is not set on the function"
        assert env_dict[key] == value

def verify_authorization(
self,
authorization_verification_input: mlrun.api.schemas.AuthorizationVerificationInput,
Expand Down
30 changes: 30 additions & 0 deletions tests/platforms/test_other.py
Expand Up @@ -114,3 +114,33 @@ def test_mount_s3():
"secretKeyRef": {"key": "AWS_SECRET_ACCESS_KEY", "name": "s"}
},
}


def test_set_env_variables():
    """
    Verify that `set_env_variables` sets the same env variables on a job function whether they
    are passed as a dictionary or as key=value parameters.
    """
    env_variables = {
        "some_env_1": "some-value",
        "SOMETHING": "ELSE",
        "and_another": "like_this",
    }

    # First pass the variables as a single dictionary, then as key=value parameters
    call_forms = (
        ((env_variables,), {}),
        ((), env_variables),
    )

    for positional, keywords in call_forms:
        # Each form is checked against a freshly created function with no env variables
        function = mlrun.new_function(
            "function-name", "function-project", kind=mlrun.runtimes.RuntimeKinds.job
        )
        assert function.spec.env == []

        modifier = mlrun.platforms.set_env_variables(*positional, **keywords)
        function.apply(modifier)

        env_dict = {}
        for var in function.spec.env:
            env_dict[var["name"]] = var.get("value")

        assert env_dict == env_variables
26 changes: 26 additions & 0 deletions tests/runtimes/test_base.py
Expand Up @@ -197,3 +197,29 @@ def test_auto_mount_s3(self, use_secret, non_anonymous, rundb_mock):
runtime = self._generate_runtime()
self._execute_run(runtime)
rundb_mock.assert_s3_mount_configured(s3_params)

def test_auto_mount_env(self, rundb_mock):
    """
    Run a generated runtime with the auto-mount type configured to "env" and check, through the
    run DB mock, that the expected env variables were recorded. The auto-mount params are given
    first as a comma-separated list of key=value pairs and then as a base64-encoded JSON
    dictionary.
    """
    expected_env = {
        "VAR1": "value1",
        "some_var_2": "some_value2",
        "one-more": "one more!!!",
    }

    mlconf.storage.auto_mount_type = "env"

    # Scenario 1: params given as "key=value" pairs separated by commas
    key_value_params = ",".join(
        f"{name}={content}" for name, content in expected_env.items()
    )
    mlconf.storage.auto_mount_params = key_value_params
    print(f"Auto mount params: {mlconf.storage.auto_mount_params}")
    runtime = self._generate_runtime()
    self._execute_run(runtime)
    rundb_mock.assert_env_variables(expected_env)

    # Scenario 2: params given as a base64-encoded JSON dictionary
    serialized_env = json.dumps(expected_env).encode()
    encoded_params = base64.b64encode(serialized_env)
    mlconf.storage.auto_mount_params = encoded_params
    print(f"Auto mount params: {mlconf.storage.auto_mount_params}")
    runtime = self._generate_runtime()
    # Reset the mock before the second run, as the original test does
    rundb_mock.reset()
    self._execute_run(runtime)
    rundb_mock.assert_env_variables(expected_env)

0 comments on commit 68f1b4e

Please sign in to comment.