Skip to content

Commit

Permalink
Add backend and sdk support for pod spec timeout
Browse files Browse the repository at this point in the history
Signed-off-by: Tommy Li <Tommy.chaoping.li@ibm.com>
  • Loading branch information
Tomcli committed Feb 16, 2024
1 parent 1fcc681 commit 8452755
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 7 deletions.
6 changes: 6 additions & 0 deletions backend/src/v2/driver/driver.go
Expand Up @@ -517,6 +517,12 @@ func extendPodSpecPatch(
podSpec.ImagePullSecrets = append(podSpec.ImagePullSecrets, k8score.LocalObjectReference{Name: imagePullSecret.GetSecretName()})
}

// Get container timeout information
timeout := kubernetesExecutorConfig.GetActiveDeadlineSeconds()
if timeout > 0 {
podSpec.ActiveDeadlineSeconds = &timeout
}

return nil
}

Expand Down
62 changes: 62 additions & 0 deletions backend/src/v2/driver/driver_test.go
Expand Up @@ -671,3 +671,65 @@ func Test_extendPodSpecPatch_ImagePullSecrets(t *testing.T) {
})
}
}

func Test_extendPodSpecPatch_ActiveDeadlineSeconds(t *testing.T) {
var timeoutSeconds int64 = 20
var NegativeTimeoutSeconds int64 = -20
tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
expected *k8score.PodSpec
}{
{
"Valid - With ActiveDeadlineSeconds",
&kubernetesplatform.KubernetesExecutorConfig{
ActiveDeadlineSeconds: timeoutSeconds,
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
ActiveDeadlineSeconds: &timeoutSeconds,
},
},
{
"Valid - Negative input ignored",
&kubernetesplatform.KubernetesExecutorConfig{
ActiveDeadlineSeconds: NegativeTimeoutSeconds,
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
},
{
"Valid - No ActiveDeadlineSeconds",
&kubernetesplatform.KubernetesExecutorConfig{},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := &k8score.PodSpec{Containers: []k8score.Container{
{
Name: "main",
},
}}
err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.NotNil(t, got)
assert.Equal(t, tt.expected, got)
})
}
}
2 changes: 1 addition & 1 deletion backend/third_party_licenses/apiserver.csv
Expand Up @@ -61,7 +61,7 @@ github.com/klauspost/cpuid,https://github.com/klauspost/cpuid/blob/v1.3.1/LICENS
github.com/klauspost/pgzip,https://github.com/klauspost/pgzip/blob/v1.2.5/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/758c91f76784/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/f51dc39614e4/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/1fcc68121cd0/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/lann/builder,https://github.com/lann/builder/blob/47ae307949d0/LICENSE,MIT
github.com/lann/ps,https://github.com/lann/ps/blob/62de8c46ede0/LICENSE,MIT
Expand Down
2 changes: 1 addition & 1 deletion backend/third_party_licenses/driver.csv
Expand Up @@ -31,7 +31,7 @@ github.com/josharian/intern,https://github.com/josharian/intern/blob/v1.0.0/lice
github.com/json-iterator/go,https://github.com/json-iterator/go/blob/v1.1.12/LICENSE,MIT
github.com/kubeflow/pipelines/api/v2alpha1/go,https://github.com/kubeflow/pipelines/blob/758c91f76784/api/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/backend,https://github.com/kubeflow/pipelines/blob/HEAD/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/f51dc39614e4/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/kubernetes_platform/go/kubernetesplatform,https://github.com/kubeflow/pipelines/blob/1fcc68121cd0/kubernetes_platform/LICENSE,Apache-2.0
github.com/kubeflow/pipelines/third_party/ml-metadata/go/ml_metadata,https://github.com/kubeflow/pipelines/blob/e1f0c010f800/third_party/ml-metadata/LICENSE,Apache-2.0
github.com/mailru/easyjson,https://github.com/mailru/easyjson/blob/v0.7.7/LICENSE,MIT
github.com/modern-go/concurrent,https://github.com/modern-go/concurrent/blob/bacd9c7ef1dd/LICENSE,Apache-2.0
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -31,7 +31,7 @@ require (
github.com/jinzhu/inflection v1.0.0 // indirect
github.com/jinzhu/now v1.1.4 // indirect
github.com/kubeflow/pipelines/api v0.0.0-20230331215358-758c91f76784
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240207171236-f51dc39614e4
github.com/kubeflow/pipelines/kubernetes_platform v0.0.0-20240213001904-1fcc68121cd0
github.com/kubeflow/pipelines/third_party/ml-metadata v0.0.0-20230810215105-e1f0c010f800
github.com/lestrrat-go/strftime v1.0.4
github.com/mattn/go-sqlite3 v1.14.16
Expand Down
4 changes: 2 additions & 2 deletions go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 16 additions & 1 deletion kubernetes_platform/python/README.md
Expand Up @@ -127,4 +127,19 @@ def my_pipeline():
annotation_key='run_id',
annotation_value='123456',
)
```
```

### Timeout: Set timeout in seconds defined as pod spec's activeDeadlineSeconds
```python
from kfp import dsl
from kfp import kubernetes

@dsl.component
def comp():
pass

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.set_timeout(task, 20)
```
4 changes: 3 additions & 1 deletion kubernetes_platform/python/kfp/kubernetes/__init__.py
Expand Up @@ -23,7 +23,8 @@
'add_node_selector',
'add_pod_label',
'add_pod_annotation',
'set_image_pull_secrets'
'set_image_pull_secrets',
'set_timeout'
]

from kfp.kubernetes.pod_metadata import add_pod_label
Expand All @@ -35,3 +36,4 @@
from kfp.kubernetes.volume import DeletePVC
from kfp.kubernetes.volume import mount_pvc
from kfp.kubernetes.image import set_image_pull_secrets
from kfp.kubernetes.timeout import set_timeout
47 changes: 47 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/timeout.py
@@ -0,0 +1,47 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common


def set_timeout(
task: PipelineTask,
seconds: int,
) -> PipelineTask:
"""Add timeout to the task Pod's `active_deadline_seconds
<https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#podspec-v1-core>`_.
Timeout an integer greater than 0, corresponding to the podspec active_deadline_seconds <https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#podspec-v1-core`_ field.
Integer 0 means removing the timeout fields from previous functions.
Args:
task: Pipeline task.
seconds: Value of the active_deadline_seconds.
Returns:
Task object with an updated active_deadline_seconds.
"""

msg = common.get_existing_kubernetes_config_as_message(task)
if seconds >= 0:
msg.active_deadline_seconds = seconds
else:
raise ValueError(
f'Argument for "seconds" must be an integer greater or equals to 0. Got invalid input: {seconds}. '
)
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)

return task
32 changes: 32 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/timeout.py
@@ -0,0 +1,32 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


@dsl.component
def comp():
pass


@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.set_timeout(task, 20)


if __name__ == '__main__':
from kfp import compiler
compiler.Compiler().compile(my_pipeline, __file__.replace('.py', '.yaml'))
56 changes: 56 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/timeout.yaml
@@ -0,0 +1,56 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)
printf "%s" "$0" > "$program_path/ephemeral_component.py"
_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"
'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
activeDeadlineSeconds: '20'
92 changes: 92 additions & 0 deletions kubernetes_platform/python/test/unit/test_timeout.py
@@ -0,0 +1,92 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp import dsl
from kfp import kubernetes
import pytest


class TestTimeout:

def test_timeout(self):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.set_timeout(
task,
seconds=20
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
'platforms': {
'kubernetes': {
'deploymentSpec': {
'executors': {
'exec-comp': {
'activeDeadlineSeconds': '20'
}
}
}
}
}
}

def test_reset_timeout(self):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.set_timeout(
task,
seconds=20
)
kubernetes.set_timeout(
task,
seconds=0
)

assert json_format.MessageToDict(my_pipeline.platform_spec) == {
'platforms': {
'kubernetes': {
'deploymentSpec': {
'executors': {
'exec-comp': {
}
}
}
}
}
}

def test_bad_value_timeout(self):

with pytest.raises(
ValueError,
match=r'Argument for "seconds" must be an integer greater or equals to 0. Got invalid input: -20.',
):

@dsl.pipeline
def my_pipeline():
task = comp()
kubernetes.set_timeout(
task,
seconds=-20
)


@dsl.component
def comp():
pass

0 comments on commit 8452755

Please sign in to comment.