Add volumeop and volumesnapshot support (#93)
* add volumeop support workaround

* add volumesnapshot workaround

* update resourceop integration

* add sorting to avoid randomness

* fix bug

* update sample

* minor code cleanup
Tomcli committed Apr 21, 2020
1 parent 3d57d61 commit a47ae08
Showing 10 changed files with 792 additions and 38 deletions.
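For orientation, the new support targets KFP pipelines that use the VolumeOp and VolumeSnapshotOp resource ops. The sketch below is illustrative only and is not one of the samples added under sdk/python/tests/compiler/testdata; the pipeline, resource, and step names are placeholders, while the APIs it uses (dsl.VolumeOp, dsl.VolumeSnapshotOp, dsl.ContainerOp, TektonCompiler) are the ones exercised by this commit.

import kfp.dsl as dsl


@dsl.pipeline(
    name="volume-and-snapshot-sketch",
    description="Illustrative VolumeOp + VolumeSnapshotOp pipeline."
)
def volume_and_snapshot_sketch(size):
    # Create a PVC-backed volume via a VolumeOp resource task.
    vop = dsl.VolumeOp(
        name="create-volume",
        resource_name="my-pvc",
        modes=dsl.VOLUME_MODE_RWO,
        size=size
    )

    # Write a file onto the volume.
    step = dsl.ContainerOp(
        name="write",
        image="library/bash:4.4.23",
        command=["sh", "-c"],
        arguments=["echo hello > /data/file1"],
        pvolumes={"/data": vop.volume}
    )

    # Snapshot the volume once the write step has finished.
    dsl.VolumeSnapshotOp(
        name="snapshot-volume",
        resource_name="my-snapshot",
        volume=step.pvolume
    )


if __name__ == '__main__':
    # Mirrors the compilation entry point used by the testdata samples.
    from kfp_tekton.compiler import TektonCompiler
    TektonCompiler().compile(volume_and_snapshot_sketch, __file__.replace('.py', '.yaml'))

Compiling a pipeline like this now produces Tekton Tasks whose kubectl-wrapper step receives the flattened resource manifest inline, as the diffs below show.
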
3 changes: 0 additions & 3 deletions sdk/README.md
@@ -127,6 +127,3 @@ Waiting for logs to be available...
## Troubleshooting
- Please be aware that defined Affinity, Node Selector, and Tolerations are applied to all the tasks in the same pipeline because there's only one podTemplate allowed in each pipeline.
- When you add test cases to compiler_tests, the compiled pipeline/pipelinerun YAML may contain non-deterministic values or ordering; in that case, define a lambda function and pass it as normalize_compiler_output_function to make the test pass.
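For reference, the pattern described in the removed tip looks roughly like the resourceOp test prior to this commit: a normalization callable rewrites the order-dependent part of the compiled YAML before it is compared against the golden file. The function name normalize below is illustrative; the re.sub pattern mirrors the lambda the old test used.

import re

# Illustrative normalization hook in the spirit of the removed tip: it
# canonicalizes the order of the two output expressions so the golden-file
# comparison becomes deterministic (mirrors the lambda used by the old
# resourceOp test, which this commit removes in favor of sorted output).
def normalize(compiled_yaml):
    return re.sub("{},{.metadata.name}", "{.metadata.name},{}", compiled_yaml)


print(normalize("value: '{},{.metadata.name}'"))
# prints: value: '{.metadata.name},{}'
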
32 changes: 25 additions & 7 deletions sdk/python/kfp_tekton/compiler/_op_to_template.py
@@ -20,6 +20,7 @@
from kfp.dsl import ArtifactLocation
from urllib.parse import urlparse
import textwrap
import yaml
import re
import os

@@ -116,6 +117,16 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
# no output artifacts
output_artifacts = []

# Flatten manifest because it needs to replace Argo variables
manifest = yaml.dump(convert_k8s_obj_to_json(processed_op.k8s_resource), default_flow_style=False)
argo_var = False
if manifest.find('{{workflow.name}}') != -1:
# Kubernetes Pod arguments only take $() as environment variables
manifest = manifest.replace('{{workflow.name}}', "$(PIPELINERUN)")
# Remove yaml quote in order to read bash variables
manifest = re.sub('name: \'([^\']+)\'', 'name: \g<1>', manifest)
argo_var = True

# task template
template = {
'apiVersion': tekton_api_version,
@@ -134,11 +145,6 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
"name": "merge-strategy",
"type": "string"
},
{
"description": "Content of the resource to deploy",
"name": "manifest",
"type": "string"
},
{
"default": "",
"description": "An express to retrieval data from resource.",
@@ -158,7 +164,7 @@
"type": "string"
},
{
"default": "index.docker.io/fenglixa/kubeclient:v0.0.1", # Todo: The image need to be replaced, once there are official images from tekton
"default": "index.docker.io/aipipeline/kubeclient:v0.0.2", # Todo: The image need to be replaced, once there are official images from tekton
"description": "Kubectl wrapper image",
"name": "image",
"type": "string"
@@ -175,7 +181,7 @@
"args": [
"--action=$(params.action)",
"--merge-strategy=$(params.merge-strategy)",
"--manifest=$(params.manifest)",
"--manifest=%s" % manifest,
"--output=$(params.output)",
"--success-condition=$(params.success-condition)",
"--failure-condition=$(params.failure-condition)",
@@ -189,6 +195,18 @@ def _op_to_template(op: BaseOp, enable_artifacts=False):
}
}

# Inject Argo variable replacement as env variables.
if argo_var:
template['spec']['steps'][0]['env'] = [
{'name': 'PIPELINERUN', 'valueFrom': {'fieldRef': {'fieldPath': "metadata.labels['tekton.dev/pipelineRun']"}}}
]

# Add results if exist.
if op.attribute_outputs.items():
template['spec']['results'] = []
for output_item in sorted(list(op.attribute_outputs.items()), key=lambda x: x[0]):
template['spec']['results'].append({'name': output_item[0], 'description': output_item[1]})

# initContainers
if processed_op.init_containers:
steps = processed_op.init_containers.copy()
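To make the manifest handling added to _op_to_template.py above easier to follow in isolation, here is a standalone sketch of the same transformation; the k8s_resource dict is a stand-in for convert_k8s_obj_to_json(processed_op.k8s_resource), and the rest mirrors the added code.

import re
import yaml

# Stand-in for convert_k8s_obj_to_json(processed_op.k8s_resource): a VolumeOp
# manifest whose PVC name embeds the Argo-style {{workflow.name}} placeholder.
k8s_resource = {
    'apiVersion': 'v1',
    'kind': 'PersistentVolumeClaim',
    'metadata': {'name': '{{workflow.name}}-my-pvc'},
    'spec': {
        'accessModes': ['ReadWriteOnce'],
        'resources': {'requests': {'storage': '1Gi'}}
    }
}

# Flatten the manifest so it can be passed inline via --manifest=...
manifest = yaml.dump(k8s_resource, default_flow_style=False)
argo_var = False
if manifest.find('{{workflow.name}}') != -1:
    # Kubernetes Pod arguments only expand $() environment variables, so the
    # Argo placeholder is rewritten to $(PIPELINERUN) ...
    manifest = manifest.replace('{{workflow.name}}', '$(PIPELINERUN)')
    # ... and the YAML quoting around the name is stripped so the variable can
    # still be read by the kubectl wrapper.
    manifest = re.sub(r"name: '([^']+)'", r'name: \g<1>', manifest)
    argo_var = True

print(manifest)   # metadata contains: name: $(PIPELINERUN)-my-pvc (unquoted)
print(argo_var)   # True -> the compiler injects the PIPELINERUN env variable

The compiled volume_op.yaml further down shows exactly this unquoted $(PIPELINERUN)-my-pvc name together with the injected PIPELINERUN env entry.
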
18 changes: 11 additions & 7 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -19,6 +19,7 @@
import itertools
import zipfile
import re
import textwrap
from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional

from ._op_to_template import _op_to_template, literal_str
@@ -334,8 +335,8 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
if isinstance(op, dsl.ResourceOp):
action = op.resource.get('action')
merge_strategy = op.resource.get('merge_strategy')
success_condition = op.resource.get('success_condition')
failure_condition = op.resource.get('failure_condition')
success_condition = op.resource.get('successCondition')
failure_condition = op.resource.get('failureCondition')
task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != "image"]
if not merge_strategy:
task['params'] = [tp for tp in task.get('params', []) if tp.get('name') != 'merge-strategy']
@@ -352,12 +353,15 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
tp['value'] = success_condition
if tp.get('name') == "failure-condition" and failure_condition:
tp['value'] = failure_condition
if tp.get('name') == "manifest":
manifest = yaml.dump(convert_k8s_obj_to_json(op.k8s_resource), default_flow_style=False)
tp['value'] = manifest
if tp.get('name') == "output":
output_values = ','.join(set(list(op.attribute_outputs.values())))
tp['value'] = output_values
output_values = ''
for value in sorted(list(op.attribute_outputs.items()), key=lambda x: x[0]):
output_value = textwrap.dedent("""\
- name: %s
valueFrom: '%s'
""" % (value[0], value[1]))
output_values += output_value
tp['value'] = literal_str(output_values)

# process loop parameters; keep this section after the other processing above and ahead of pipeline generation
root_group = pipeline.groups[0]
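Before moving on, here is what the output-parameter rendering added to _create_pipeline_workflow above produces when run standalone; the attribute_outputs dict is a stand-in for op.attribute_outputs, and the literal_str wrapper (which only forces block-style YAML in the final output) is omitted.

import textwrap

# Stand-in for op.attribute_outputs of a basic ResourceOp.
attribute_outputs = {'name': '{.metadata.name}', 'manifest': '{}'}

output_values = ''
# Sorting by output name keeps the compiled YAML stable between runs.
for name, value_from in sorted(attribute_outputs.items(), key=lambda x: x[0]):
    output_values += textwrap.dedent("""\
        - name: %s
          valueFrom: '%s'
    """ % (name, value_from))

print(output_values)
# - name: manifest
#   valueFrom: '{}'
# - name: name
#   valueFrom: '{.metadata.name}'

This matches the output parameter values in the updated resourceop_basic.yaml and new volume_op.yaml golden files below.
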
17 changes: 15 additions & 2 deletions sdk/python/tests/compiler/compiler_tests.py
@@ -129,8 +129,21 @@ def test_resourceOp_workflow(self):
Test compiling a resourceOp basic workflow.
"""
from .testdata.resourceop_basic import resourceop_basic
nf = lambda f: re.sub("{},{.metadata.name}", "{.metadata.name},{}", f)
self._test_pipeline_workflow(resourceop_basic, 'resourceop_basic.yaml', normalize_compiler_output_function=nf)
self._test_pipeline_workflow(resourceop_basic, 'resourceop_basic.yaml')

def test_volumeOp_workflow(self):
"""
Test compiling a volumeOp basic workflow.
"""
from .testdata.volume_op import volumeop_basic
self._test_pipeline_workflow(volumeop_basic, 'volume_op.yaml')

def test_volumeSnapshotOp_workflow(self):
"""
Test compiling a volumeSnapshotOp basic workflow.
"""
from .testdata.volume_snapshot_op import volume_snapshotop_sequential
self._test_pipeline_workflow(volume_snapshotop_sequential, 'volume_snapshot_op.yaml')

def test_hidden_output_file_workflow(self):
"""
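The compiler tests above all go through the existing _test_pipeline_workflow golden-file helper. The sketch below is a hypothetical reconstruction of what such a helper does (compile with TektonCompiler, optionally normalize, compare against the testdata YAML); it is not the repository's actual implementation, and the name compile_and_compare is illustrative.

import os
import tempfile

import yaml

from kfp_tekton.compiler import TektonCompiler


def compile_and_compare(pipeline_func, golden_yaml_path,
                        normalize_compiler_output_function=None):
    # Compile the pipeline function to a temporary YAML file.
    with tempfile.TemporaryDirectory() as tmpdir:
        compiled_path = os.path.join(tmpdir, 'compiled.yaml')
        TektonCompiler().compile(pipeline_func, compiled_path)
        with open(compiled_path) as f:
            compiled = f.read()

    # Optionally canonicalize non-deterministic parts of the output.
    if normalize_compiler_output_function:
        compiled = normalize_compiler_output_function(compiled)

    # Compare parsed YAML documents so formatting differences are ignored.
    with open(golden_yaml_path) as f:
        golden = f.read()
    assert list(yaml.safe_load_all(compiled)) == list(yaml.safe_load_all(golden))
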
24 changes: 12 additions & 12 deletions sdk/python/tests/compiler/testdata/resourceop_basic.yaml
@@ -25,9 +25,6 @@ spec:
description: Merge strategy when using action patch
name: merge-strategy
type: string
- description: Content of the resource to deploy
name: manifest
type: string
- default: ''
description: An express to retrieval data from resource.
name: output
@@ -40,19 +37,28 @@
description: A label selector express to decide if the action on resource is failure.
name: failure-condition
type: string
- default: index.docker.io/fenglixa/kubeclient:v0.0.1
- default: index.docker.io/aipipeline/kubeclient:v0.0.2
description: Kubectl wrapper image
name: image
type: string
- default: 'false'
description: Enable set owner reference for created resource.
name: set-ownerreference
type: string
results:
- description: '{}'
name: manifest
- description: '{.metadata.name}'
name: name
steps:
- args:
- --action=$(params.action)
- --merge-strategy=$(params.merge-strategy)
- --manifest=$(params.manifest)
- "--manifest=apiVersion: batch/v1\nkind: Job\nmetadata:\n generateName: resourceop-basic-job-\n\
spec:\n backoffLimit: 4\n template:\n metadata:\n name: resource-basic\n\
\ spec:\n containers:\n - command:\n - /usr/bin/env\n \
\ image: k8s.gcr.io/busybox\n name: sample-container\n restartPolicy:\
\ Never\n"
- --output=$(params.output)
- --success-condition=$(params.success-condition)
- --failure-condition=$(params.failure-condition)
@@ -75,14 +81,8 @@ spec:
params:
- name: action
value: create
- name: manifest
value: "apiVersion: batch/v1\nkind: Job\nmetadata:\n generateName: resourceop-basic-job-\n\
spec:\n backoffLimit: 4\n template:\n metadata:\n name: resource-basic\n\
\ spec:\n containers:\n - command:\n - /usr/bin/env\n\
\ image: k8s.gcr.io/busybox\n name: sample-container\n \
\ restartPolicy: Never\n"
- name: output
value: '{.metadata.name},{}'
value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n"
- name: set-ownerreference
value: 'false'
taskRef:
42 changes: 42 additions & 0 deletions sdk/python/tests/compiler/testdata/volume_op.py
@@ -0,0 +1,42 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import kfp
import kfp.dsl as dsl


@dsl.pipeline(
name="VolumeOp Basic",
description="A Basic Example on VolumeOp Usage."
)
def volumeop_basic(size):
vop = dsl.VolumeOp(
name="create-pvc",
resource_name="my-pvc",
modes=dsl.VOLUME_MODE_RWO,
size=size
)

cop = dsl.ContainerOp(
name="cop",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo foo > /mnt/file1"],
pvolumes={"/mnt": vop.volume}
)

if __name__ == '__main__':
# don't use top-level import of TektonCompiler to prevent monkey-patching KFP compiler when using KFP's dsl-compile
from kfp_tekton.compiler import TektonCompiler
TektonCompiler().compile(volumeop_basic, __file__.replace('.py', '.yaml'))
128 changes: 128 additions & 0 deletions sdk/python/tests/compiler/testdata/volume_op.yaml
@@ -0,0 +1,128 @@
# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: create-pvc
spec:
params:
- description: Action on the resource
name: action
type: string
- default: strategic
description: Merge strategy when using action patch
name: merge-strategy
type: string
- default: ''
description: An express to retrieval data from resource.
name: output
type: string
- default: ''
description: A label selector express to decide if the action on resource is success.
name: success-condition
type: string
- default: ''
description: A label selector express to decide if the action on resource is failure.
name: failure-condition
type: string
- default: index.docker.io/aipipeline/kubeclient:v0.0.2
description: Kubectl wrapper image
name: image
type: string
- default: 'false'
description: Enable set owner reference for created resource.
name: set-ownerreference
type: string
- name: size
results:
- description: '{}'
name: manifest
- description: '{.metadata.name}'
name: name
- description: '{.status.capacity.storage}'
name: size
steps:
- args:
- --action=$(params.action)
- --merge-strategy=$(params.merge-strategy)
- "--manifest=apiVersion: v1\nkind: PersistentVolumeClaim\nmetadata:\n name:\
\ $(PIPELINERUN)-my-pvc\nspec:\n accessModes:\n - ReadWriteOnce\n resources:\n\
\ requests:\n storage: $(inputs.params.size)\n"
- --output=$(params.output)
- --success-condition=$(params.success-condition)
- --failure-condition=$(params.failure-condition)
- --set-ownerreference=$(params.set-ownerreference)
env:
- name: PIPELINERUN
valueFrom:
fieldRef:
fieldPath: metadata.labels['tekton.dev/pipelineRun']
image: $(params.image)
name: create-pvc
resources: {}
---
apiVersion: tekton.dev/v1beta1
kind: Task
metadata:
name: cop
spec:
params:
- name: create-pvc-name
steps:
- args:
- echo foo > /mnt/file1
command:
- sh
- -c
image: library/bash:4.4.23
name: cop
volumeMounts:
- mountPath: /mnt
name: create-pvc
volumes:
- name: create-pvc
persistentVolumeClaim:
claimName: $(inputs.params.create-pvc-name)
---
apiVersion: tekton.dev/v1beta1
kind: Pipeline
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "A Basic Example on VolumeOp
Usage.", "inputs": [{"name": "size"}], "name": "VolumeOp Basic"}'
name: volumeop-basic
spec:
params:
- name: size
tasks:
- name: create-pvc
params:
- name: action
value: create
- name: output
value: "- name: manifest\n valueFrom: '{}'\n- name: name\n valueFrom: '{.metadata.name}'\n\
- name: size\n valueFrom: '{.status.capacity.storage}'\n"
- name: set-ownerreference
value: 'false'
- name: size
value: $(params.size)
taskRef:
name: create-pvc
- name: cop
params:
- name: create-pvc-name
value: $(tasks.create-pvc.results.name)
taskRef:
name: cop