Skip to content

Commit

Permalink
feat(backend + SDK): Add backend and SDK support to use Kubernetes Fi…
Browse files Browse the repository at this point in the history
…eldPath as env (kubeflow#10496)

Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
  • Loading branch information
Tomcli authored and petethegreat committed Mar 29, 2024
1 parent bb36d7a commit 8412015
Show file tree
Hide file tree
Showing 8 changed files with 365 additions and 0 deletions.
13 changes: 13 additions & 0 deletions backend/src/v2/driver/driver.go
Expand Up @@ -572,6 +572,19 @@ func extendPodSpecPatch(
podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, k8score.LocalObjectReference{Name: imagePullSecret.GetSecretName()})
}

// Get Kubernetes FieldPath Env information
for _, fieldPathAsEnv := range kubernetesExecutorConfig.GetFieldPathAsEnv() {
fieldPathEnvVar := k8score.EnvVar{
Name: fieldPathAsEnv.GetName(),
ValueFrom: &k8score.EnvVarSource{
FieldRef: &k8score.ObjectFieldSelector{
FieldPath: fieldPathAsEnv.GetFieldPath(),
},
},
}
podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, fieldPathEnvVar)
}

return nil
}

Expand Down
93 changes: 93 additions & 0 deletions backend/src/v2/driver/driver_test.go
Expand Up @@ -872,3 +872,96 @@ func Test_extendPodSpecPatch_Tolerations(t *testing.T) {
})
}
}

func Test_extendPodSpecPatch_FieldPathAsEnv(t *testing.T) {
tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
expected *k8score.PodSpec
}{
{
"Valid - FieldPathAsEnv",
&kubernetesplatform.KubernetesExecutorConfig{
FieldPathAsEnv: []*kubernetesplatform.FieldPathAsEnv{
{Name: "KFP_RUN_NAME", FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']"},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
Env: []k8score.EnvVar{
{
Name: "KFP_RUN_NAME",
ValueFrom: &k8score.EnvVarSource{
FieldRef: &k8score.ObjectFieldSelector{
FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']",
},
},
},
},
},
},
},
},
{
"Valid - Mix env values",
&kubernetesplatform.KubernetesExecutorConfig{
SecretAsEnv: []*kubernetesplatform.SecretAsEnv{
{
SecretName: "my-secret",
KeyToEnv: []*kubernetesplatform.SecretAsEnv_SecretKeyToEnvMap{
{
SecretKey: "password",
EnvVar: "SECRET_VAR",
},
},
},
},
FieldPathAsEnv: []*kubernetesplatform.FieldPathAsEnv{
{Name: "KFP_RUN_NAME", FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']"},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
Env: []k8score.EnvVar{
{
Name: "SECRET_VAR",
ValueFrom: &k8score.EnvVarSource{
SecretKeyRef: &k8score.SecretKeySelector{
k8score.LocalObjectReference{Name: "my-secret"},
"password",
nil,
},
},
},
{
Name: "KFP_RUN_NAME",
ValueFrom: &k8score.EnvVarSource{
FieldRef: &k8score.ObjectFieldSelector{
FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']",
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := &k8score.PodSpec{Containers: []k8score.Container{
{
Name: "main",
},
}}
err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.NotNil(t, got)
assert.Equal(t, tt.expected, got)
})
}
}
21 changes: 21 additions & 0 deletions kubernetes_platform/python/README.md
Expand Up @@ -166,3 +166,24 @@ def my_pipeline():
annotation_value='123456',
)
```

# Kubernetes Field: Use Kubernetes Field Path as enviornment variable
```python
from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name='KFP_RUN_NAME',
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)
```
2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Expand Up @@ -22,6 +22,7 @@
'CreatePVC',
'DeletePVC',
'mount_pvc',
'use_field_path_as_env',
'set_image_pull_secrets',
'use_config_map_as_env',
'use_config_map_as_volume',
Expand All @@ -33,6 +34,7 @@
from kfp.kubernetes.config_map import use_config_map_as_volume
from kfp.kubernetes.config_map import use_config_map_as_env
from kfp.kubernetes.node_selector import add_node_selector
from kfp.kubernetes.field import use_field_path_as_env
from kfp.kubernetes.pod_metadata import add_pod_annotation
from kfp.kubernetes.pod_metadata import add_pod_label
from kfp.kubernetes.secret import use_secret_as_env
Expand Down
46 changes: 46 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/field.py
@@ -0,0 +1,46 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb


def use_field_path_as_env(
task: PipelineTask,
env_name: str,
field_path: str,
) -> PipelineTask:
"""Use a Kubernetes Field Path as an environment variable as described in
https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information
Args:
task: Pipeline task.
env_name: Name of the enviornment variable.
field_path: Kubernetes field path to expose as the enviornment variable.
Returns:
Task object with updated field path as the enviornment variable.
"""

msg = common.get_existing_kubernetes_config_as_message(task)
field_path_as_env = pb.FieldPathAsEnv(
name=env_name,
field_path=field_path,
)
msg.field_path_as_env.append(field_path_as_env)
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)

return task
36 changes: 36 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/field_path_as_env.py
@@ -0,0 +1,36 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name='KFP_RUN_NAME',
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)


if __name__ == '__main__':
from kfp import compiler
compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml'))
@@ -0,0 +1,58 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
fieldPathAsEnv:
- fieldPath: metadata.annotations['pipelines.kubeflow.org/run_name']
name: KFP_RUN_NAME
96 changes: 96 additions & 0 deletions kubernetes_platform/python/test/unit/test_field.py
@@ -0,0 +1,96 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp import dsl
from kfp import kubernetes


class TestUseFieldPathAsEnv:

def test_use_one(self):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name="KFP_RUN_NAME",
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
'platforms': {
'kubernetes': {
'deploymentSpec': {
'executors': {
'exec-comp': {
'fieldPathAsEnv': [{
'name':
'KFP_RUN_NAME',
'fieldPath':
'metadata.annotations[\'pipelines.kubeflow.org/run_name\']'
}]
}
}
}
}
}
}

def test_use_two(self):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name="KFP_RUN_NAME",
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)
kubernetes.use_field_path_as_env(
task,
env_name="POD_NAME",
field_path="metadata.name"
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
'platforms': {
'kubernetes': {
'deploymentSpec': {
'executors': {
'exec-comp': {
'fieldPathAsEnv': [{
'name':
'KFP_RUN_NAME',
'fieldPath':
'metadata.annotations[\'pipelines.kubeflow.org/run_name\']'
},
{
'name':
'POD_NAME',
'fieldPath':
'metadata.name'
}]
}
}
}
}
}
}


@dsl.component
def comp():
pass

0 comments on commit 8412015

Please sign in to comment.