Fix special compilation cases for kfp samples #91

Merged · 8 commits · Apr 20, 2020
105 changes: 55 additions & 50 deletions sdk/python/kfp_tekton/compiler/compiler.py
@@ -382,59 +382,64 @@ def _create_pipeline_workflow(self, args, pipeline, op_transformers=None, pipeli
 
         # Generate pipelinerun if generate-pipelinerun flag is enabled
         # The base template is generated first and then optional parameters are inserted.
-        if self.generate_pipelinerun:
-            pipelinerun = {
-                'apiVersion': tekton_api_version,
-                'kind': 'PipelineRun',
-                'metadata': {
-                    'name': pipeline_template['metadata']['name'] + '-run'
-                },
-                'spec': {
-                    'params': [{
-                        'name': p['name'],
-                        'value': p.get('default', '')
-                    } for p in pipeline_template['spec']['params']
-                    ],
-                    'pipelineRef': {
-                        'name': pipeline_template['metadata']['name']
-                    }
-                }
-            }
-
-            pod_template = {}
-            for task in task_refs:
-                op = pipeline.ops.get(task['name'])
-                if op.affinity:
-                    pod_template['affinity'] = convert_k8s_obj_to_json(op.affinity)
-                if op.tolerations:
-                    pod_template['tolerations'] = pod_template.get('tolerations', []) + op.tolerations
-                if op.node_selector:
-                    pod_template['nodeSelector'] = op.node_selector
-
-            if pod_template:
-                pipelinerun['spec']['podtemplate'] = pod_template
-
-            # add workflow level timeout to pipeline run
-            if pipeline_conf.timeout:
-                pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout
-
-            # generate the Tekton service account template
-            service_template = {}
-            if len(pipeline_conf.image_pull_secrets) > 0:
-                service_template = {
-                    'apiVersion': 'v1',
-                    'kind': 'ServiceAccount',
-                    'metadata': {'name': pipelinerun['metadata']['name'] + '-sa'}
-                }
-            for image_pull_secret in pipeline_conf.image_pull_secrets:
-                service_template['imagePullSecrets'] = [{'name': image_pull_secret.name}]
-
-            if service_template:
-                workflow = workflow + [service_template]
-                pipelinerun['spec']['serviceAccountName'] = service_template['metadata']['name']
-
-            workflow = workflow + [pipelinerun]
+        # Wrapped in a try/except for when this method is called directly (e.g. there is no pipeline decorator)
+        try:
+            if self.generate_pipelinerun:
+                pipelinerun = {
+                    'apiVersion': tekton_api_version,
+                    'kind': 'PipelineRun',
+                    'metadata': {
+                        'name': pipeline_template['metadata']['name'] + '-run'
+                    },
+                    'spec': {
+                        'params': [{
+                            'name': p['name'],
+                            'value': p.get('default', '')
+                        } for p in pipeline_template['spec']['params']
+                        ],
+                        'pipelineRef': {
+                            'name': pipeline_template['metadata']['name']
+                        }
+                    }
+                }
+
+                pod_template = {}
+                for task in task_refs:
+                    op = pipeline.ops.get(task['name'])
+                    if op.affinity:
+                        pod_template['affinity'] = convert_k8s_obj_to_json(op.affinity)
+                    if op.tolerations:
+                        pod_template['tolerations'] = pod_template.get('tolerations', []) + op.tolerations
+                    if op.node_selector:
+                        pod_template['nodeSelector'] = op.node_selector
+
+                if pod_template:
+                    pipelinerun['spec']['podtemplate'] = pod_template
+
+                # add workflow level timeout to pipeline run
+                if pipeline_conf.timeout:
+                    pipelinerun['spec']['timeout'] = '%ds' % pipeline_conf.timeout
+
+                # generate the Tekton service account template
+                service_template = {}
+                if len(pipeline_conf.image_pull_secrets) > 0:
+                    service_template = {
+                        'apiVersion': 'v1',
+                        'kind': 'ServiceAccount',
+                        'metadata': {'name': pipelinerun['metadata']['name'] + '-sa'}
+                    }
+                for image_pull_secret in pipeline_conf.image_pull_secrets:
+                    service_template['imagePullSecrets'] = [{'name': image_pull_secret.name}]
+
+                if service_template:
+                    workflow = workflow + [service_template]
+                    pipelinerun['spec']['serviceAccountName'] = service_template['metadata']['name']
+
+                workflow = workflow + [pipelinerun]
+        except Exception:
+            # Intentionally do nothing when _create_pipeline_workflow is called directly
+            # (e.g. there is no pipeline decorator) and self.generate_pipelinerun is not set
+            pass
 
         # Use regex to replace all the Argo variables to Tekton variables. For variables that are unique to Argo,
        # we raise an Error to alert users about the unsupported variables. Here is the list of Argo variables.
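
To make the guarded case concrete: when a plain pipeline function is compiled through the internal API, as test_util.py below does for basic_no_decorator.py, there is no @dsl.pipeline decorator and no populated pipeline conf, which is the path the new try/except protects. A minimal sketch of such a direct call (echo_pipeline is a hypothetical one-op pipeline):

    # Sketch of the direct-call case handled above; echo_pipeline is hypothetical.
    import kfp.dsl as dsl
    from kfp_tekton.compiler import TektonCompiler

    def echo_pipeline(message: str = 'hello'):
        # No @dsl.pipeline decorator, so no pipeline metadata/conf is attached.
        dsl.ContainerOp(name='echo', image='busybox', command=['echo', message])

    # Direct call into the compiler internals, bypassing dsl-compile-tekton:
    workflow = TektonCompiler()._create_workflow(echo_pipeline)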
13 changes: 13 additions & 0 deletions sdk/python/tests/config.yaml
@@ -0,0 +1,13 @@
pipeline: compose.py
type: nested
components:
- name: save_most_frequent_word
- name: download_save_most_frequent_word
---
pipeline: basic_no_decorator.py
type: no_decorator
components:
function: save_most_frequent_word
name: 'Save Most Frequent'
description: 'Get Most Frequent Word and Save to GCS'
paramsList: ["message_param", "output_path_param"]
17 changes: 13 additions & 4 deletions sdk/python/tests/test_kfp_samples.sh
@@ -33,6 +33,7 @@ KFP_TESTDATA_DIR="${KFP_CLONE_DIR}/sdk/python/tests/compiler/testdata"
 TEKTON_COMPILED_YAML_DIR="${TEMP_DIR}/tekton_compiler_output"
 COMPILE_REPORT_FILE="${PROJECT_DIR}/sdk/python/tests/test_kfp_samples_report.txt"
 COMPILER_OUTPUTS_FILE="${TEMP_DIR}/test_kfp_samples_output.txt"
+CONFIG_FILE="${PROJECT_DIR}/sdk/python/tests/config.yaml"
 
 mkdir -p "${TEMP_DIR}"
 mkdir -p "${TEKTON_COMPILED_YAML_DIR}"
@@ -70,14 +71,22 @@ cp "${COMPILE_REPORT_FILE}" "${COMPILE_REPORT_FILE_OLD}"
 # delete the previous compiler output file
 rm -f "${COMPILER_OUTPUTS_FILE}"
 
+# check which pipelines have special configurations
+PIPELINES=$(awk '/pipeline:/{print $NF}' ${CONFIG_FILE})
+
 # compile each of the Python scripts in the KFP testdata folder
 for f in "${KFP_TESTDATA_DIR}"/*.py; do
   echo -e "\nCompiling ${f##*/}:" >> "${COMPILER_OUTPUTS_FILE}"
-  if dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1;
-  then
-    echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
+  IS_SPECIAL=$(grep -E ${f##*/} <<< ${PIPELINES})

Member:

For the test_kfp_samples.sh script, why do we not just generate PipelineRun for all testdata scripts and only change one line:

  if dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1;

to

  if dsl-compile-tekton --generate-pipelinerun --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1;

There is no need or benefit to have a mix here, since we will catch the NotImplementedErrors either way.

Member Author:

Maybe I do not understand, but wouldn't this still fail to compile basic_no_decorator.py and compose.py?

Member:

Hi Andrew, yes, I wrote that comment before understanding the full purpose of your test_util.py. But let me explain my bigger point below.

+  if [ -z "${IS_SPECIAL}" ]; then
+    if dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1;
+    then
+      echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
+    else
+      echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
+    fi
   else
-    echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
+    python3 -m test_util ${f} ${CONFIG_FILE} | grep -E 'SUCCESS:|FAILURE:'

Member:

this should move up so we can capture the entire output of the compile run in the ${COMPILER_OUTPUTS_FILE}, not just the SUCCESS or FAILURE. The python3 command should return an exit value of 0 for success and 1 for failure.

Maybe we could use a ternary-style approach:

    if ([ -z "${IS_SPECIAL}" ] \
        && dsl-compile-tekton --py "${f}" --output "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" \
        || python3 -m test_util ${f} ${CONFIG_FILE} ) >> "${COMPILER_OUTPUTS_FILE}" 2>&1 ;
    then
      echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
    else
      echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
    fi

...or better: wrap the compile part into a bash function and inside that function check for IS_SPECIAL:

    function compile_dsl {
      if [ -z "${IS_SPECIAL}" ]; then
        dsl-compile-tekton --py "$1" --output "$2"
      else
        python3 -m test_util $1 ${CONFIG_FILE} # need to produce YAML file output
      fi
    }

    # ...

    if compile_dsl "${f}" "${TEKTON_COMPILED_YAML_DIR}/${f##*/}.yaml" >> "${COMPILER_OUTPUTS_FILE}" 2>&1 ;
    then
      echo "SUCCESS: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
    else
      echo "FAILURE: ${f##*/}" | tee -a "${COMPILER_OUTPUTS_FILE}"
    fi

   fi
 done | tee "${COMPILE_REPORT_FILE}"

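On the reviewer's exit-code point: a minimal sketch (a hypothetical follow-up, not part of this diff) of how the test_util.py entry point could signal failure to the shell, so the script's `if` could branch on it directly:

    # Hypothetical tail for test_util.py: exit non-zero on failure so that
    # `if python3 -m test_util ...` in test_kfp_samples.sh works as a condition.
    import sys

    if did_compile:
        print("SUCCESS:", test_data_file)
    else:
        print("FAILURE:", test_data_file)
        sys.exit(1)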
4 changes: 2 additions & 2 deletions sdk/python/tests/test_kfp_samples_report.txt
@@ -1,9 +1,9 @@
 SUCCESS: add_pod_env.py
 SUCCESS: artifact_location.py
 SUCCESS: basic.py
-FAILURE: basic_no_decorator.py
+SUCCESS: basic_no_decorator.py
 FAILURE: coin.py
-FAILURE: compose.py
+SUCCESS: compose.py
 SUCCESS: default_value.py
 FAILURE: input_artifact_raw_value.py
 FAILURE: loop_over_lightweight_output.py
102 changes: 102 additions & 0 deletions sdk/python/tests/test_util.py
@@ -0,0 +1,102 @@
#!/usr/bin/env python3

Member:

should we not move these tests into our standard compiler test suite?

Member:

please move the two functions test_workflow_without_decorator and test_nested_workflow into the actual compiler unit test and have the test_util.py call them and capture the output YAML into a file, like dsl-compile does

# Copyright 2020 kubeflow.org
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import shutil
import zipfile
import yaml
import tempfile
import importlib
import kfp_tekton.compiler as compiler
import filecmp

def _get_yaml_from_zip(zip_file):
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        with open(zip_ref.extract(zip_ref.namelist()[0]), 'r') as yaml_file:
            return list(yaml.safe_load_all(yaml_file))

def get_config(config_path):
    with open(config_path) as file:
        return list(yaml.safe_load_all(file))

def get_params_from_config(pipeline_name, config_path):
    pipelines = get_config(config_path)

    for pipeline in pipelines:
        if pipeline_name == pipeline["pipeline"]:
            return pipeline

def test_workflow_without_decorator(pipeline_mod, params_dict):
    """Test compiling a workflow and appending pipeline params."""

    try:
        pipeline_params = []
        for param in params_dict.get('paramsList', []):
            pipeline_params.append(getattr(pipeline_mod, param))

        compiled_workflow = compiler.TektonCompiler()._create_workflow(
            getattr(pipeline_mod, params_dict['function']),
            params_dict.get('name', None),
            params_dict.get('description', None),
            pipeline_params if pipeline_params else None,
            params_dict.get('conf', None))
        return True
    except Exception:
        return False

def test_nested_workflow(pipeline_mod, pipeline_list):
    """Test compiling a simple workflow, and a bigger one composed from the simple one."""

    tmpdir = tempfile.mkdtemp()
    try:
        for pipeline in pipeline_list:
            pipeline_name = pipeline['name']
            package_path = os.path.join(tmpdir, pipeline_name + '.zip')
            compiler.TektonCompiler().compile(getattr(pipeline_mod, pipeline_name), package_path)
        return True
    except Exception:
        return False
    finally:
        # clean up the temporary compiler output
        shutil.rmtree(tmpdir)


if __name__ == '__main__':
    test_data_path = sys.argv[1]
    config_path = sys.argv[2]
    did_compile = False

    # Import pipeline
    test_data_dir, test_data_file = os.path.split(test_data_path)
    import_name, test_data_ext = os.path.splitext(test_data_file)
    sys.path.append(test_data_dir)
    pipeline_mod = importlib.import_module(import_name)

    # Get the pipeline-specific parameters from the config file
    params = get_params_from_config(test_data_file, config_path)
    if params is None:
        raise ValueError('No pipeline matches available in the config file')
    test_type = params['type']

    if test_type == 'nested':
        did_compile = test_nested_workflow(pipeline_mod, params['components'])
    elif test_type == 'no_decorator':
        did_compile = test_workflow_without_decorator(pipeline_mod, params['components'])
    else:
        raise ValueError('Pipeline type \'' + test_type + '\' is not recognized')

    if did_compile:
        print("SUCCESS:", test_data_file)
    else:
        print("FAILURE:", test_data_file)