diff --git a/backend/src/v2/driver/driver.go b/backend/src/v2/driver/driver.go index 8203ccab5e2..b504a56f471 100644 --- a/backend/src/v2/driver/driver.go +++ b/backend/src/v2/driver/driver.go @@ -572,6 +572,19 @@ func extendPodSpecPatch( podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, k8score.LocalObjectReference{Name: imagePullSecret.GetSecretName()}) } + // Get Kubernetes FieldPath Env information + for _, fieldPathAsEnv := range kubernetesExecutorConfig.GetFieldPathAsEnv() { + fieldPathEnvVar := k8score.EnvVar{ + Name: fieldPathAsEnv.GetName(), + ValueFrom: &k8score.EnvVarSource{ + FieldRef: &k8score.ObjectFieldSelector{ + FieldPath: fieldPathAsEnv.GetFieldPath(), + }, + }, + } + podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, fieldPathEnvVar) + } + return nil } diff --git a/backend/src/v2/driver/driver_test.go b/backend/src/v2/driver/driver_test.go index fdad05d24e8..f4bacddd06e 100644 --- a/backend/src/v2/driver/driver_test.go +++ b/backend/src/v2/driver/driver_test.go @@ -872,3 +872,96 @@ func Test_extendPodSpecPatch_Tolerations(t *testing.T) { }) } } + +func Test_extendPodSpecPatch_FieldPathAsEnv(t *testing.T) { + tests := []struct { + name string + k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig + expected *k8score.PodSpec + }{ + { + "Valid - FieldPathAsEnv", + &kubernetesplatform.KubernetesExecutorConfig{ + FieldPathAsEnv: []*kubernetesplatform.FieldPathAsEnv{ + {Name: "KFP_RUN_NAME", FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']"}, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + Env: []k8score.EnvVar{ + { + Name: "KFP_RUN_NAME", + ValueFrom: &k8score.EnvVarSource{ + FieldRef: &k8score.ObjectFieldSelector{ + FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']", + }, + }, + }, + }, + }, + }, + }, + }, + { + "Valid - Mix env values", + &kubernetesplatform.KubernetesExecutorConfig{ + SecretAsEnv: []*kubernetesplatform.SecretAsEnv{ + { + SecretName: "my-secret", + KeyToEnv: []*kubernetesplatform.SecretAsEnv_SecretKeyToEnvMap{ + { + SecretKey: "password", + EnvVar: "SECRET_VAR", + }, + }, + }, + }, + FieldPathAsEnv: []*kubernetesplatform.FieldPathAsEnv{ + {Name: "KFP_RUN_NAME", FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']"}, + }, + }, + &k8score.PodSpec{ + Containers: []k8score.Container{ + { + Name: "main", + Env: []k8score.EnvVar{ + { + Name: "SECRET_VAR", + ValueFrom: &k8score.EnvVarSource{ + SecretKeyRef: &k8score.SecretKeySelector{ + k8score.LocalObjectReference{Name: "my-secret"}, + "password", + nil, + }, + }, + }, + { + Name: "KFP_RUN_NAME", + ValueFrom: &k8score.EnvVarSource{ + FieldRef: &k8score.ObjectFieldSelector{ + FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']", + }, + }, + }, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := &k8score.PodSpec{Containers: []k8score.Container{ + { + Name: "main", + }, + }} + err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil) + assert.Nil(t, err) + assert.NotNil(t, got) + assert.Equal(t, tt.expected, got) + }) + } +} diff --git a/kubernetes_platform/python/README.md b/kubernetes_platform/python/README.md index 9203b937ddd..8333ab9db75 100644 --- a/kubernetes_platform/python/README.md +++ b/kubernetes_platform/python/README.md @@ -166,3 +166,24 @@ def my_pipeline(): annotation_value='123456', ) ``` + +# Kubernetes Field: Use Kubernetes Field Path as enviornment variable +```python +from kfp import dsl +from kfp import kubernetes + + +@dsl.component +def comp(): + pass + + +@dsl.pipeline +def my_pipeline(): + task = comp() + kubernetes.use_field_path_as_env( + task, + env_name='KFP_RUN_NAME', + field_path="metadata.annotations['pipelines.kubeflow.org/run_name']" + ) +``` diff --git a/kubernetes_platform/python/kfp/kubernetes/__init__.py b/kubernetes_platform/python/kfp/kubernetes/__init__.py index 7499c8fc67e..7b8d3ca4129 100644 --- a/kubernetes_platform/python/kfp/kubernetes/__init__.py +++ b/kubernetes_platform/python/kfp/kubernetes/__init__.py @@ -22,6 +22,7 @@ 'CreatePVC', 'DeletePVC', 'mount_pvc', + 'use_field_path_as_env', 'set_image_pull_secrets', 'use_config_map_as_env', 'use_config_map_as_volume', @@ -33,6 +34,7 @@ from kfp.kubernetes.config_map import use_config_map_as_volume from kfp.kubernetes.config_map import use_config_map_as_env from kfp.kubernetes.node_selector import add_node_selector +from kfp.kubernetes.field import use_field_path_as_env from kfp.kubernetes.pod_metadata import add_pod_annotation from kfp.kubernetes.pod_metadata import add_pod_label from kfp.kubernetes.secret import use_secret_as_env diff --git a/kubernetes_platform/python/kfp/kubernetes/field.py b/kubernetes_platform/python/kfp/kubernetes/field.py new file mode 100644 index 00000000000..6c58337bce2 --- /dev/null +++ b/kubernetes_platform/python/kfp/kubernetes/field.py @@ -0,0 +1,46 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.protobuf import json_format +from kfp.dsl import PipelineTask +from kfp.kubernetes import common +from kfp.kubernetes import kubernetes_executor_config_pb2 as pb + + +def use_field_path_as_env( + task: PipelineTask, + env_name: str, + field_path: str, +) -> PipelineTask: + """Use a Kubernetes Field Path as an environment variable as described in + https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information + + Args: + task: Pipeline task. + env_name: Name of the enviornment variable. + field_path: Kubernetes field path to expose as the enviornment variable. + + Returns: + Task object with updated field path as the enviornment variable. + """ + + msg = common.get_existing_kubernetes_config_as_message(task) + field_path_as_env = pb.FieldPathAsEnv( + name=env_name, + field_path=field_path, + ) + msg.field_path_as_env.append(field_path_as_env) + task.platform_config['kubernetes'] = json_format.MessageToDict(msg) + + return task diff --git a/kubernetes_platform/python/test/snapshot/data/field_path_as_env.py b/kubernetes_platform/python/test/snapshot/data/field_path_as_env.py new file mode 100644 index 00000000000..fcdbd72f803 --- /dev/null +++ b/kubernetes_platform/python/test/snapshot/data/field_path_as_env.py @@ -0,0 +1,36 @@ +# Copyright 2023 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from kfp import dsl +from kfp import kubernetes + + +@dsl.component +def comp(): + pass + + +@dsl.pipeline +def my_pipeline(): + task = comp() + kubernetes.use_field_path_as_env( + task, + env_name='KFP_RUN_NAME', + field_path="metadata.annotations['pipelines.kubeflow.org/run_name']" + ) + + +if __name__ == '__main__': + from kfp import compiler + compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml')) diff --git a/kubernetes_platform/python/test/snapshot/data/field_path_as_env.yaml b/kubernetes_platform/python/test/snapshot/data/field_path_as_env.yaml new file mode 100644 index 00000000000..e2e6fa17584 --- /dev/null +++ b/kubernetes_platform/python/test/snapshot/data/field_path_as_env.yaml @@ -0,0 +1,58 @@ +# PIPELINE DEFINITION +# Name: my-pipeline +components: + comp-comp: + executorLabel: exec-comp +deploymentSpec: + executors: + exec-comp: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - comp + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef comp():\n pass\n\n" + image: python:3.7 +pipelineInfo: + name: my-pipeline +root: + dag: + tasks: + comp: + cachingOptions: + enableCache: true + componentRef: + name: comp-comp + taskInfo: + name: comp +schemaVersion: 2.1.0 +sdkVersion: kfp-2.6.0 +--- +platforms: + kubernetes: + deploymentSpec: + executors: + exec-comp: + fieldPathAsEnv: + - fieldPath: metadata.annotations['pipelines.kubeflow.org/run_name'] + name: KFP_RUN_NAME diff --git a/kubernetes_platform/python/test/unit/test_field.py b/kubernetes_platform/python/test/unit/test_field.py new file mode 100644 index 00000000000..adec5facbd5 --- /dev/null +++ b/kubernetes_platform/python/test/unit/test_field.py @@ -0,0 +1,96 @@ +# Copyright 2024 The Kubeflow Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.protobuf import json_format +from kfp import dsl +from kfp import kubernetes + + +class TestUseFieldPathAsEnv: + + def test_use_one(self): + + @dsl.pipeline + def my_pipeline(): + task = comp() + kubernetes.use_field_path_as_env( + task, + env_name="KFP_RUN_NAME", + field_path="metadata.annotations['pipelines.kubeflow.org/run_name']" + ) + + assert json_format.MessageToDict(my_pipeline.platform_spec) == { + 'platforms': { + 'kubernetes': { + 'deploymentSpec': { + 'executors': { + 'exec-comp': { + 'fieldPathAsEnv': [{ + 'name': + 'KFP_RUN_NAME', + 'fieldPath': + 'metadata.annotations[\'pipelines.kubeflow.org/run_name\']' + }] + } + } + } + } + } + } + + def test_use_two(self): + + @dsl.pipeline + def my_pipeline(): + task = comp() + kubernetes.use_field_path_as_env( + task, + env_name="KFP_RUN_NAME", + field_path="metadata.annotations['pipelines.kubeflow.org/run_name']" + ) + kubernetes.use_field_path_as_env( + task, + env_name="POD_NAME", + field_path="metadata.name" + ) + + assert json_format.MessageToDict(my_pipeline.platform_spec) == { + 'platforms': { + 'kubernetes': { + 'deploymentSpec': { + 'executors': { + 'exec-comp': { + 'fieldPathAsEnv': [{ + 'name': + 'KFP_RUN_NAME', + 'fieldPath': + 'metadata.annotations[\'pipelines.kubeflow.org/run_name\']' + }, + { + 'name': + 'POD_NAME', + 'fieldPath': + 'metadata.name' + }] + } + } + } + } + } + } + + +@dsl.component +def comp(): + pass