Skip to content

Commit

Permalink
SDK/Components - Removed outputs from task factory function signature (
Browse files Browse the repository at this point in the history
…#388)

This realizes the outputs handling roadmap and solves problems with input and output name clashes.
  • Loading branch information
Ark-kun authored and k8s-ci-robot committed Dec 3, 2018
1 parent d355a15 commit 96ec194
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 48 deletions.
31 changes: 10 additions & 21 deletions sdk/python/kfp/components/_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
]

import sys
from collections import OrderedDict
from ._yaml_utils import load_yaml, dump_yaml
from ._structures import ComponentSpec

Expand Down Expand Up @@ -191,8 +192,7 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo
inputs_dict = {port.name: port for port in inputs_list}

input_name_to_pythonic = {}
output_name_to_pythonic = {}
pythonic_name_to_original = {}
pythonic_name_to_input_name = {}

input_name_to_kubernetes = {}
output_name_to_kubernetes = {}
Expand All @@ -201,21 +201,16 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo

for io_port in inputs_list:
pythonic_name = _sanitize_python_function_name(io_port.name)
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_original, '_')
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_input_name, '_')
input_name_to_pythonic[io_port.name] = pythonic_name
pythonic_name_to_original[pythonic_name] = io_port.name
pythonic_name_to_input_name[pythonic_name] = io_port.name

kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_input_name, '-')
input_name_to_kubernetes[io_port.name] = kubernetes_name
kubernetes_name_to_input_name[kubernetes_name] = io_port.name

for io_port in outputs_list:
pythonic_name = _sanitize_python_function_name(io_port.name)
pythonic_name = _make_name_unique_by_adding_index(pythonic_name, pythonic_name_to_original, '_')
output_name_to_pythonic[io_port.name] = pythonic_name
pythonic_name_to_original[pythonic_name] = io_port.name

kubernetes_name = _sanitize_kubernetes_resource_name(io_port.name)
kubernetes_name = _make_name_unique_by_adding_index(kubernetes_name, kubernetes_name_to_output_name, '-')
output_name_to_kubernetes[io_port.name] = kubernetes_name
Expand All @@ -225,9 +220,11 @@ def _create_task_factory_from_component_spec(component_spec:ComponentSpec, compo
container_image = container_spec.image

file_inputs={}
file_outputs_from_def={}
file_outputs_from_def = OrderedDict()
if container_spec.file_outputs != None:
file_outputs_from_def = {output_name_to_kubernetes[param]: path for param, path in container_spec.file_outputs.items()}
for param, path in container_spec.file_outputs.items():
output_key = output_name_to_kubernetes[param]
file_outputs_from_def[output_key] = path

def create_container_op_with_expanded_arguments(pythonic_input_argument_values):
file_outputs = file_outputs_from_def.copy()
Expand Down Expand Up @@ -280,12 +277,7 @@ def expand_command_part(arg): #input values with original names
elif func_name == 'output':
assert isinstance(func_argument, str)
port_name = func_argument
pythonic_port_name = output_name_to_pythonic[port_name]
if pythonic_port_name in pythonic_input_argument_values and pythonic_input_argument_values[pythonic_port_name] is not None:
output_filename = str(pythonic_input_argument_values[pythonic_port_name])
else:
output_filename = _generate_output_file_name(port_name)
#We need to pass the file mapping to file_outputs
output_filename = _generate_output_file_name(port_name)
output_key = output_name_to_kubernetes[port_name]
if output_key in file_outputs:
if file_outputs[output_key] != output_filename:
Expand Down Expand Up @@ -364,10 +356,7 @@ def expand_argument_list(argument_list):
#Reordering the inputs since in Python optional parameters must come after reuired parameters
reordered_input_list = [input for input in inputs_list if not input.optional] + [input for input in inputs_list if input.optional]
input_parameters = [_dynamic.KwParameter(input_name_to_pythonic[port.name], annotation=(_try_get_object_by_name(port.type) if port.type else inspect.Parameter.empty), default=(None if port.optional else inspect.Parameter.empty)) for port in reordered_input_list]
output_parameters = [_dynamic.KwParameter(output_name_to_pythonic[port.name], annotation=('OutputFile[{}]'.format(port.type) if port.type else inspect.Parameter.empty), default=None) for port in outputs_list]

#Still allowing to set the output parameters, but make them optional and auto-generate if missing.
factory_function_parameters = input_parameters + output_parameters
factory_function_parameters = input_parameters #Outputs are no longer part of the task factory function signature. The paths are always generated by the system.

return _dynamic.create_function_from_parameters(
create_container_op_with_expanded_arguments,
Expand Down
19 changes: 2 additions & 17 deletions sdk/python/tests/components/test_components.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def test_loading_minimal_component(self):
task1 = task_factory1()
assert task1.image == component_dict['implementation']['container']['image']

@unittest.expectedFailure
@unittest.expectedFailure #TODO: Check this in the ComponentSpec class, not during materialization.
def test_fail_on_duplicate_input_names(self):
component_text = '''\
inputs:
Expand All @@ -85,6 +85,7 @@ def test_fail_on_duplicate_input_names(self):
'''
task_factory1 = comp.load_component_from_text(component_text)

@unittest.skip #TODO: Fix in the ComponentSpec class
@unittest.expectedFailure
def test_fail_on_duplicate_output_names(self):
component_text = '''\
Expand Down Expand Up @@ -301,22 +302,6 @@ def test_input_value_resolving(self):

self.assertEqual(task1.arguments, ['--data', 'some-data'])

def test_output_resolving(self):
component_text = '''\
outputs:
- {name: Data}
implementation:
container:
image: busybox
args:
- --output-data
- output: Data
'''
task_factory1 = comp.load_component(text=component_text)
task1 = task_factory1(data='/outputs/some-data')

self.assertEqual(task1.arguments, ['--output-data', '/outputs/some-data'])

def test_automatic_output_resolving(self):
component_text = '''\
outputs:
Expand Down
29 changes: 19 additions & 10 deletions sdk/python/tests/components/test_python_op.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import subprocess
import tempfile
import unittest
from contextlib import contextmanager
from pathlib import Path

import kfp.components as comp
Expand All @@ -24,6 +25,15 @@ def add_two_numbers(a: float, b: float) -> float:
return a + b


@contextmanager
def components_local_output_dir_context(output_dir: str):
old_dir = comp._components._outputs_dir
try:
comp._components._outputs_dir = output_dir
yield output_dir
finally:
comp._components._outputs_dir = old_dir

class PythonOpTestCase(unittest.TestCase):
def helper_test_2_in_1_out_component_using_local_call(self, func, op):
arg1 = float(3)
Expand All @@ -35,15 +45,15 @@ def helper_test_2_in_1_out_component_using_local_call(self, func, op):
expected_str = str(expected)

with tempfile.TemporaryDirectory() as temp_dir_name:
output_path = Path(temp_dir_name).joinpath('output1')

task = op(arg1, arg2, str(output_path))
with components_local_output_dir_context(temp_dir_name):
task = op(arg1, arg2)

full_command = task.command + task.arguments

process = subprocess.run(full_command)

actual_str = output_path.read_text()
output_path = list(task.file_outputs.values())[0]
actual_str = Path(output_path).read_text()

self.assertEqual(float(actual_str), float(expected_str))

Expand All @@ -56,17 +66,16 @@ def helper_test_2_in_2_out_component_using_local_call(self, func, op):
expected2_str = str(expected_tuple[1])

with tempfile.TemporaryDirectory() as temp_dir_name:
output_path1 = Path(temp_dir_name).joinpath('output1')
output_path2 = Path(temp_dir_name).joinpath('output2')

task = op(arg1, arg2, str(output_path1), str(output_path2))
with components_local_output_dir_context(temp_dir_name):
task = op(arg1, arg2)

full_command = task.command + task.arguments

process = subprocess.run(full_command)

actual1_str = output_path1.read_text()
actual2_str = output_path2.read_text()
(output_path1, output_path2) = list(task.file_outputs.values()) #Relying on the fact the file_outputs dict should be preserving the insertion order which should match the outputs order for python components. Otherwise a component spec is needed to generate the mapping.
actual1_str = Path(output_path1).read_text()
actual2_str = Path(output_path2).read_text()

self.assertEqual(float(actual1_str), float(expected1_str))
self.assertEqual(float(actual2_str), float(expected2_str))
Expand Down

0 comments on commit 96ec194

Please sign in to comment.