Skip to content

Commit

Permalink
Add alpha feature to output big data passing file path instead of task run name (kubeflow#993)
Browse files Browse the repository at this point in the history

* stage

* regenerate tests

* regenerate tests

* update comments

* resolve conflicts

* fix immediate bug

* parameterize path suffix name
  • Loading branch information
Tomcli committed Jul 19, 2022
1 parent 26b4455 commit e2aa560
Showing 1 changed file with 56 additions and 22 deletions.
78 changes: 56 additions & 22 deletions sdk/python/kfp_tekton/compiler/_data_passing_rewriter.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
BIG_DATA_MIDPATH = "artifacts/$ORIG_PR_NAME"
BIG_DATA_PATH_FORMAT = "/".join(["$(workspaces.$TASK_NAME.path)", BIG_DATA_MIDPATH, "$TASKRUN_NAME", "$TASK_PARAM_NAME"])
ARTIFACT_OUTPUTLIST_ANNOTATION_KEY = 'artifact_outputs'
OUTPUT_RESULT_PATH_SUFFIX = '-datap'


def fix_big_data_passing(workflow: dict, loops_pipeline: dict, loop_name_prefix: str) -> dict:
Expand Down Expand Up @@ -324,6 +325,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
), # TODO: pipeline has no name, use pipelineRun name?
input_parameter['name']) in inputs_consumed_as_parameters
or input_parameter['name'].endswith("-trname")
or input_parameter['name'].endswith(OUTPUT_RESULT_PATH_SUFFIX)
]

# Remove output parameters unless they're used downstream
Expand Down Expand Up @@ -363,6 +365,7 @@ def mark_upstream_ios_of_output(template_output, marked_inputs,
parameter_argument['name']) not in inputs_consumed_as_artifacts
or task['name'] in resource_template_names
or parameter_argument['name'].endswith("-trname")
or parameter_argument['name'].endswith(OUTPUT_RESULT_PATH_SUFFIX)
]

# tekton results doesn't support underscore
Expand Down Expand Up @@ -506,6 +509,7 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
task_spec = task.get('taskSpec', {})
# Data passing for the task outputs
appended_taskrun_name = False
appended_taskrun_path_step = None
artifact_output_list = task_spec.get('metadata', {}).get('annotations', {}).get(ARTIFACT_OUTPUTLIST_ANNOTATION_KEY, '')
if artifact_output_list:
temp_list = json.loads(artifact_output_list)
Expand All @@ -524,21 +528,36 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
task_output.get('name')))
workspaces_parameter = '$(workspaces.%s.path)/%s/%s/%s' % (
task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_output.get('name'))
# For child nodes to know the taskrun name, it has to pass to results via /tekton/results emptydir
if not appended_taskrun_name:
copy_taskrun_name_step = _get_base_step('output-taskrun-name')
copy_taskrun_name_step['command'].append('echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"')
task['taskSpec']['results'].append({"name": "taskrun-name", "type": "string"})
task['taskSpec']['steps'].append(copy_taskrun_name_step)
_append_original_pr_name_env(task)
appended_taskrun_name = True
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
# For child nodes to know the taskrun output path, it has to pass to results via /tekton/results emptydir
if not appended_taskrun_path_step:
appended_taskrun_path_step = _get_base_step('output-taskrun-path')
if len(appended_taskrun_path_step['command']) <= 2:
appended_taskrun_path_step['command'].append('')
appended_taskrun_path_step['command'][-1] += 'echo -n "%s/%s/%s" > $(results.%s%s.path)\n' % \
(BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_output.get('name'),
task_output.get('name'), OUTPUT_RESULT_PATH_SUFFIX)
task['taskSpec']['results'].append({"name": "%s%s" % (task_output.get('name'), OUTPUT_RESULT_PATH_SUFFIX),
"type": "string"})
else:
# For child nodes to know the taskrun name, it has to pass to results via /tekton/results emptydir
if not appended_taskrun_name:
copy_taskrun_name_step = _get_base_step('output-taskrun-name')
copy_taskrun_name_step['command'].append('echo -n "$(context.taskRun.name)" > "$(results.taskrun-name.path)"')
task['taskSpec']['results'].append({"name": "taskrun-name", "type": "string"})
task['taskSpec']['steps'].append(copy_taskrun_name_step)
_append_original_pr_name_env(task)
appended_taskrun_name = True
task['taskSpec'] = replace_big_data_placeholder(
task.get("taskSpec", {}), placeholder, workspaces_parameter)
artifact_items = pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items']
artifact_items[task['name']] = replace_big_data_placeholder(
artifact_items[task['name']], placeholder, workspaces_parameter)
pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items'] = \
artifact_items
if appended_taskrun_path_step:
task['taskSpec']['steps'].append(appended_taskrun_path_step)
_append_original_pr_name_env(task)

task_spec = task.get('taskSpec', {})
task_params = task_spec.get('params', [])
Expand Down Expand Up @@ -566,34 +585,48 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
# If the param name is constructed with task_name-param_name,
# use the current task_name as the path prefix

def append_taskrun_params(task_name_append: str):
def append_taskrun_params(task_name_append: str, task_path_name: str):
taskrun_param_name = task_name_append + "-trname"
inserted_taskrun_param = False
for param in task['taskSpec'].get('params', []):
if param.get('name', "") == taskrun_param_name:
inserted_taskrun_param = True
break
if not inserted_taskrun_param:
task['taskSpec']['params'].append({"name": taskrun_param_name})
task['params'].append({"name": taskrun_param_name, "value": "$(tasks.%s.results.taskrun-name)" % task_name_append})
param_name = {"name": taskrun_param_name}
param_nested_name = task_name_append + '-taskrun-name'
param_content = {"name": taskrun_param_name, "value": "$(tasks.%s.results.taskrun-name)" % task_name_append}
param_nested_content = {'name': task_name_append + '-taskrun-name',
'value': '$(tasks.%s.results.taskrun-name)' % task_name_append}
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
param_path_name = '-'.join([task_name_append, task_path])
param_name = {"name": param_path_name}
param_nested_name = param_path_name
param_content = {"name": param_path_name,
"value": "$(tasks.%s.results.%s)" % (task_name_append, task_path_name)}
param_nested_content = param_content
else:
for param in task['taskSpec'].get('params', []):
if param.get('name', "") == taskrun_param_name:
inserted_taskrun_param = True
break
if (not inserted_taskrun_param) or env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
task['taskSpec']['params'].append(param_name)
task['params'].append(param_content)
parent_task_queue = [task['name']]
while parent_task_queue:
current_task = parent_task_queue.pop(0)
for loop_name, loop_spec in loops_pipeline.items():
# print(loop_name, loop_spec)
if current_task in loop_spec.get('task_list', []):
parent_task_queue.append(loop_name.replace(loop_name_prefix, ""))
loop_param_names = [loop_param['name'] for loop_param in loops_pipeline[loop_name]['spec']['params']]
if task_name_append + '-taskrun-name' in loop_param_names:
if param_nested_name in loop_param_names:
continue
loops_pipeline[loop_name]['spec']['params'].append({'name': task_name_append + '-taskrun-name',
'value': '$(tasks.%s.results.taskrun-name)' % task_name_append})
loops_pipeline[loop_name]['spec']['params'].append(param_nested_content)

if task_param_task_name:
workspaces_parameter = '$(workspaces.%s.path)/%s/$(params.%s-trname)/%s' % (
task_name, BIG_DATA_MIDPATH, task_param_task_name, task_param_param_name)
task_path = sanitize_k8s_name(task_param_param_name) + OUTPUT_RESULT_PATH_SUFFIX
if env.get('OUTPUT_BIG_DATA_PATH', 'false').lower() == 'true':
workspaces_parameter = '$(workspaces.%s.path)/$(params.%s)' % (task_name, '-'.join([task_param_task_name, task_path]))
if task_param_task_name != task_name:
append_taskrun_params(task_param_task_name) # need to get taskrun name from parent path
append_taskrun_params(task_param_task_name, task_path) # need to get taskrun name from parent path
else:
workspaces_parameter = '$(workspaces.%s.path)/%s/%s/%s' % (
task_name, BIG_DATA_MIDPATH, "$(context.taskRun.name)", task_param.get('name'))
Expand Down Expand Up @@ -666,7 +699,8 @@ def append_taskrun_params(task_name_append: str):
task.get("taskSpec", {})['params'] = [
param for param in task_spec.get('params', [])
if (task_name, param.get('name')) not in inputs_tasks or
param.get('name').endswith("-trname")
param.get('name').endswith("-trname") or
param.get('name').endswith(OUTPUT_RESULT_PATH_SUFFIX)
]

# Remove artifacts from task_spec
Expand Down

0 comments on commit e2aa560

Please sign in to comment.