-
Notifications
You must be signed in to change notification settings - Fork 1
/
workflow_repository.go
79 lines (65 loc) · 2.61 KB
/
workflow_repository.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
package pipelines
import (
"context"
argo "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
config "github.com/sky-uk/kfp-operator/apis/config/v1alpha5"
pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha5"
"github.com/sky-uk/kfp-operator/controllers"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
var WorkflowRepositoryConstants = struct {
WorkflowProcessedLabel string
}{
WorkflowProcessedLabel: pipelinesv1.GroupVersion.Group + "/processed",
}
type WorkflowRepository interface {
CreateWorkflowForResource(ctx context.Context, workflow *argo.Workflow, resource pipelinesv1.Resource) error
GetByLabels(ctx context.Context, matchingLabels map[string]string) []argo.Workflow
MarkWorkflowAsProcessed(ctx context.Context, workflow *argo.Workflow) error
}
type WorkflowRepositoryImpl struct {
Client controllers.OptInClient
Config config.Configuration
Scheme *runtime.Scheme
}
func (w WorkflowRepositoryImpl) CreateWorkflowForResource(ctx context.Context, workflow *argo.Workflow, resource pipelinesv1.Resource) error {
return w.Client.Create(ctx, workflow)
}
func (w WorkflowRepositoryImpl) GetByLabels(ctx context.Context, matchingLabels map[string]string) []argo.Workflow {
logger := log.FromContext(ctx)
var workflows argo.WorkflowList
sel := labels.NewSelector()
req, err := labels.NewRequirement(WorkflowRepositoryConstants.WorkflowProcessedLabel, selection.DoesNotExist, []string{})
if err != nil {
return []argo.Workflow{}
}
sel = sel.Add(*req)
for label, value := range matchingLabels {
req, err = labels.NewRequirement(label, selection.Equals, []string{value})
if err != nil {
return []argo.Workflow{}
}
sel = sel.Add(*req)
}
if err := w.Client.NonCached.List(ctx, &workflows, client.InNamespace(w.Config.WorkflowNamespace), client.MatchingLabelsSelector{Selector: sel}); err != nil {
logger.V(3).Error(err, "no matching workflows")
} else {
logger.V(3).Info("matching workflows", "workflows", workflows.Items)
}
return workflows.Items
}
func (w WorkflowRepositoryImpl) MarkWorkflowAsProcessed(ctx context.Context, workflow *argo.Workflow) error {
logger := log.FromContext(ctx)
logger.V(1).Info("marking child workflow as processed", LogKeys.Workflow, workflow)
workflowLabels := workflow.GetLabels()
if workflowLabels == nil {
workflowLabels = map[string]string{}
}
workflowLabels[WorkflowRepositoryConstants.WorkflowProcessedLabel] = "true"
workflow.SetLabels(workflowLabels)
return w.Client.Update(ctx, workflow)
}