Skip to content

Commit

Permalink
Add support for dsl.ResourceOp (kubeflow#89)
Browse files Browse the repository at this point in the history
  • Loading branch information
fenglixa committed Apr 11, 2020
1 parent cf8e998 commit ebe05e3
Show file tree
Hide file tree
Showing 7 changed files with 283 additions and 22 deletions.
4 changes: 3 additions & 1 deletion sdk/README.md
Expand Up @@ -97,4 +97,6 @@ To compile Kubeflow Pipelines as Tekton pipelineRun, simply add the `--generate-
- `dsl-compile-tekton --py sdk/python/tests/compiler/testdata/tolerations.py --output pipeline.yaml --generate-pipelinerun`
## Troubleshooting
- Please be aware that defined Affinity, Node Selector, and Tolerations are applied to all the tasks in the same pipeline because there's only one podTemplate allowed in each pipeline.
- Please be aware that defined Affinity, Node Selector, and Tolerations are applied to all the tasks in the same pipeline because there's only one podTemplate allowed in each pipeline.
- When you add test cases to compiler_tests, the output of pipeline/pipelinerun yaml may has uncertain values or orders, then you can define a lambda function as normalize_compiler_output_function to pass the testing.
104 changes: 84 additions & 20 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
Expand Up @@ -16,7 +16,6 @@
from kfp.compiler._k8s_helper import convert_k8s_obj_to_json
from kfp.compiler._op_to_template import _process_obj, _inputs_to_json, _outputs_to_json
from kfp import dsl
from kfp.dsl import ArtifactLocation
from kfp.dsl._container_op import BaseOp

from .. import tekton_api_version
Expand Down Expand Up @@ -106,21 +105,81 @@ def _op_to_template(op: BaseOp):
}

elif isinstance(op, dsl.ResourceOp):
# # no output artifacts
# output_artifacts = []
#
# # workflow template
# processed_op.resource["manifest"] = yaml.dump(
# convert_k8s_obj_to_json(processed_op.k8s_resource),
# default_flow_style=False
# )
# template = {
# 'name': processed_op.name,
# 'resource': convert_k8s_obj_to_json(
# processed_op.resource
# )
# }
raise NotImplementedError("dsl.ResourceOp is not yet implemented")
# no output artifacts
output_artifacts = []

# task template
template = {
'apiVersion': tekton_api_version,
'kind': 'Task',
'metadata': {'name': processed_op.name},
'spec': {
"params": [
{
"description": "Action on the resource",
"name": "action",
"type": "string"
},
{
"default": "strategic",
"description": "Merge strategy when using action patch",
"name": "merge-strategy",
"type": "string"
},
{
"description": "Content of the resource to deploy",
"name": "manifest",
"type": "string"
},
{
"default": "",
"description": "An express to retrieval data from resource.",
"name": "output",
"type": "string"
},
{
"default": "",
"description": "A label selector express to decide if the action on resource is success.",
"name": "success-condition",
"type": "string"
},
{
"default": "",
"description": "A label selector express to decide if the action on resource is failure.",
"name": "failure-condition",
"type": "string"
},
{
"default": "index.docker.io/fenglixa/kubeclient:v0.0.1", # Todo: The image need to be replaced, once there are official images from tekton
"description": "Kubectl wrapper image",
"name": "image",
"type": "string"
},
{
"default": "false",
"description": "Enable set owner reference for created resource.",
"name": "set-ownerreference",
"type": "string"
}
],
'steps': [
{
"args": [
"--action=$(params.action)",
"--merge-strategy=$(params.merge-strategy)",
"--manifest=$(params.manifest)",
"--output=$(params.output)",
"--success-condition=$(params.success-condition)",
"--failure-condition=$(params.failure-condition)",
"--set-ownerreference=$(params.set-ownerreference)"
],
"image": "$(params.image)",
"name": processed_op.name,
"resources": {}
}
]
}
}

# initContainers
if processed_op.init_containers:
Expand All @@ -133,16 +192,21 @@ def _op_to_template(op: BaseOp):
artifact_arguments = processed_op.artifact_arguments if isinstance(processed_op, dsl.ContainerOp) else None
inputs = _inputs_to_json(processed_op.inputs, input_artifact_paths, artifact_arguments)
if 'parameters' in inputs:
template['spec']['params'] = inputs['parameters']
if isinstance(processed_op, dsl.ContainerOp):
template['spec']['params'] = inputs['parameters']
elif isinstance(op, dsl.ResourceOp):
template['spec']['params'].extend(inputs['parameters'])
elif 'artifacts' in inputs:
raise NotImplementedError("input artifacts are not yet implemented")

# outputs
if isinstance(op, dsl.ContainerOp):
op_outputs = processed_op.outputs
param_outputs = processed_op.file_outputs
elif isinstance(op, dsl.ResourceOp):
param_outputs = processed_op.attribute_outputs
outputs_dict = _outputs_to_json(op, processed_op.outputs, param_outputs, output_artifacts)
op_outputs = {}
param_outputs = {}
outputs_dict = _outputs_to_json(op, op_outputs, param_outputs, output_artifacts)
if outputs_dict:
"""
Since Tekton results need to be under /tekton/results. If file output paths cannot be
Expand All @@ -167,7 +231,7 @@ def _op_to_template(op: BaseOp):
volume_mount_step_template = []
volume_template = []
mounted_paths = []
for name, path in processed_op.file_outputs.items():
for name, path in param_outputs.items():
name = name.replace('_', '-') # replace '_' to '-' since tekton results doesn't support underscore
template['spec']['results'].append({
'name': name,
Expand Down
31 changes: 31 additions & 0 deletions sdk/python/kfp_tekton/compiler/compiler.py
Expand Up @@ -320,6 +320,37 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
if op.timeout:
task['timeout'] = '%ds' % op.timeout

# handle resourceOp cases in pipeline
for task in task_refs:
op = pipeline.ops.get(task['name'])
if isinstance(op, dsl.ResourceOp):
action = op.resource.get('action')
merge_strategy = op.resource.get('merge_strategy')
success_condition = op.resource.get('success_condition')
failure_condition = op.resource.get('failure_condition')
task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != "image"]
if not merge_strategy:
task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != 'merge-strategy']
if not success_condition:
task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != 'success-condition']
if not failure_condition:
task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != "failure-condition"]
for tp in task.get('params', []):
if tp.get('name') == "action" and action:
tp['value'] = action
if tp.get('name') == "merge-strategy" and merge_strategy:
tp['value'] = merge_strategy
if tp.get('name') == "success-condition" and success_condition:
tp['value'] = success_condition
if tp.get('name') == "failure-condition" and failure_condition:
tp['value'] = failure_condition
if tp.get('name') == "manifest":
manifest = yaml.dump(convert_k8s_obj_to_json(op.k8s_resource), default_flow_style=False)
tp['value'] = manifest
if tp.get('name') == "output":
output_values = ','.join(set(list(op.attribute_outputs.values())))
tp['value'] = output_values

# process loop parameters, keep this section in the behind of other processes, ahead of gen pipeline
root_group = pipeline.groups[0]
op_name_to_for_loop_op = self._get_for_loop_ops(root_group)
Expand Down
8 changes: 8 additions & 0 deletions sdk/python/tests/compiler/compiler_tests.py
Expand Up @@ -124,6 +124,14 @@ def test_timeout_workflow(self):
from .testdata.timeout import timeout_sample_pipeline
self._test_pipeline_workflow(timeout_sample_pipeline, 'timeout.yaml')

def test_resourceOp_workflow(self):
"""
Test compiling a resourceOp basic workflow.
"""
from .testdata.resourceop_basic import resourceop_basic
nf = lambda f: re.sub("{},{.metadata.name}", "{.metadata.name},{}", f)
self._test_pipeline_workflow(resourceop_basic, 'resourceop_basic.yaml', normalize_compiler_output_function=nf)

def test_hidden_output_file_workflow(self):
"""
Test compiling a workflow with non configurable output file.
Expand Down
67 changes: 67 additions & 0 deletions sdk/python/tests/compiler/testdata/resourceop_basic.py
@@ -0,0 +1,67 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


"""
This example demonstrates how to use ResourceOp to specify the value of env var.
"""

import json
import kfp.dsl as dsl

_CONTAINER_MANIFEST = """
{
"apiVersion": "batch/v1",
"kind": "Job",
"metadata": {
"generateName": "resourceop-basic-job-"
},
"spec": {
"template": {
"metadata": {
"name": "resource-basic"
},
"spec": {
"containers": [{
"name": "sample-container",
"image": "k8s.gcr.io/busybox",
"command": ["/usr/bin/env"]
}],
"restartPolicy": "Never"
}
},
"backoffLimit": 4
}
}
"""


@dsl.pipeline(
name="ResourceOp Basic",
description="A Basic Example on ResourceOp Usage."
)
def resourceop_basic():

# Start a container. Print out env vars.
op = dsl.ResourceOp(
name='test-step',
k8s_resource=json.loads(_CONTAINER_MANIFEST),
action='create'
)


if __name__ == '__main__':
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(resourceop_basic, __file__.replace('.py', '.yaml'))
89 changes: 89 additions & 0 deletions sdk/python/tests/compiler/testdata/resourceop_basic.yaml
@@ -0,0 +1,89 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: test-step
spec:
params:
- description: Action on the resource
name: action
type: string
- default: strategic
description: Merge strategy when using action patch
name: merge-strategy
type: string
- description: Content of the resource to deploy
name: manifest
type: string
- default: ''
description: An express to retrieval data from resource.
name: output
type: string
- default: ''
description: A label selector express to decide if the action on resource is success.
name: success-condition
type: string
- default: ''
description: A label selector express to decide if the action on resource is failure.
name: failure-condition
type: string
- default: index.docker.io/fenglixa/kubeclient:v0.0.1
description: Kubectl wrapper image
name: image
type: string
- default: 'false'
description: Enable set owner reference for created resource.
name: set-ownerreference
type: string
steps:
- args:
- --action=$(params.action)
- --merge-strategy=$(params.merge-strategy)
- --manifest=$(params.manifest)
- --output=$(params.output)
- --success-condition=$(params.success-condition)
- --failure-condition=$(params.failure-condition)
- --set-ownerreference=$(params.set-ownerreference)
image: $(params.image)
name: test-step
resources: {}
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "A Basic Example on ResourceOp
Usage.", "name": "ResourceOp Basic"}'
name: resourceop-basic
spec:
params: []
tasks:
- name: test-step
params:
- name: action
value: create
- name: manifest
value: "apiVersion: batch/v1\nkind: Job\nmetadata:\n generateName: resourceop-basic-job-\n\
spec:\n backoffLimit: 4\n template:\n metadata:\n name: resource-basic\n\
\ spec:\n containers:\n - command:\n - /usr/bin/env\n\
\ image: k8s.gcr.io/busybox\n name: sample-container\n \
\ restartPolicy: Never\n"
- name: output
value: '{.metadata.name},{}'
- name: set-ownerreference
value: 'false'
taskRef:
name: test-step
2 changes: 1 addition & 1 deletion sdk/python/tests/test_kfp_samples_report.txt
Expand Up @@ -12,7 +12,7 @@ FAILURE: param_substitutions.py
SUCCESS: pipelineparams.py
FAILURE: recursive_do_while.py
SUCCESS: recursive_while.py
FAILURE: resourceop_basic.py
SUCCESS: resourceop_basic.py
SUCCESS: sidecar.py
SUCCESS: timeout.py
SUCCESS: volume.py
Expand Down

0 comments on commit ebe05e3

Please sign in to comment.