Skip to content

Commit

Permalink
SDK/Components - Removed outputs from task factory function signature
Browse files Browse the repository at this point in the history
This realizes the outputs handling roadmap and solves problems with input and output name clashes.
  • Loading branch information
Ark-kun committed Dec 3, 2018
1 parent 351d9cc commit dae7b13
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 48 deletions.
31 changes: 10 additions & 21 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
]

import sys
from collections import OrderedDict
from ._yaml_utils import load_yaml, dump_yaml
from ._structures import ComponentSpec

Expand Down Expand Up @@ -191,8 +192,7 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo
inputs_dict = {port.name: port for port in inputs_list}

input_name_to_pythonic = {}
output_name_to_pythonic = {}
pythonic_name_to_original = {}
pythonic_name_to_input_name = {}

input_name_to_kubernetes = {}
output_name_to_kubernetes = {}
Expand All @@ -201,21 +201,16 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo

for io_port in inputs_list:
pythonic_name = _sanitize_python_function_name(io_port.name)
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_original, '_')
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_input_name, '_')
input_name_to_pythonic[io_port.name] = pythonic_name
pythonic_name_to_original[pythonic_name] = io_port.name
pythonic_name_to_input_name[pythonic_name] = io_port.name

kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_input_name, '-')
input_name_to_kubernetes[io_port.name] = kubernetes_name
kubernetes_name_to_input_name[kubernetes_name] = io_port.name

for io_port in outputs_list:
pythonic_name = _sanitize_python_function_name(io_port.name)
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_original, '_')
output_name_to_pythonic[io_port.name] = pythonic_name
pythonic_name_to_original[pythonic_name] = io_port.name

kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_output_name, '-')
output_name_to_kubernetes[io_port.name] = kubernetes_name
Expand All @@ -225,9 +220,11 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo
container_image = container_spec.image

file_inputs={}
file_outputs_from_def={}
file_outputs_from_def = OrderedDict()
if container_spec.file_outputs != None:
file_outputs_from_def = {output_name_to_kubernetes[param]: path for param, path in container_spec.file_outputs.items()}
for param, path in container_spec.file_outputs.items():
output_key = output_name_to_kubernetes[param]
file_outputs_from_def[output_key] = path

def create_container_op_with_expanded_arguments(pythonic_input_argument_values):
file_outputs = file_outputs_from_def.copy()
Expand Down Expand Up @@ -280,12 +277,7 @@ def expand_command_part(arg): #input values with original names
elif func_name == 'output':
assert isinstance(func_argument, str)
port_name = func_argument
pythonic_port_name = output_name_to_pythonic[port_name]
if pythonic_port_name in pythonic_input_argument_values and pythonic_input_argument_values[pythonic_port_name] is not None:
output_filename = str(pythonic_input_argument_values[pythonic_port_name])
else:
output_filename = _generate_output_file_name(port_name)
#We need to pass the file mapping to file_outputs
output_filename = _generate_output_file_name(port_name)
output_key = output_name_to_kubernetes[port_name]
if output_key in file_outputs:
if file_outputs[output_key] != output_filename:
Expand Down Expand Up @@ -364,10 +356,7 @@ def expand_argument_list(argument_list):
#Reordering the inputs since in Python optional parameters must come after reuired parameters
reordered_input_list = [input for input in inputs_list if not input.optional] + [input for input in inputs_list if input.optional]
input_parameters = [_dynamic.KwParameter(input_name_to_pythonic[port.name], annotation=(_try_get_object_by_name(port.type) if port.type else inspect.Parameter.empty), default=(None if port.optional else inspect.Parameter.empty)) for port in reordered_input_list]
output_parameters = [_dynamic.KwParameter(output_name_to_pythonic[port.name], annotation=('OutputFile[{}]'.format(port.type) if port.type else inspect.Parameter.empty), default=None) for port in outputs_list]

#Still allowing to set the output parameters, but make them optional and auto-generate if missing.
factory_function_parameters = input_parameters + output_parameters
factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.

return _dynamic.create_function_from_parameters(
create_container_op_with_expanded_arguments,
Expand Down
19 changes: 2 additions & 17 deletions sdk/python/tests/components/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_loading_minimal_component(self):
task1 = task_factory1()
assert task1.image == component_dict['implementation']['container']['image']

@unittest.expectedFailure
@unittest.expectedFailure #TODO: Check this in the ComponentSpec class, not during materialization.
def test_fail_on_duplicate_input_names(self):
component_text = '''\
inputs:
Expand All @@ -85,6 +85,7 @@ def test_fail_on_duplicate_input_names(self):
'''
task_factory1 = comp.load_component_from_text(component_text)

@unittest.skip #TODO: Fix in the ComponentSpec class
@unittest.expectedFailure
def test_fail_on_duplicate_output_names(self):
component_text = '''\
Expand Down Expand Up @@ -301,22 +302,6 @@ def test_input_value_resolving(self):

self.assertEqual(task1.arguments, ['--data', 'some-data'])

def test_output_resolving(self):
component_text = '''\
outputs:
- {name: Data}
implementation:
container:
image: busybox
args:
- --output-data
- output: Data
'''
task_factory1 = comp.load_component(text=component_text)
task1 = task_factory1(data='/outputs/some-data')

self.assertEqual(task1.arguments, ['--output-data', '/outputs/some-data'])

def test_automatic_output_resolving(self):
component_text = '''\
outputs:
Expand Down
29 changes: 19 additions & 10 deletions sdk/python/tests/components/test_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import subprocess
import tempfile
import unittest
from contextlib import contextmanager
from pathlib import Path

import kfp.components as comp
Expand All @@ -24,6 +25,15 @@ def add_two_numbers(a: float, b: float) -> float:
return a + b


@contextmanager
def components_local_output_dir_context(output_dir: str):
old_dir = comp._components._outputs_dir
try:
comp._components._outputs_dir = output_dir
yield output_dir
finally:
comp._components._outputs_dir = old_dir

class PythonOpTestCase(unittest.TestCase):
def helper_test_2_in_1_out_component_using_local_call(self, func, op):
arg1 = float(3)
Expand All @@ -35,15 +45,15 @@ def helper_test_2_in_1_out_component_using_local_call(self, func, op):
expected_str = str(expected)

with tempfile.TemporaryDirectory() as temp_dir_name:
output_path = Path(temp_dir_name).joinpath('output1')

task = op(arg1, arg2, str(output_path))
with components_local_output_dir_context(temp_dir_name):
task = op(arg1, arg2)

full_command = task.command + task.arguments

process = subprocess.run(full_command)

actual_str = output_path.read_text()
output_path = list(task.file_outputs.values())[0]
actual_str = Path(output_path).read_text()

self.assertEqual(float(actual_str), float(expected_str))

Expand All @@ -56,17 +66,16 @@ def helper_test_2_in_2_out_component_using_local_call(self, func, op):
expected2_str = str(expected_tuple[1])

with tempfile.TemporaryDirectory() as temp_dir_name:
output_path1 = Path(temp_dir_name).joinpath('output1')
output_path2 = Path(temp_dir_name).joinpath('output2')

task = op(arg1, arg2, str(output_path1), str(output_path2))
with components_local_output_dir_context(temp_dir_name):
task = op(arg1, arg2)

full_command = task.command + task.arguments

process = subprocess.run(full_command)

actual1_str = output_path1.read_text()
actual2_str = output_path2.read_text()
(output_path1, output_path2) = list(task.file_outputs.values()) #Relying on the fact the file_outputs dict should be preserving the insertion order which should match the outputs order for python components. Otherwise a component spec is needed to generate the mapping.
actual1_str = Path(output_path1).read_text()
actual2_str = Path(output_path2).read_text()

self.assertEqual(float(actual1_str), float(expected1_str))
self.assertEqual(float(actual2_str), float(expected2_str))
Expand Down

0 comments on commit dae7b13

Please sign in to comment.