-
Notifications
You must be signed in to change notification settings - Fork 1
/
pipeline_controller.go
82 lines (65 loc) · 2.73 KB
/
pipeline_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package pipelines
import (
"context"
config "github.com/sky-uk/kfp-operator/apis/config/v1alpha5"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"time"
pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha5"
)
var (
	// apiGVStr is the string form of the pipelines API GroupVersion.
	apiGVStr = pipelinesv1.GroupVersion.String()

	// finalizerName is a finalizer identifier in the pipelines.kubeflow.org domain.
	// NOTE(review): presumably attached to managed resources to gate their
	// deletion — confirm at the usage sites.
	finalizerName = "finalizer.pipelines.kubeflow.org"

	// workflowOwnerKey holds the ".metadata.controller" key.
	// NOTE(review): looks like a field-index key for finding workflows by their
	// owning controller — confirm at the usage sites.
	workflowOwnerKey = ".metadata.controller"
)
// PipelineReconciler reconciles Pipeline resources.
//
// It embeds a StateHandler, whose StateTransition method yields the commands
// to run during Reconcile, and a ResourceReconciler, which carries the
// execution context (EC) and configuration (Config) and supplies the shared
// setupWithManager helper used by SetupWithManager.
type PipelineReconciler struct {
StateHandler[*pipelinesv1.Pipeline]
ResourceReconciler[*pipelinesv1.Pipeline]
}
// NewPipelineReconciler assembles a PipelineReconciler.
//
// The embedded StateHandler is given workflowRepository and the workflow
// factory produced by PipelineWorkflowFactory(config); the embedded
// ResourceReconciler is given the execution context ec and the same config.
func NewPipelineReconciler(ec K8sExecutionContext, workflowRepository WorkflowRepository, config config.Configuration) *PipelineReconciler {
	reconciler := new(PipelineReconciler)

	// State handling: where workflows are stored and how they are built.
	reconciler.StateHandler = StateHandler[*pipelinesv1.Pipeline]{
		WorkflowRepository: workflowRepository,
		WorkflowFactory:    PipelineWorkflowFactory(config),
	}

	// Shared reconciler plumbing: cluster access and operator configuration.
	reconciler.ResourceReconciler = ResourceReconciler[*pipelinesv1.Pipeline]{
		EC:     ec,
		Config: config,
	}

	return reconciler
}
//+kubebuilder:rbac:groups=argoproj.io,resources=workflows,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=pipelines,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=pipelines/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=pipelines/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// Reconcile handles a single reconciliation request for a Pipeline.
//
// It fetches the Pipeline named by req through the non-cached client, resolves
// the desired provider from the pipeline and the reconciler configuration,
// asks the StateHandler for the commands of the resulting state transition and
// executes them in order.
//
// Returns an empty ctrl.Result in every case. The error is nil on success, the
// fetch error with not-found filtered out by client.IgnoreNotFound when the
// Pipeline cannot be read, or the error of the first command that fails (the
// remaining commands are not executed).
func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	startTime := time.Now()
	logger.V(2).Info("reconciliation started")

	pipeline := &pipelinesv1.Pipeline{}
	if err := r.EC.Client.NonCached.Get(ctx, req.NamespacedName, pipeline); err != nil {
		logger.Error(err, "unable to fetch pipeline")
		// A missing resource is not a reconciliation failure; only other
		// errors are propagated to the caller.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	logger.V(3).Info("found pipeline", "resource", pipeline)

	// Named provider rather than desiredProvider so the local does not shadow
	// the function it is computed by.
	provider := desiredProvider(pipeline, r.Config)
	commands := r.StateHandler.StateTransition(ctx, provider, pipeline)

	// Index-based iteration so execute is invoked on the slice element itself
	// rather than on a per-iteration copy.
	for i := range commands {
		if err := commands[i].execute(ctx, r.EC, pipeline); err != nil {
			logger.Error(err, "error executing command", LogKeys.Command, commands[i])
			return ctrl.Result{}, err
		}
	}

	// The duration is only reported for reconciliations that ran to completion.
	logger.V(2).Info("reconciliation ended", LogKeys.Duration, time.Since(startTime))

	return ctrl.Result{}, nil
}
// SetupWithManager registers this reconciler with mgr as the controller for
// Pipeline resources.
//
// A controller builder for the Pipeline type is created, passed through the
// embedded ResourceReconciler's setupWithManager helper, and then completed
// with r as the reconciler. Any error from completing the builder is returned.
func (r *PipelineReconciler) SetupWithManager(mgr ctrl.Manager) error {
	resource := &pipelinesv1.Pipeline{}

	configured := r.ResourceReconciler.setupWithManager(
		ctrl.NewControllerManagedBy(mgr).For(resource),
		resource,
	)

	return configured.Complete(r)
}