-
Notifications
You must be signed in to change notification settings - Fork 1
/
runschedule_controller.go
76 lines (60 loc) · 2.6 KB
/
runschedule_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package pipelines
import (
"context"
config "github.com/sky-uk/kfp-operator/apis/config/v1alpha5"
"time"
pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha5"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// RunScheduleReconciler reconciles a RunSchedule object
type RunScheduleReconciler struct {
StateHandler[*pipelinesv1.RunSchedule]
ResourceReconciler[*pipelinesv1.RunSchedule]
}
func NewRunScheduleReconciler(ec K8sExecutionContext, workflowRepository WorkflowRepository, config config.Configuration) *RunScheduleReconciler {
return &RunScheduleReconciler{
StateHandler: StateHandler[*pipelinesv1.RunSchedule]{
WorkflowRepository: workflowRepository,
WorkflowFactory: RunScheduleWorkflowFactory(config),
},
ResourceReconciler: ResourceReconciler[*pipelinesv1.RunSchedule]{
EC: ec,
Config: config,
},
}
}
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=runschedules,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=runschedules/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=runschedules/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch
func (r *RunScheduleReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
startTime := time.Now()
logger.V(2).Info("reconciliation started")
var runSchedule = &pipelinesv1.RunSchedule{}
if err := r.EC.Client.NonCached.Get(ctx, req.NamespacedName, runSchedule); err != nil {
logger.Error(err, "unable to fetch run schedule")
return ctrl.Result{}, client.IgnoreNotFound(err)
}
logger.V(3).Info("found run schedule", "resource", runSchedule)
desiredProvider := desiredProvider(runSchedule, r.Config)
commands := r.StateHandler.StateTransition(ctx, desiredProvider, runSchedule)
for i := range commands {
if err := commands[i].execute(ctx, r.EC, runSchedule); err != nil {
logger.Error(err, "error executing command", LogKeys.Command, commands[i])
return ctrl.Result{}, err
}
}
duration := time.Now().Sub(startTime)
logger.V(2).Info("reconciliation ended", LogKeys.Duration, duration)
return ctrl.Result{}, nil
}
func (r *RunScheduleReconciler) SetupWithManager(mgr ctrl.Manager) error {
runSchedule := &pipelinesv1.RunSchedule{}
controllerBuilder := ctrl.NewControllerManagedBy(mgr).
For(runSchedule)
controllerBuilder = r.ResourceReconciler.setupWithManager(controllerBuilder, runSchedule)
return controllerBuilder.Complete(r)
}