Skip to content

Commit

Permalink
feat(sdk): add local execution skeleton #localexecution (#10292)
Browse files Browse the repository at this point in the history
  • Loading branch information
connor-mccarthy committed Dec 9, 2023
1 parent 5c60d37 commit 5cd708d
Show file tree
Hide file tree
Showing 3 changed files with 237 additions and 5 deletions.
79 changes: 74 additions & 5 deletions sdk/python/kfp/dsl/pipeline_task.py
Expand Up @@ -14,6 +14,8 @@
"""Pipeline task class and operations."""

import copy
import enum
import functools
import inspect
import itertools
import re
Expand All @@ -26,12 +28,43 @@
from kfp.dsl import structures
from kfp.dsl import utils
from kfp.dsl.types import type_utils
from kfp.local import task_dispatcher
from kfp.pipeline_spec import pipeline_spec_pb2

TEMPORARILY_BLOCK_LOCAL_EXECUTION = True

_register_task_handler = lambda task: utils.maybe_rename_for_k8s(
task.component_spec.name)


class TaskState(enum.Enum):
FUTURE = 'FUTURE'
FINAL = 'FINAL'


def block_if_final(custom_message: Optional[str] = None):

def actual_decorator(method):
method_name = method.__name__

@functools.wraps(method)
def wrapper(self: 'PipelineTask', *args, **kwargs):
if self.state == TaskState.FINAL:
raise Exception(
custom_message or
f"Task configuration methods are not supported for local execution. Got call to '.{method_name}()'."
)
elif self.state == TaskState.FUTURE:
return method(self, *args, **kwargs)
else:
raise ValueError(
f'Got unknown {TaskState.__name__}: {self.state}.')

return wrapper

return actual_decorator


class PipelineTask:
"""Represents a pipeline task (instantiated component).
Expand Down Expand Up @@ -65,12 +98,12 @@ def my_pipeline():
def __init__(
self,
component_spec: structures.ComponentSpec,
args: Mapping[str, Any],
args: Dict[str, Any],
):
"""Initilizes a PipelineTask instance."""
# import within __init__ to avoid circular import
from kfp.dsl.tasks_group import TasksGroup

self.state = TaskState.FUTURE
self.parent_task_group: Union[None, TasksGroup] = None
args = args or {}

Expand Down Expand Up @@ -148,7 +181,27 @@ def validate_placeholder_types(
if not isinstance(value, pipeline_channel.PipelineChannel)
])

from kfp.dsl import pipeline_context

# TODO: remove feature flag
if not TEMPORARILY_BLOCK_LOCAL_EXECUTION and pipeline_context.Pipeline.get_default_pipeline(
) is None:
self._execute_locally()

def _execute_locally(self) -> None:
"""Execute the pipeline task locally.
Set the task state to FINAL and update the outputs.
"""
self._outputs = task_dispatcher.run_single_component(
pipeline_spec=self.pipeline_spec,
arguments=self.args,
)
self.state = TaskState.FINAL

@property
@block_if_final(
'Platform-specific features are not supported for local execution.')
def platform_spec(self) -> pipeline_spec_pb2.PlatformSpec:
"""PlatformSpec for all tasks in the pipeline as task.
Expand All @@ -173,9 +226,9 @@ def name(self) -> str:
@property
def inputs(
self
) -> List[Union[type_utils.PARAMETER_TYPES,
pipeline_channel.PipelineChannel]]:
"""The list of actual inputs passed to the task."""
) -> Dict[str, Union[type_utils.PARAMETER_TYPES,
pipeline_channel.PipelineChannel]]:
"""The inputs passed to the task."""
return self._inputs

@property
Expand Down Expand Up @@ -208,6 +261,8 @@ def outputs(self) -> Mapping[str, pipeline_channel.PipelineChannel]:
return self._outputs

@property
@block_if_final(
'Task has no dependent tasks since it is executed independently.')
def dependent_tasks(self) -> List[str]:
"""A list of the dependent task names."""
return self._task_spec.dependent_tasks
Expand Down Expand Up @@ -236,6 +291,7 @@ def _extract_container_spec_and_convert_placeholders(
]
return container_spec

@block_if_final()
def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
"""Sets caching options for the task.
Expand Down Expand Up @@ -280,6 +336,7 @@ def _validate_cpu_request_limit(self, cpu: str) -> float:

return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)

@block_if_final()
def set_cpu_request(self, cpu: str) -> 'PipelineTask':
"""Sets CPU request (minimum) for the task.
Expand All @@ -304,6 +361,7 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask':

return self

@block_if_final()
def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
"""Sets CPU limit (maximum) for the task.
Expand All @@ -328,6 +386,7 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask':

return self

@block_if_final()
def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
"""Sets accelerator limit (maximum) for the task. Only applies if
accelerator type is also set via .set_accelerator_type().
Expand All @@ -353,6 +412,7 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':

return self

@block_if_final()
def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
"""Sets GPU limit (maximum) for the task. Only applies if accelerator
type is also set via .add_accelerator_type().
Expand Down Expand Up @@ -422,6 +482,7 @@ def _validate_memory_request_limit(self, memory: str) -> float:

return memory

@block_if_final()
def set_memory_request(self, memory: str) -> 'PipelineTask':
"""Sets memory request (minimum) for the task.
Expand All @@ -445,6 +506,7 @@ def set_memory_request(self, memory: str) -> 'PipelineTask':

return self

@block_if_final()
def set_memory_limit(self, memory: str) -> 'PipelineTask':
"""Sets memory limit (maximum) for the task.
Expand All @@ -468,6 +530,7 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask':

return self

@block_if_final()
def set_retry(self,
num_retries: int,
backoff_duration: Optional[str] = None,
Expand All @@ -492,6 +555,7 @@ def set_retry(self,
)
return self

@block_if_final()
def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
"""Sets accelerator type to use when executing this task.
Expand All @@ -506,6 +570,7 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
category=DeprecationWarning)
return self.set_accelerator_type(accelerator)

@block_if_final()
def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
"""Sets accelerator type to use when executing this task.
Expand All @@ -527,6 +592,7 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':

return self

@block_if_final()
def set_display_name(self, name: str) -> 'PipelineTask':
"""Sets display name for the task.
Expand All @@ -539,6 +605,7 @@ def set_display_name(self, name: str) -> 'PipelineTask':
self._task_spec.display_name = name
return self

@block_if_final()
def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
"""Sets environment variable for the task.
Expand All @@ -557,6 +624,7 @@ def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
self.container_spec.env = {name: value}
return self

@block_if_final()
def after(self, *tasks) -> 'PipelineTask':
"""Specifies an explicit dependency on other tasks by requiring this
task be executed after other tasks finish completion.
Expand All @@ -580,6 +648,7 @@ def my_pipeline():
self._task_spec.dependent_tasks.append(task.name)
return self

@block_if_final()
def ignore_upstream_failure(self) -> 'PipelineTask':
"""If called, the pipeline task will run when any specified upstream
tasks complete, even if unsuccessful.
Expand Down
129 changes: 129 additions & 0 deletions sdk/python/kfp/dsl/pipeline_task_test.py
Expand Up @@ -377,5 +377,134 @@ def my_pipeline():
t.platform_spec


class TestTaskInFinalState(unittest.TestCase):
"""Tests PipelineTask in the state FINAL.
Many properties and methods will be blocked.
Also tests that the .output and .outputs behavior behaves as expected when the outputs are values, not placeholders, as will be the case when PipelineTask is in the state FINAL.
"""

def test_output_property(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
task._outputs = {'Output': 1}
self.assertEqual(task.output, 1)
self.assertEqual(task.outputs['Output'], 1)

def test_outputs_property(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
task._outputs = {
'int_output':
1,
'str_output':
'foo',
'dataset_output':
dsl.Dataset(
name='dataset_output',
uri='foo/bar/dataset_output',
metadata={'key': 'value'})
}
self.assertEqual(task.outputs['int_output'], 1)
self.assertEqual(task.outputs['str_output'], 'foo')
assert_artifacts_equal(
self,
task.outputs['dataset_output'],
dsl.Dataset(
name='dataset_output',
uri='foo/bar/dataset_output',
metadata={'key': 'value'}),
)

def test_platform_spec_property(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
with self.assertRaisesRegex(
Exception,
r'Platform-specific features are not supported for local execution\.'
):
task.platform_spec

def test_name_property(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
self.assertEqual(task.name, 'component1')

def test_inputs_property(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
self.assertEqual(task.inputs, {'input1': 'value'})

def test_dependent_tasks_property(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
with self.assertRaisesRegex(
Exception,
r'Task has no dependent tasks since it is executed independently\.'
):
task.dependent_tasks

def test_sampling_of_task_configuration_methods(self):
task = pipeline_task.PipelineTask(
component_spec=structures.ComponentSpec.from_yaml_documents(
V2_YAML),
args={'input1': 'value'},
)
task.state = pipeline_task.TaskState.FINAL
with self.assertRaisesRegex(
Exception,
r"Task configuration methods are not supported for local execution\. Got call to '\.set_caching_options\(\)'\."
):
task.set_caching_options(enable_caching=True)
with self.assertRaisesRegex(
Exception,
r"Task configuration methods are not supported for local execution\. Got call to '\.set_env_variable\(\)'\."
):
task.set_env_variable(name='foo', value='BAR')
with self.assertRaisesRegex(
Exception,
r"Task configuration methods are not supported for local execution\. Got call to '\.ignore_upstream_failure\(\)'\."
):
task.ignore_upstream_failure()


def assert_artifacts_equal(
test_class: unittest.TestCase,
a1: dsl.Artifact,
a2: dsl.Artifact,
) -> None:
test_class.assertEqual(a1.name, a2.name)
test_class.assertEqual(a1.uri, a2.uri)
test_class.assertEqual(a1.metadata, a2.metadata)
test_class.assertEqual(a1.schema_title, a2.schema_title)
test_class.assertEqual(a1.schema_version, a2.schema_version)
test_class.assertIsInstance(a1, type(a2))


if __name__ == '__main__':
unittest.main()
34 changes: 34 additions & 0 deletions sdk/python/kfp/local/task_dispatcher.py
@@ -0,0 +1,34 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code for dispatching a local task execution."""
from typing import Any, Dict

from kfp.pipeline_spec import pipeline_spec_pb2


def run_single_component(
pipeline_spec: pipeline_spec_pb2.PipelineSpec,
arguments: Dict[str, Any],
) -> Dict[str, Any]:
"""Runs a single component from its compiled PipelineSpec.
Args:
pipeline_spec: The PipelineSpec of the component to run.
arguments: The runtime arguments.
Returns:
A LocalTask instance.
"""
# TODO: implement and return outputs
return {}

0 comments on commit 5cd708d

Please sign in to comment.