DSL refactor (#619)
* add comments

* relocate functions in the compiler to group similar functions together; make _build_conventional_artifact a nested function

* consolidate the sanitize functions in the DSL into one.

* more comments

* move all sanitization (op name, param name) from the DSL to the compiler

* sanitize the PipelineParam name and op_name; remove the format check from PipelineParam

* remove unit test for pipelineparam op_name format checking

* fix bug: correctly replace input in the argument list

* fix bug: replace arguments with the matched parameters

* sanitize the file_outputs keys; match params in the args/cmds against the whole serialized param string; verify both the param name and the container name

* loosen the ContainerOp and param name restrictions
gaoning777 authored and k8s-ci-robot committed Jan 9, 2019
1 parent 76f8b6b commit d3c4add
Showing 8 changed files with 180 additions and 146 deletions.
3 changes: 2 additions & 1 deletion sdk/python/kfp/_client.py
@@ -22,6 +22,7 @@
from datetime import datetime

from .compiler import compiler
from .compiler import _k8s_helper


class Client(object):
@@ -171,7 +172,7 @@ def run_pipeline(self, experiment_id, job_name, pipeline_package_path, params={}

pipeline_obj = self._extract_pipeline_yaml(pipeline_package_path)
pipeline_json_string = json.dumps(pipeline_obj)
api_params = [kfp_run.ApiParameter(name=compiler.Compiler()._sanitize_name(k), value=str(v))
api_params = [kfp_run.ApiParameter(name=_k8s_helper.K8sHelper.sanitize_k8s_name(k), value=str(v))
for k,v in params.items()]
key = kfp_run.models.ApiResourceKey(id=experiment_id,
type=kfp_run.models.ApiResourceType.EXPERIMENT)
9 changes: 8 additions & 1 deletion sdk/python/kfp/compiler/_k8s_helper.py
@@ -17,7 +17,7 @@
from kubernetes import config
import time
import logging

import re

class K8sHelper(object):
""" Kubernetes Helper """
@@ -119,6 +119,13 @@ def run_job(self, yaml_spec, timeout=600):
self._delete_k8s_job(pod_name, yaml_spec)
return succ

@staticmethod
def sanitize_k8s_name(name):
"""From _make_kubernetes_name
sanitize_k8s_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

@staticmethod
def convert_k8s_obj_to_json(k8s_obj):
"""
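For illustration, here is a minimal, self-contained sketch (not part of this commit) of what the new sanitize_k8s_name helper produces for a few hypothetical names; the function body is copied from the hunk above:

```
import re

def sanitize_k8s_name(name):
    # Copied from K8sHelper.sanitize_k8s_name above: lowercase the name,
    # replace any character outside [-0-9a-z] with '-', collapse repeated
    # '-', and trim leading/trailing '-'.
    return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

print(sanitize_k8s_name('My Training Op'))    # my-training-op
print(sanitize_k8s_name('learning_rate'))     # learning-rate
print(sanitize_k8s_name('GCS bucket (out)'))  # gcs-bucket-out
```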
274 changes: 151 additions & 123 deletions sdk/python/kfp/compiler/compiler.py
@@ -38,132 +38,16 @@ def my_pipeline(a: dsl.PipelineParam, b: dsl.PipelineParam):
```
"""

def _sanitize_name(self, name):
"""From _make_kubernetes_name
_sanitize_name cleans and converts the names in the workflow.
"""
return re.sub('-+', '-', re.sub('[^-0-9a-z]+', '-', name.lower())).lstrip('-').rstrip('-')

def _pipelineparam_full_name(self, param):
"""_pipelineparam_full_name
"""_pipelineparam_full_name converts the names of pipeline parameters
to unique names in the argo yaml
Args:
param(PipelineParam): pipeline parameter
"""
if param.op_name:
return param.op_name + '-' + param.name
return self._sanitize_name(param.name)

def _build_conventional_artifact(self, name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}

def _process_args(self, raw_args, argument_inputs):
if not raw_args:
return []

processed_args = list(map(str, raw_args))
for i, _ in enumerate(processed_args):
for param in argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(
str(param), '{{inputs.parameters.%s}}' % full_name, str(processed_args[i]))

return processed_args

def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""

input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])

output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

template = {
'name': op.name,
'container': {
'image': op.image,
}
}
processed_arguments = self._process_args(op.arguments, op.argument_inputs)
processed_command = self._process_args(op.command, op.argument_inputs)
if processed_arguments:
template['container']['args'] = processed_arguments
if processed_command:
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}

# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(self._build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(self._build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts


# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests

# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector

if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))

if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels

return template
return param.name
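As a quick illustration of the naming convention (a hedged sketch using hypothetical names, not part of this commit): a parameter produced by an op is namespaced by that op's already-sanitized name, while a top-level pipeline parameter keeps its own name.

```
def pipelineparam_full_name(name, op_name=None):
    # Mirrors Compiler._pipelineparam_full_name above.
    return op_name + '-' + name if op_name else name

print(pipelineparam_full_name('accuracy', op_name='train-model'))  # train-model-accuracy
print(pipelineparam_full_name('learning-rate'))                    # learning-rate
```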

def _get_groups_for_ops(self, root_group):
"""Helper function to get belonging groups for each op.
@@ -230,7 +114,7 @@ def _get_inputs_outputs(self, pipeline, root_group, op_groups):
# op's inputs and all params used in conditions for that op are both considered.
for param in op.inputs + list(condition_params[op.name]):
# if the value is already provided (immediate value), then no need to expose
# it as input for its parent groups.
# it as input for its parent groups.
if param.value:
continue

@@ -327,6 +211,126 @@ def _resolve_value_or_reference(self, value_or_reference, potential_references):
else:
return str(value_or_reference)

def _process_args(self, raw_args, argument_inputs):
if not raw_args:
return []
processed_args = list(map(str, raw_args))
for i, _ in enumerate(processed_args):
# unsanitized_argument_inputs stores a dict: string of sanitized param -> string of unsanitized param
matches = []
match = re.findall(r'{{pipelineparam:op=([\w\s\_-]*);name=([\w\s\_-]+);value=(.*?)}}', str(processed_args[i]))
matches += match
unsanitized_argument_inputs = {}
for x in list(set(matches)):
sanitized_str = str(dsl.PipelineParam(K8sHelper.sanitize_k8s_name(x[1]), K8sHelper.sanitize_k8s_name(x[0]), x[2]))
unsanitized_argument_inputs[sanitized_str] = str(dsl.PipelineParam(x[1], x[0], x[2]))

if argument_inputs:
for param in argument_inputs:
if str(param) in unsanitized_argument_inputs:
full_name = self._pipelineparam_full_name(param)
processed_args[i] = re.sub(unsanitized_argument_inputs[str(param)], '{{inputs.parameters.%s}}' % full_name,
processed_args[i])
return processed_args
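To see the effect of _process_args, here is a small self-contained sketch (hypothetical values, not part of this commit). The placeholder format follows the regex used above, and the full input name assumes the op and param have already been sanitized to 'train-model' and 'learning-rate'; the real code substitutes only the exact serialized string of each known argument input, while the sketch uses a generic pattern for brevity.

```
import re

# A command argument containing a serialized PipelineParam placeholder.
arg = '--lr {{pipelineparam:op=Train Model;name=learning rate;value=0.1}}'

# _process_args rewrites the matched placeholder into an Argo input reference
# keyed by the parameter's full (sanitized) name.
full_name = 'train-model-learning-rate'
processed = re.sub(r'{{pipelineparam:op=.*?;name=.*?;value=.*?}}',
                   '{{inputs.parameters.%s}}' % full_name, arg)
print(processed)  # --lr {{inputs.parameters.train-model-learning-rate}}
```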

def _op_to_template(self, op):
"""Generate template given an operator inherited from dsl.ContainerOp."""

def _build_conventional_artifact(name):
return {
'name': name,
'path': '/' + name + '.json',
's3': {
# TODO: parameterize namespace for minio service
'endpoint': 'minio-service.kubeflow:9000',
'bucket': 'mlpipeline',
'key': 'runs/{{workflow.uid}}/{{pod.name}}/' + name + '.tgz',
'insecure': True,
'accessKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'accesskey',
},
'secretKeySecret': {
'name': 'mlpipeline-minio-artifact',
'key': 'secretkey'
}
},
}

processed_arguments = self._process_args(op.arguments, op.argument_inputs)
processed_command = self._process_args(op.command, op.argument_inputs)

input_parameters = []
for param in op.inputs:
one_parameter = {'name': self._pipelineparam_full_name(param)}
if param.value:
one_parameter['value'] = str(param.value)
input_parameters.append(one_parameter)
# Sort to make the results deterministic.
input_parameters.sort(key=lambda x: x['name'])

output_parameters = []
for param in op.outputs.values():
output_parameters.append({
'name': self._pipelineparam_full_name(param),
'valueFrom': {'path': op.file_outputs[param.name]}
})
output_parameters.sort(key=lambda x: x['name'])

template = {
'name': op.name,
'container': {
'image': op.image,
}
}
if processed_arguments:
template['container']['args'] = processed_arguments
if processed_command:
template['container']['command'] = processed_command
if input_parameters:
template['inputs'] = {'parameters': input_parameters}

template['outputs'] = {}
if output_parameters:
template['outputs'] = {'parameters': output_parameters}

# Generate artifact for metadata output
# The motivation of appending the minio info in the yaml
# is to specify a unique path for the metadata.
# TODO: after argo addresses the issue that configures a unique path
# for the artifact output when default artifact repository is configured,
# this part needs to be updated to use the default artifact repository.
output_artifacts = []
output_artifacts.append(_build_conventional_artifact('mlpipeline-ui-metadata'))
output_artifacts.append(_build_conventional_artifact('mlpipeline-metrics'))
template['outputs']['artifacts'] = output_artifacts

# Set resources.
if op.resource_limits or op.resource_requests:
template['container']['resources'] = {}
if op.resource_limits:
template['container']['resources']['limits'] = op.resource_limits
if op.resource_requests:
template['container']['resources']['requests'] = op.resource_requests

# Set nodeSelector.
if op.node_selector:
template['nodeSelector'] = op.node_selector

if op.env_variables:
template['container']['env'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.env_variables))
if op.volume_mounts:
template['container']['volumeMounts'] = list(map(K8sHelper.convert_k8s_obj_to_json, op.volume_mounts))

if op.pod_annotations or op.pod_labels:
template['metadata'] = {}
if op.pod_annotations:
template['metadata']['annotations'] = op.pod_annotations
if op.pod_labels:
template['metadata']['labels'] = op.pod_labels

return template
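For a simple op, the resulting template is roughly the following (an abridged, hypothetical sketch, not part of this commit; the image, paths, and names are made up, and the artifact entries are the two conventional outputs built by _build_conventional_artifact above):

```
template = {
    'name': 'train-model',
    'container': {
        'image': 'gcr.io/some-project/trainer:latest',
        'args': ['--lr', '{{inputs.parameters.learning-rate}}'],
    },
    'inputs': {'parameters': [{'name': 'learning-rate'}]},
    'outputs': {
        'parameters': [{
            'name': 'train-model-accuracy',
            'valueFrom': {'path': '/output/accuracy.txt'},
        }],
        'artifacts': [
            # mlpipeline-ui-metadata and mlpipeline-metrics entries,
            # each pointing at the minio-service S3 location shown above.
        ],
    },
}
```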

def _group_to_template(self, group, inputs, outputs, dependencies):
"""Generate template given an OpsGroup.
@@ -505,10 +509,10 @@ def _compile(self, pipeline_func):
raise ValueError('Please use a function with @dsl.pipeline decorator.')

pipeline_name, _ = dsl.Pipeline.get_pipeline_functions()[pipeline_func]
pipeline_name = self._sanitize_name(pipeline_name)
pipeline_name = K8sHelper.sanitize_k8s_name(pipeline_name)

# Create the arg list with no default values and call pipeline function.
args_list = [dsl.PipelineParam(self._sanitize_name(arg_name))
args_list = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
with dsl.Pipeline(pipeline_name) as p:
pipeline_func(*args_list)
@@ -517,12 +521,36 @@ def _compile(self, pipeline_func):
self._validate_exit_handler(p)

# Fill in the default values.
args_list_with_defaults = [dsl.PipelineParam(self._sanitize_name(arg_name))
args_list_with_defaults = [dsl.PipelineParam(K8sHelper.sanitize_k8s_name(arg_name))
for arg_name in argspec.args]
if argspec.defaults:
for arg, default in zip(reversed(args_list_with_defaults), reversed(argspec.defaults)):
arg.value = default.value if isinstance(default, dsl.PipelineParam) else default

# Sanitize operator names and param names
sanitized_ops = {}
for op in p.ops.values():
sanitized_name = K8sHelper.sanitize_k8s_name(op.name)
op.name = sanitized_name
for param in op.inputs + op.argument_inputs:
param.name = K8sHelper.sanitize_k8s_name(param.name)
if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
for param in op.outputs.values():
param.name = K8sHelper.sanitize_k8s_name(param.name)
if param.op_name:
param.op_name = K8sHelper.sanitize_k8s_name(param.op_name)
if op.output is not None:
op.output.name = K8sHelper.sanitize_k8s_name(op.output.name)
op.output.op_name = K8sHelper.sanitize_k8s_name(op.output.op_name)
if op.file_outputs is not None:
sanitized_file_outputs = {}
for key in op.file_outputs.keys():
sanitized_file_outputs[K8sHelper.sanitize_k8s_name(key)] = op.file_outputs[key]
op.file_outputs = sanitized_file_outputs
sanitized_ops[sanitized_name] = op
p.ops = sanitized_ops
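For example (hypothetical names, not part of this commit), the file_outputs sanitization above turns authored keys with spaces into valid Kubernetes names while leaving the paths untouched; the dict comprehension below is equivalent to the for-loop used in the commit:

```
from kfp.compiler._k8s_helper import K8sHelper  # assumed import path after this commit

file_outputs = {'Output Path': '/out.txt', 'Model Dir': '/model'}
sanitized_file_outputs = {
    K8sHelper.sanitize_k8s_name(key): value
    for key, value in file_outputs.items()
}
print(sanitized_file_outputs)  # {'output-path': '/out.txt', 'model-dir': '/model'}
```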

workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow

9 changes: 7 additions & 2 deletions sdk/python/kfp/dsl/_container_op.py
@@ -27,7 +27,8 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
"""Create a new instance of ContainerOp.
Args:
name: the name of the op. Has to be unique within a pipeline.
name: the name of the op. It does not have to be unique within a pipeline
because the pipeline will generate a new, unique name in case of a conflict.
image: the container image name, such as 'python:3.5-jessie'
command: the command to run in the container.
If None, uses default CMD in defined in container.
@@ -43,6 +44,10 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None
if not _pipeline.Pipeline.get_default_pipeline():
raise ValueError('Default pipeline not defined.')

valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'
if not re.match(valid_name_regex, name):
raise ValueError('Only letters, numbers, spaces, "_", and "-" are allowed in name. Must begin with letter: %s' % (name))

self.human_name = name
self.name = _pipeline.Pipeline.get_default_pipeline().add_op(self, is_exit_handler)
self.image = image
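A minimal sketch (hypothetical names, not part of this commit) of which op names the new check accepts and which it rejects:

```
import re

valid_name_regex = r'^[A-Za-z][A-Za-z0-9\s_-]*$'

for name in ['Train Model', 'preprocess_v2', '2nd-step', 'eval/test']:
    accepted = bool(re.match(valid_name_regex, name))
    print(name, '->', 'accepted' if accepted else 'rejected')
# Train Model -> accepted
# preprocess_v2 -> accepted
# 2nd-step -> rejected (must begin with a letter)
# eval/test -> rejected ('/' is not allowed)
```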
@@ -60,7 +65,7 @@ def __init__(self, name: str, image: str, command: str=None, arguments: str=None

matches = []
for arg in (command or []) + (arguments or []):
match = re.findall(r'{{pipelineparam:op=([\w-]*);name=([\w-]+);value=(.*?)}}', str(arg))
match = re.findall(r'{{pipelineparam:op=([\w\s_-]*);name=([\w\s_-]+);value=(.*?)}}', str(arg))
matches += match

self.argument_inputs = [_pipeline_param.PipelineParam(x[1], x[0], x[2])
