-
Notifications
You must be signed in to change notification settings - Fork 1
/
workflow_utils.go
101 lines (80 loc) · 2.7 KB
/
workflow_utils.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package pipelines
import (
"fmt"
argo "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
providers "github.com/sky-uk/kfp-operator/argo/providers/base"
"gopkg.in/yaml.v2"
)
var mapParams = func(params []argo.Parameter) map[string]string {
m := make(map[string]string, len(params))
for i := range params {
m[params[i].Name] = string(*params[i].Value)
}
return m
}
func getWorkflowParameter(workflow *argo.Workflow, name string) string {
for _, parameter := range workflow.Spec.Arguments.Parameters {
if parameter.Name == name {
return parameter.Value.String()
}
}
return ""
}
func getWorkflowOutput(workflow *argo.Workflow, key string) (providers.Output, error) {
output := providers.Output{}
entrypointNode, exists := workflow.Status.Nodes[workflow.Name]
if !exists || entrypointNode.Outputs == nil {
return output, fmt.Errorf("workflow does not have %s node", workflow.Name)
}
yamlOutput := []byte(mapParams(entrypointNode.Outputs.Parameters)[key])
err := yaml.Unmarshal(yamlOutput, &output)
return output, err
}
func setWorkflowProvider(workflow *argo.Workflow, provider string) *argo.Workflow {
workflow.Spec.Arguments.Parameters = append(workflow.Spec.Arguments.Parameters, argo.Parameter{Name: WorkflowConstants.ProviderNameParameterName, Value: argo.AnyStringPtr(provider)})
return workflow
}
func setWorkflowOutputs(workflow *argo.Workflow, parameters []argo.Parameter) *argo.Workflow {
nodes := make(map[string]argo.NodeStatus)
nodes[workflow.Name] = argo.NodeStatus{
Outputs: &argo.Outputs{
Parameters: parameters,
},
}
workflow.Status.Nodes = nodes
return workflow
}
func setProviderOutput(workflow *argo.Workflow, output providers.Output) *argo.Workflow {
return setWorkflowOutputs(
workflow,
[]argo.Parameter{
{
Name: WorkflowConstants.ProviderOutputParameterName,
Value: argo.AnyStringPtr("id: " + output.Id + "\nproviderError: " + output.ProviderError),
},
},
)
}
func latestWorkflow(workflow1 *argo.Workflow, workflow2 *argo.Workflow) *argo.Workflow {
if workflow1 == nil {
return workflow2
} else if workflow2 == nil || workflow2.ObjectMeta.CreationTimestamp.Before(&workflow1.ObjectMeta.CreationTimestamp) {
return workflow1
} else {
return workflow2
}
}
func latestWorkflowByPhase(workflows []argo.Workflow) (inProgress *argo.Workflow, succeeded *argo.Workflow, failed *argo.Workflow) {
for i := range workflows {
workflow := workflows[i]
switch workflow.Status.Phase {
case argo.WorkflowFailed, argo.WorkflowError:
failed = latestWorkflow(failed, &workflow)
case argo.WorkflowSucceeded:
succeeded = latestWorkflow(succeeded, &workflow)
default:
inProgress = latestWorkflow(inProgress, &workflow)
}
}
return
}