Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(backend + SDK): Add Backend and SDK support for timeout in pod spec #10481

Merged
merged 3 commits into from Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
6 changes: 6 additions & 0 deletions backend/src/v2/driver/driver.go
Expand Up @@ -585,6 +585,12 @@ func extendPodSpecPatch(
podSpec.Containers[0].Env = append(podSpec.Containers[0].Env, fieldPathEnvVar)
}

// Get container timeout information
timeout := kubernetesExecutorConfig.GetActiveDeadlineSeconds()
if timeout > 0 {
podSpec.ActiveDeadlineSeconds = &timeout
}

return nil
}

Expand Down
62 changes: 62 additions & 0 deletions backend/src/v2/driver/driver_test.go
Expand Up @@ -965,3 +965,65 @@ func Test_extendPodSpecPatch_FieldPathAsEnv(t *testing.T) {
})
}
}

func Test_extendPodSpecPatch_ActiveDeadlineSeconds(t *testing.T) {
var timeoutSeconds int64 = 20
var NegativeTimeoutSeconds int64 = -20
tests := []struct {
name string
k8sExecCfg *kubernetesplatform.KubernetesExecutorConfig
expected *k8score.PodSpec
}{
{
"Valid - With ActiveDeadlineSeconds",
&kubernetesplatform.KubernetesExecutorConfig{
ActiveDeadlineSeconds: timeoutSeconds,
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
ActiveDeadlineSeconds: &timeoutSeconds,
},
},
{
"Valid - Negative input ignored",
&kubernetesplatform.KubernetesExecutorConfig{
ActiveDeadlineSeconds: NegativeTimeoutSeconds,
},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
},
{
"Valid - No ActiveDeadlineSeconds",
&kubernetesplatform.KubernetesExecutorConfig{},
&k8score.PodSpec{
Containers: []k8score.Container{
{
Name: "main",
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := &k8score.PodSpec{Containers: []k8score.Container{
{
Name: "main",
},
}}
err := extendPodSpecPatch(got, tt.k8sExecCfg, nil, nil)
assert.Nil(t, err)
assert.NotNil(t, got)
assert.Equal(t, tt.expected, got)
})
}
}
16 changes: 16 additions & 0 deletions kubernetes_platform/python/README.md
Expand Up @@ -187,3 +187,19 @@ def my_pipeline():
field_path="metadata.annotations['pipelines.kubeflow.org/run_name']"
)
```

### Timeout: Set a timeout in seconds via the pod spec's `activeDeadlineSeconds`
```python
from kfp import dsl
from kfp import kubernetes

# Minimal no-op component; it only gives the example pipeline a task to configure.
@dsl.component
def comp():
    pass

@dsl.pipeline
def my_pipeline():
    task = comp()
    # Limit the task's pod to 20 seconds.
    kubernetes.set_timeout(task, seconds=20)
```

2 changes: 2 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/__init__.py
Expand Up @@ -24,6 +24,7 @@
'mount_pvc',
'use_field_path_as_env',
'set_image_pull_secrets',
'set_timeout',
'use_config_map_as_env',
'use_config_map_as_volume',
'use_secret_as_env',
Expand All @@ -39,6 +40,7 @@
from kfp.kubernetes.pod_metadata import add_pod_label
from kfp.kubernetes.secret import use_secret_as_env
from kfp.kubernetes.secret import use_secret_as_volume
from kfp.kubernetes.timeout import set_timeout
from kfp.kubernetes.toleration import add_toleration
from kfp.kubernetes.volume import CreatePVC
from kfp.kubernetes.volume import DeletePVC
Expand Down
47 changes: 47 additions & 0 deletions kubernetes_platform/python/kfp/kubernetes/timeout.py
@@ -0,0 +1,47 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp.dsl import PipelineTask
from kfp.kubernetes import common


def set_timeout(
    task: PipelineTask,
    seconds: int,
) -> PipelineTask:
    """Add timeout to the task Pod's `active_deadline_seconds
    <https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.26/#podspec-v1-core>`_.

    The timeout is an integer number of seconds greater than 0 and is written
    to the Kubernetes executor config's ``active_deadline_seconds`` field.
    Passing ``0`` removes a timeout set by a previous call.

    Args:
        task: Pipeline task.
        seconds: Value of the active_deadline_seconds. Must be greater than
            or equal to 0; 0 clears any previously configured timeout.

    Returns:
        Task object with an updated active_deadline_seconds.

    Raises:
        ValueError: If ``seconds`` is negative.
    """
    # Validate first so invalid input fails before the task's existing
    # platform config is read.
    if seconds < 0:
        raise ValueError(
            f'Argument for "seconds" must be an integer greater or equals to 0. Got invalid input: {seconds}. '
        )

    msg = common.get_existing_kubernetes_config_as_message(task)
    msg.active_deadline_seconds = seconds
    task.platform_config['kubernetes'] = json_format.MessageToDict(msg)

    return task
32 changes: 32 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/timeout.py
@@ -0,0 +1,32 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from kfp import dsl
from kfp import kubernetes


# No-op component; it exists only so the snapshot pipeline has a task to
# attach the timeout to.
@dsl.component
def comp():
    pass


@dsl.pipeline
def my_pipeline():
    # Single-task pipeline whose only platform setting is a 20-second timeout.
    kubernetes.set_timeout(comp(), seconds=20)


if __name__ == '__main__':
    from kfp import compiler

    # Write the compiled pipeline next to this file, swapping the extension.
    output_path = __file__.replace('.py', '.yaml')
    compiler.Compiler().compile(my_pipeline, output_path)
56 changes: 56 additions & 0 deletions kubernetes_platform/python/test/snapshot/data/timeout.yaml
@@ -0,0 +1,56 @@
# PIPELINE DEFINITION
# Name: my-pipeline
components:
comp-comp:
executorLabel: exec-comp
deploymentSpec:
executors:
exec-comp:
container:
args:
- --executor_input
- '{{$}}'
- --function_to_execute
- comp
command:
- sh
- -c
- "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\
\ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\
\ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.6.0'\
\ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\
$0\" \"$@\"\n"
- sh
- -ec
- 'program_path=$(mktemp -d)


printf "%s" "$0" > "$program_path/ephemeral_component.py"

_KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@"

'
- "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\
\ *\n\ndef comp():\n pass\n\n"
image: python:3.7
pipelineInfo:
name: my-pipeline
root:
dag:
tasks:
comp:
cachingOptions:
enableCache: true
componentRef:
name: comp-comp
taskInfo:
name: comp
schemaVersion: 2.1.0
sdkVersion: kfp-2.6.0
---
platforms:
kubernetes:
deploymentSpec:
executors:
exec-comp:
activeDeadlineSeconds: '20'
92 changes: 92 additions & 0 deletions kubernetes_platform/python/test/unit/test_timeout.py
@@ -0,0 +1,92 @@
# Copyright 2024 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.protobuf import json_format
from kfp import dsl
from kfp import kubernetes
import pytest


class TestTimeout:
    """Unit tests for ``kubernetes.set_timeout``."""

    def test_timeout(self):
        """A positive timeout is recorded as activeDeadlineSeconds."""

        @dsl.pipeline
        def my_pipeline():
            task = comp()
            kubernetes.set_timeout(task, seconds=20)

        expected = {
            'platforms': {
                'kubernetes': {
                    'deploymentSpec': {
                        'executors': {
                            'exec-comp': {
                                'activeDeadlineSeconds': '20'
                            }
                        }
                    }
                }
            }
        }
        assert json_format.MessageToDict(my_pipeline.platform_spec) == expected

    def test_reset_timeout(self):
        """Setting the timeout to 0 removes a previously set value."""

        @dsl.pipeline
        def my_pipeline():
            task = comp()
            kubernetes.set_timeout(task, seconds=20)
            kubernetes.set_timeout(task, seconds=0)

        expected = {
            'platforms': {
                'kubernetes': {
                    'deploymentSpec': {
                        'executors': {
                            'exec-comp': {}
                        }
                    }
                }
            }
        }
        assert json_format.MessageToDict(my_pipeline.platform_spec) == expected

    def test_bad_value_timeout(self):
        """A negative timeout is rejected with a ValueError."""

        # Dots are escaped so the pattern matches the literal message text
        # instead of treating "." as a wildcard.
        with pytest.raises(
                ValueError,
                match=r'Argument for "seconds" must be an integer greater or equals to 0\. Got invalid input: -20\.',
        ):

            @dsl.pipeline
            def my_pipeline():
                task = comp()
                kubernetes.set_timeout(task, seconds=-20)


# No-op component shared by the pipelines defined in the tests above.
@dsl.component
def comp():
    pass