Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend + SDK): Add backend and SDK support to use Kubernetes FieldPath as env #10496

Merged
merged 2 commits into from Mar 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 13 additions & 0 deletions backend/src/v2/driver/driver.go
Expand Up @@ -572,6 +572,19 @@ func extendPodSpecPatch(
podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, k8score.LocalObjectReference{Name: imagePullSecret.GetSecretName()})
}

// Get Kubernetes FieldPath Env information
for _, fieldPathAsEnv := range kubernetesExecutorConfig.GetFieldPathAsEnv() {
fieldPathEnvVar := k8score.EnvVar{
Name: fieldPathAsEnv.GetName(),
ValueFrom: &k8score.EnvVarSource{
FieldRef: &k8score.ObjectFieldSelector{
FieldPath: fieldPathAsEnv.GetFieldPath(),
},
},
}
podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, fieldPathEnvVar)
}

return nil
}

Expand Down
93 changes: 93 additions & 0 deletions backend/src/v2/driver/driver_test.go
Expand Up @@ -872,3 +872,96 @@ func Test_extendPodSpecPatch_Tolerations(t *testing.T) {
})
}
}

func Test_extendPodSpecPatch_FieldPathAsEnv(t *testing.T) {
tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
expected *k8score.PodSpec
}{
{
"Valid - FieldPathAsEnv",
&kubernetesplatform.KubernetesExecutorConfig{
FieldPathAsEnv: []*kubernetesplatform.FieldPathAsEnv{
{Name: "KFP_RUN_NAME", FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']"},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
Env: []k8score.EnvVar{
{
Name: "KFP_RUN_NAME",
ValueFrom: &k8score.EnvVarSource{
FieldRef: &k8score.ObjectFieldSelector{
FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']",
},
},
},
},
},
},
},
},
{
"Valid - Mix env values",
&kubernetesplatform.KubernetesExecutorConfig{
SecretAsEnv: []*kubernetesplatform.SecretAsEnv{
{
SecretName: "my-secret",
KeyToEnv: []*kubernetesplatform.SecretAsEnv_SecretKeyToEnvMap{
{
SecretKey: "password",
EnvVar: "SECRET_VAR",
},
},
},
},
FieldPathAsEnv: []*kubernetesplatform.FieldPathAsEnv{
{Name: "KFP_RUN_NAME", FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']"},
},
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
Env: []k8score.EnvVar{
{
Name: "SECRET_VAR",
ValueFrom: &k8score.EnvVarSource{
SecretKeyRef: &k8score.SecretKeySelector{
k8score.LocalObjectReference{Name: "my-secret"},
"password",
nil,
},
},
},
{
Name: "KFP_RUN_NAME",
ValueFrom: &k8score.EnvVarSource{
FieldRef: &k8score.ObjectFieldSelector{
FieldPath: "metadata.annotations['pipelines.kubeflow.org/run_name']",
},
},
},
},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := &k8score.PodSpec{Containers: []k8score.Container{
{
Name: "main",
},
}}
err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.NotNil(t, got)
assert.Equal(t, tt.expected, got)
})
}
}
21 changes: 21 additions & 0 deletions kubernetes_platform/python/README.md
Expand Up @@ -166,3 +166,24 @@ def my_pipeline():
annotation_value='123456',
)
```

# Kubernetes Field: Use Kubernetes Field Path as enviornment variable
```python
from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name='KFP_RUN_NAME',
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)
```
2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Expand Up @@ -22,6 +22,7 @@
'CreatePVC',
'DeletePVC',
'mount_pvc',
'use_field_path_as_env',
'set_image_pull_secrets',
'use_config_map_as_env',
'use_config_map_as_volume',
Expand All @@ -33,6 +34,7 @@
from kfp.kubernetes.config_map import use_config_map_as_volume
from kfp.kubernetes.config_map import use_config_map_as_env
from kfp.kubernetes.node_selector import add_node_selector
from kfp.kubernetes.field import use_field_path_as_env
from kfp.kubernetes.pod_metadata import add_pod_annotation
from kfp.kubernetes.pod_metadata import add_pod_label
from kfp.kubernetes.secret import use_secret_as_env
Expand Down
46 changes: 46 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/field.py
@@ -0,0 +1,46 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb


def use_field_path_as_env(
task: PipelineTask,
env_name: str,
field_path: str,
) -> PipelineTask:
"""Use a Kubernetes Field Path as an environment variable as described in
https://kubernetes.io/docs/tasks/inject-data-application/environment-variable-expose-pod-information

Args:
task: Pipeline task.
env_name: Name of the enviornment variable.
field_path: Kubernetes field path to expose as the enviornment variable.

Returns:
Task object with updated field path as the enviornment variable.
"""

msg = common.get_existing_kubernetes_config_as_message(task)
field_path_as_env = pb.FieldPathAsEnv(
name=env_name,
field_path=field_path,
)
msg.field_path_as_env.append(field_path_as_env)
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)

return task
36 changes: 36 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/field_path_as_env.py
@@ -0,0 +1,36 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name='KFP_RUN_NAME',
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)


if __name__ == '__main__':
from kfp import compiler
compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml'))
@@ -0,0 +1,58 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)


printf "%s" "$0" > "$program_path/ephemeral_component.py"

_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
fieldPathAsEnv:
- fieldPath: metadata.annotations['pipelines.kubeflow.org/run_name']
name: KFP_RUN_NAME
96 changes: 96 additions & 0 deletions kubernetes_platform/python/test/unit/test_field.py
@@ -0,0 +1,96 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp import dsl
from kfp import kubernetes


class TestUseFieldPathAsEnv:

def test_use_one(self):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name="KFP_RUN_NAME",
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
'platforms': {
'kubernetes': {
'deploymentSpec': {
'executors': {
'exec-comp': {
'fieldPathAsEnv': [{
'name':
'KFP_RUN_NAME',
'fieldPath':
'metadata.annotations[\'pipelines.kubeflow.org/run_name\']'
}]
}
}
}
}
}
}

def test_use_two(self):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.use_field_path_as_env(
task,
env_name="KFP_RUN_NAME",
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)
kubernetes.use_field_path_as_env(
task,
env_name="POD_NAME",
field_path="metadata.name"
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
'platforms': {
'kubernetes': {
'deploymentSpec': {
'executors': {
'exec-comp': {
'fieldPathAsEnv': [{
'name':
'KFP_RUN_NAME',
'fieldPath':
'metadata.annotations[\'pipelines.kubeflow.org/run_name\']'
},
{
'name':
'POD_NAME',
'fieldPath':
'metadata.name'
}]
}
}
}
}
}
}


@dsl.component
def comp():
pass