feat(sdk): add local execution skeleton #localexecution
connor-mccarthy committed Dec 7, 2023
1 parent 0d7913c commit 0552bae
Showing 3 changed files with 230 additions and 5 deletions.
78 changes: 73 additions & 5 deletions sdk/python/kfp/dsl/pipeline_task.py
@@ -14,6 +14,8 @@
"""Pipeline task class and operations."""

import copy
import enum
import functools
import inspect
import itertools
import re
@@ -26,12 +28,43 @@
from kfp.dsl import structures
from kfp.dsl import utils
from kfp.dsl.types import type_utils
from kfp.local import task_dispatcher
from kfp.pipeline_spec import pipeline_spec_pb2

TEMPORARILY_BLOCK_LOCAL_EXECUTION = True

_register_task_handler = lambda task: utils.maybe_rename_for_k8s(
    task.component_spec.name)


class TaskState(enum.Enum):
    FUTURE = 'FUTURE'
    FINAL = 'FINAL'


def block_if_final(custom_message: Optional[str] = None):

    def actual_decorator(method):
        method_name = method.__name__

        @functools.wraps(method)
        def wrapper(self: 'PipelineTask', *args, **kwargs):
            if self.state == TaskState.FINAL:
                raise NotImplementedError(
                    custom_message or
                    f"Task configuration methods are not supported for local execution. Got call to '.{method_name}()'."
                )
            elif self.state == TaskState.FUTURE:
                return method(self, *args, **kwargs)
            else:
                raise ValueError(
                    f'Got unknown {TaskState.__name__}: {self.state}.')

        return wrapper

    return actual_decorator
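# Illustrative sketch (not part of this commit's diff): how block_if_final
# gates a method on TaskState. A FUTURE task passes through; a FINAL task
# raises. Assumes a `task` built from a compiled component spec.
#
#     task = PipelineTask(component_spec=spec, args={'input1': 'value'})
#     task.set_display_name('ok')      # state is FUTURE: call proceeds
#     task.state = TaskState.FINAL
#     task.set_display_name('boom')    # raises NotImplementedError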


class PipelineTask:
    """Represents a pipeline task (instantiated component).
@@ -65,12 +98,12 @@ def my_pipeline():
    def __init__(
        self,
        component_spec: structures.ComponentSpec,
-        args: Mapping[str, Any],
+        args: Dict[str, Any],
    ):
        """Initializes a PipelineTask instance."""
        # import within __init__ to avoid circular import
        from kfp.dsl.tasks_group import TasksGroup

        self.state = TaskState.FUTURE
        self.parent_task_group: Union[None, TasksGroup] = None
        args = args or {}

@@ -148,7 +181,26 @@ def validate_placeholder_types(
                if not isinstance(value, pipeline_channel.PipelineChannel)
            ])

        # TODO: remove feature flag
        from kfp.dsl import pipeline_context
        if not TEMPORARILY_BLOCK_LOCAL_EXECUTION and pipeline_context.Pipeline.get_default_pipeline(
        ) is None:
            self._execute_locally()

    def _execute_locally(self) -> None:
        """Executes the pipeline task locally.

        Sets the task state to FINAL and updates the outputs.
        """
        self._outputs = task_dispatcher.run_single_component(
            pipeline_spec=self.pipeline_spec,
            arguments=self.args,
        )
        self.state = TaskState.FINAL
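    # Illustrative flow (assumption; inert while TEMPORARILY_BLOCK_LOCAL_EXECUTION
    # is True): instantiating a component outside a @dsl.pipeline context would
    # trigger eager local execution, after which the task is FINAL and only
    # output accessors remain usable.
    #
    #     task = comp(input1='value')    # no default pipeline -> _execute_locally()
    #     task.outputs                   # allowed in FINAL state
    #     task.set_cpu_limit('1')        # raises NotImplementedError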

    @property
    @block_if_final(
        'Platform-specific features are not supported for local execution.')
    def platform_spec(self) -> pipeline_spec_pb2.PlatformSpec:
        """PlatformSpec for all tasks in the pipeline as task.
@@ -173,9 +225,9 @@ def name(self) -> str:
    @property
    def inputs(
        self
-    ) -> List[Union[type_utils.PARAMETER_TYPES,
-                    pipeline_channel.PipelineChannel]]:
-        """The list of actual inputs passed to the task."""
+    ) -> Dict[str, Union[type_utils.PARAMETER_TYPES,
+                         pipeline_channel.PipelineChannel]]:
+        """The inputs passed to the task."""
        return self._inputs

    @property
@@ -208,6 +260,8 @@ def outputs(self) -> Mapping[str, pipeline_channel.PipelineChannel]:
        return self._outputs

    @property
    @block_if_final(
        'Task has no dependent tasks since it is executed independently.')
    def dependent_tasks(self) -> List[str]:
        """A list of the dependent task names."""
        return self._task_spec.dependent_tasks
@@ -236,6 +290,7 @@ def _extract_container_spec_and_convert_placeholders(
        ]
        return container_spec

    @block_if_final()
    def set_caching_options(self, enable_caching: bool) -> 'PipelineTask':
        """Sets caching options for the task.
@@ -280,6 +335,7 @@ def _validate_cpu_request_limit(self, cpu: str) -> float:

        return float(cpu[:-1]) / 1000 if cpu.endswith('m') else float(cpu)

    @block_if_final()
    def set_cpu_request(self, cpu: str) -> 'PipelineTask':
        """Sets CPU request (minimum) for the task.
@@ -304,6 +360,7 @@ def set_cpu_request(self, cpu: str) -> 'PipelineTask':

        return self

    @block_if_final()
    def set_cpu_limit(self, cpu: str) -> 'PipelineTask':
        """Sets CPU limit (maximum) for the task.
@@ -328,6 +385,7 @@ def set_cpu_limit(self, cpu: str) -> 'PipelineTask':

        return self

    @block_if_final()
    def set_accelerator_limit(self, limit: int) -> 'PipelineTask':
        """Sets accelerator limit (maximum) for the task. Only applies if
        accelerator type is also set via .set_accelerator_type().
@@ -353,6 +411,7 @@ def set_accelerator_limit(self, limit: int) -> 'PipelineTask':

        return self

    @block_if_final()
    def set_gpu_limit(self, gpu: str) -> 'PipelineTask':
        """Sets GPU limit (maximum) for the task. Only applies if accelerator
        type is also set via .add_accelerator_type().
@@ -422,6 +481,7 @@ def _validate_memory_request_limit(self, memory: str) -> float:

        return memory

    @block_if_final()
    def set_memory_request(self, memory: str) -> 'PipelineTask':
        """Sets memory request (minimum) for the task.
@@ -445,6 +505,7 @@ def set_memory_request(self, memory: str) -> 'PipelineTask':

        return self

    @block_if_final()
    def set_memory_limit(self, memory: str) -> 'PipelineTask':
        """Sets memory limit (maximum) for the task.
@@ -468,6 +529,7 @@ def set_memory_limit(self, memory: str) -> 'PipelineTask':

        return self

    @block_if_final()
    def set_retry(self,
                  num_retries: int,
                  backoff_duration: Optional[str] = None,
@@ -492,6 +554,7 @@ def set_retry(self,
        )
        return self

    @block_if_final()
    def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
        """Sets accelerator type to use when executing this task.
@@ -506,6 +569,7 @@ def add_node_selector_constraint(self, accelerator: str) -> 'PipelineTask':
            category=DeprecationWarning)
        return self.set_accelerator_type(accelerator)

    @block_if_final()
    def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':
        """Sets accelerator type to use when executing this task.
@@ -527,6 +591,7 @@ def set_accelerator_type(self, accelerator: str) -> 'PipelineTask':

        return self

    @block_if_final()
    def set_display_name(self, name: str) -> 'PipelineTask':
        """Sets display name for the task.
@@ -539,6 +604,7 @@ def set_display_name(self, name: str) -> 'PipelineTask':
        self._task_spec.display_name = name
        return self

    @block_if_final()
    def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
        """Sets environment variable for the task.
@@ -557,6 +623,7 @@ def set_env_variable(self, name: str, value: str) -> 'PipelineTask':
        self.container_spec.env = {name: value}
        return self

    @block_if_final()
    def after(self, *tasks) -> 'PipelineTask':
        """Specifies an explicit dependency on other tasks by requiring this
        task be executed after other tasks finish completion.
@@ -580,6 +647,7 @@ def my_pipeline():
        self._task_spec.dependent_tasks.append(task.name)
        return self

    @block_if_final()
    def ignore_upstream_failure(self) -> 'PipelineTask':
        """If called, the pipeline task will run when any specified upstream
        tasks complete, even if unsuccessful.
123 changes: 123 additions & 0 deletions sdk/python/kfp/dsl/pipeline_task_test.py
@@ -377,5 +377,128 @@ def my_pipeline():
            t.platform_spec


class TestTaskInFinalState(unittest.TestCase):

    def test_output_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        task._outputs = {'Output': 1}
        self.assertEqual(task.output, 1)
        self.assertEqual(task.outputs['Output'], 1)

    def test_outputs_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        task._outputs = {
            'int_output':
                1,
            'str_output':
                'foo',
            'dataset_output':
                dsl.Dataset(
                    name='dataset_output',
                    uri='foo/bar/dataset_output',
                    metadata={'key': 'value'})
        }
        self.assertEqual(task.outputs['int_output'], 1)
        self.assertEqual(task.outputs['str_output'], 'foo')
        assert_artifacts_equal(
            self,
            task.outputs['dataset_output'],
            dsl.Dataset(
                name='dataset_output',
                uri='foo/bar/dataset_output',
                metadata={'key': 'value'}),
        )

    def test_platform_spec_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        with self.assertRaisesRegex(
                NotImplementedError,
                r'Platform-specific features are not supported for local execution\.'
        ):
            task.platform_spec

    def test_name_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        self.assertEqual(task.name, 'component1')

    def test_inputs_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        self.assertEqual(task.inputs, {'input1': 'value'})

    def test_dependent_tasks_property(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        with self.assertRaisesRegex(
                NotImplementedError,
                r'Task has no dependent tasks since it is executed independently\.'
        ):
            task.dependent_tasks

    def test_sampling_of_task_configuration_methods(self):
        task = pipeline_task.PipelineTask(
            component_spec=structures.ComponentSpec.from_yaml_documents(
                V2_YAML),
            args={'input1': 'value'},
        )
        task.state = pipeline_task.TaskState.FINAL
        with self.assertRaisesRegex(
                NotImplementedError,
                r"Task configuration methods are not supported for local execution\. Got call to '\.set_caching_options\(\)'\."
        ):
            task.set_caching_options(enable_caching=True)
        with self.assertRaisesRegex(
                NotImplementedError,
                r"Task configuration methods are not supported for local execution\. Got call to '\.set_env_variable\(\)'\."
        ):
            task.set_env_variable(name='foo', value='BAR')
        with self.assertRaisesRegex(
                NotImplementedError,
                r"Task configuration methods are not supported for local execution\. Got call to '\.ignore_upstream_failure\(\)'\."
        ):
            task.ignore_upstream_failure()


def assert_artifacts_equal(
    test_class: unittest.TestCase,
    a1: dsl.Artifact,
    a2: dsl.Artifact,
) -> None:
    test_class.assertEqual(a1.name, a2.name)
    test_class.assertEqual(a1.uri, a2.uri)
    test_class.assertEqual(a1.metadata, a2.metadata)
    test_class.assertEqual(a1.schema_title, a2.schema_title)
    test_class.assertEqual(a1.schema_version, a2.schema_version)
    test_class.assertIsInstance(a1, type(a2))


if __name__ == '__main__':
    unittest.main()
34 changes: 34 additions & 0 deletions sdk/python/kfp/local/task_dispatcher.py
@@ -0,0 +1,34 @@
# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Code for dispatching a local task execution."""
from typing import Any, Dict

from kfp.pipeline_spec import pipeline_spec_pb2


def run_single_component(
    pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    arguments: Dict[str, Any],
) -> Dict[str, Any]:
    """Runs a single component from its compiled PipelineSpec.

    Args:
        pipeline_spec: The PipelineSpec of the component to run.
        arguments: The runtime arguments.

    Returns:
        A dictionary of the task's outputs, keyed by output name (empty until
        the dispatcher is implemented).
    """
    # TODO: implement and return outputs
    return {}
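# Illustrative usage (assumption, not part of this commit): the dispatcher
# would take a compiled component's PipelineSpec plus runtime arguments and
# return the task's outputs keyed by name.
#
#     outputs = run_single_component(
#         pipeline_spec=my_component.pipeline_spec,  # hypothetical component
#         arguments={'input1': 'value'},
#     )
#     # outputs == {} until the TODO above is implemented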
