Skip to content

Commit

Permalink
Remove cops and rops pipeline attributes (#1298)
Browse files Browse the repository at this point in the history
* Remove the separated dictionaries for ContainerOps and ResourceOps
* Fix the sanitization performed by the compiler to iterate through ops
  dict and do type-check for the special fields file_outputs and
  attribute_outputs

Signed-off-by: Ilias Katsakioris <elikatsis@arrikto.com>
  • Loading branch information
elikatsis authored and k8s-ci-robot committed May 10, 2019
1 parent ce60661 commit b675e02
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 34 deletions.
37 changes: 9 additions & 28 deletions sdk/python/kfp/compiler/compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,8 +601,8 @@ def _compile(self, pipeline_func):
arg.value = default.value if isinstance(default, dsl.PipelineParam) else default

# Sanitize operator names and param names
sanitized_cops = {}
for op in p.cops.values():
sanitized_ops = {}
for op in p.ops.values():
sanitized_name = K8sHelper.sanitize_k8s_name(op.name)
op.name = sanitized_name
for param in op.outputs.values():
Expand All @@ -614,38 +614,19 @@ def _compile(self, pipeline_func):
op.output.op_name = K8sHelper.sanitize_k8s_name(op.output.op_name)
if op.dependent_names:
op.dependent_names = [K8sHelper.sanitize_k8s_name(name) for name in op.dependent_names]
if op.file_outputs is not None:
if isinstance(op, dsl.ContainerOp) and op.file_outputs is not None:
sanitized_file_outputs = {}
for key in op.file_outputs.keys():
sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key]
op.file_outputs = sanitized_file_outputs
sanitized_cops[sanitized_name] = op
p.cops = sanitized_cops
p.ops = dict(sanitized_cops)

# Sanitize operator names and param names of ResourceOps
sanitized_rops = {}
for rop in p.rops.values():
sanitized_name = K8sHelper.sanitize_k8s_name(rop.name)
rop.name = sanitized_name
for param in rop.outputs.values():
param.name = K8sHelper.sanitize_k8s_name(param.name)
if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
if rop.output is not None:
rop.output.name = K8sHelper.sanitize_k8s_name(rop.output.name)
rop.output.op_name = K8sHelper.sanitize_k8s_name(rop.output.op_name)
if rop.dependent_names:
rop.dependent_names = [K8sHelper.sanitize_k8s_name(name) for name in rop.dependent_names]
if rop.attribute_outputs is not None:
elif isinstance(op, dsl.ResourceOp) and op.attribute_outputs is not None:
sanitized_attribute_outputs = {}
for key in rop.attribute_outputs.keys():
for key in op.attribute_outputs.keys():
sanitized_attribute_outputs[K8sHelper.sanitize_k8s_name(key)] = \
rop.attribute_outputs[key]
rop.attribute_outputs = sanitized_attribute_outputs
sanitized_rops[sanitized_name] = rop
p.rops = sanitized_rops
p.ops.update(dict(sanitized_rops))
op.attribute_outputs[key]
op.attribute_outputs = sanitized_attribute_outputs
sanitized_ops[sanitized_name] = op
p.ops = sanitized_ops

workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow
Expand Down
6 changes: 0 additions & 6 deletions sdk/python/kfp/dsl/_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ def __init__(self, name: str):
"""
self.name = name
self.ops = {}
self.cops = {}
self.rops = {}
# Add the root group.
self.groups = [_ops_group.OpsGroup('pipeline', name=name)]
self.group_id = 0
Expand Down Expand Up @@ -148,10 +146,6 @@ def add_op(self, op: _container_op.BaseOp, define_only: bool):
op_name = _make_name_unique_by_adding_index(op.human_name, list(self.ops.keys()), ' ')

self.ops[op_name] = op
if isinstance(op, _container_op.ContainerOp):
self.cops[op_name] = op
elif isinstance(op, _resource_op.ResourceOp):
self.rops[op_name] = op
if not define_only:
self.groups[-1].ops.append(op)

Expand Down

0 comments on commit b675e02

Please sign in to comment.