-
Notifications
You must be signed in to change notification settings - Fork 1
/
experiment_controller.go
76 lines (60 loc) · 2.57 KB
/
experiment_controller.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
package pipelines
import (
"context"
config "github.com/sky-uk/kfp-operator/apis/config/v1alpha5"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"time"
pipelinesv1 "github.com/sky-uk/kfp-operator/apis/pipelines/v1alpha5"
)
// ExperimentReconciler reconciles an Experiment object.
//
// It embeds a StateHandler and a ResourceReconciler, both instantiated for
// *pipelinesv1.Experiment, so the fields and methods of both are promoted onto
// the reconciler.
type ExperimentReconciler struct {
StateHandler[*pipelinesv1.Experiment]
ResourceReconciler[*pipelinesv1.Experiment]
}
// NewExperimentReconciler assembles an ExperimentReconciler from its
// collaborators.
//
// The workflow repository and the workflow factory produced by
// ExperimentWorkflowFactory(config) are wired into the embedded StateHandler;
// the execution context and the configuration are stored on the embedded
// ResourceReconciler.
func NewExperimentReconciler(ec K8sExecutionContext, workflowRepository WorkflowRepository, config config.Configuration) *ExperimentReconciler {
	reconciler := &ExperimentReconciler{}

	// State handling: where workflows are looked up and how they are built.
	reconciler.StateHandler.WorkflowRepository = workflowRepository
	reconciler.StateHandler.WorkflowFactory = ExperimentWorkflowFactory(config)

	// Generic resource plumbing shared with the other reconcilers.
	reconciler.ResourceReconciler.EC = ec
	reconciler.ResourceReconciler.Config = config

	return reconciler
}
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=experiments,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=experiments/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=pipelines.kubeflow.org,resources=experiments/finalizers,verbs=update
//+kubebuilder:rbac:groups="",resources=events,verbs=create;patch

// Reconcile handles a single reconciliation request for an Experiment.
//
// It fetches the Experiment named by req through the NonCached client, asks
// the StateHandler which commands are needed to move the resource towards the
// desired provider, and executes those commands in order. Execution stops at
// the first failing command and its error is returned. On success an empty
// Result and a nil error are returned.
//
// A failed fetch is logged and returned via client.IgnoreNotFound, so a
// not-found error is reported to the caller as nil.
func (r *ExperimentReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	logger := log.FromContext(ctx)
	startTime := time.Now()
	logger.V(2).Info("reconciliation started")

	experiment := &pipelinesv1.Experiment{}
	if err := r.EC.Client.NonCached.Get(ctx, req.NamespacedName, experiment); err != nil {
		logger.Error(err, "unable to fetch experiment")
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	logger.V(3).Info("found experiment", "resource", experiment)

	// Named "provider" rather than "desiredProvider" so the local does not
	// shadow the package-level function it is computed from.
	provider := desiredProvider(experiment, r.Config)
	commands := r.StateHandler.StateTransition(ctx, provider, experiment)

	// Index into the slice so each command is executed (and logged) in place.
	for i := range commands {
		if err := commands[i].execute(ctx, r.EC, experiment); err != nil {
			logger.Error(err, "error executing command", LogKeys.Command, commands[i])
			return ctrl.Result{}, err
		}
	}

	logger.V(2).Info("reconciliation ended", LogKeys.Duration, time.Since(startTime))

	return ctrl.Result{}, nil
}
// SetupWithManager registers the reconciler with mgr.
//
// It creates a controller builder for Experiment resources, hands that builder
// and a prototype Experiment to the embedded ResourceReconciler's
// setupWithManager (defined elsewhere in the package), and completes the
// returned builder with r as the reconciler. Any error from Complete is
// returned to the caller.
func (r *ExperimentReconciler) SetupWithManager(mgr ctrl.Manager) error {
	resource := &pipelinesv1.Experiment{}
	base := ctrl.NewControllerManagedBy(mgr).For(resource)

	return r.ResourceReconciler.setupWithManager(base, resource).Complete(r)
}