Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.

Commit 8ae84a7

Browse files
committed
Cleanup
1 parent a0db8fb commit 8ae84a7

File tree

4 files changed

+47
-55
lines changed

4 files changed

+47
-55
lines changed

cmd/main.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -105,21 +105,24 @@ func main() {
105105
if err = (&controller.AppWrapperReconciler{
106106
Client: mgr.GetClient(),
107107
Scheme: mgr.GetScheme(),
108-
Events: make(chan event.GenericEvent, 1), // channel to trigger dispatchNext
108+
Events: make(chan event.GenericEvent, 1), // channel to trigger dispatch
109109
Cache: map[types.UID]*controller.CachedAppWrapper{}, // AppWrapper cache
110110
Mode: mode, // default, dispatcher, runner
111111
}).SetupWithManager(mgr); err != nil {
112112
setupLog.Error(err, "unable to create controller", "controller", "AppWrapper")
113113
os.Exit(1)
114114
}
115-
if err = (&controller.ClusterInfoReconciler{
116-
Client: mgr.GetClient(),
117-
Scheme: mgr.GetScheme(),
118-
Mode: mode, // default, dispatcher, runner
119-
}).SetupWithManager(mgr); err != nil {
120-
setupLog.Error(err, "unable to create controller", "controller", "ClusterInfo")
121-
os.Exit(1)
115+
116+
if mode != "dispatcher" {
117+
if err = (&controller.ClusterInfoReconciler{
118+
Client: mgr.GetClient(),
119+
Scheme: mgr.GetScheme(),
120+
}).SetupWithManager(mgr); err != nil {
121+
setupLog.Error(err, "unable to create controller", "controller", "ClusterInfo")
122+
os.Exit(1)
123+
}
122124
}
125+
123126
//+kubebuilder:scaffold:builder
124127

125128
if err := mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil {

internal/controller/appwrapper_controller.go

Lines changed: 35 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -58,51 +58,21 @@ const (
5858
// Reconcile one AppWrapper or dispatch next AppWrapper
5959
// Normal reconciliations "namespace/name" implement all phase transitions except for Queued->Dispatching
6060
// Queued->Dispatching transitions happen as part of a special "*/*" reconciliation
61-
// In a "*/*" reconciliation, we first decide the AppWrapper to dispatch (dispatchNext)
62-
// We then dispatch this AppWrapper (Queued->Dispatching transition)
6361
func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
6462
log := log.FromContext(ctx)
6563

6664
// log.Info("Reconcile")
6765

68-
// handle dispatching first
69-
70-
// req == "*/*", find the next AppWrapper to dispatch and dispatch this AppWrapper
71-
if req.Name == "*" {
66+
// req == "*/*", attempt to select and dispatch one appWrapper
67+
if req.Namespace == "*" && req.Name == "*" {
7268
if r.Mode == "runner" {
7369
return ctrl.Result{}, nil
7470
}
75-
appWrapper, last, err := r.selectForDispatch(ctx) // last == is last appWrapper in queue?
76-
if err != nil {
77-
return ctrl.Result{}, err
78-
}
79-
if appWrapper == nil { // no appWrapper eligible for dispatch
80-
return ctrl.Result{RequeueAfter: dispatchDelay}, nil // retry to dispatch later
81-
}
82-
// abort and requeue reconciliation if reconciler cache is stale
83-
if err := r.checkCachedPhase(appWrapper); err != nil {
84-
return ctrl.Result{}, err
85-
}
86-
if appWrapper.Status.Phase != mcadv1beta1.Queued {
87-
// this check should be redundant but better be defensive
88-
return ctrl.Result{}, errors.New("not queued")
89-
}
90-
// set dispatching timestamp and status
91-
appWrapper.Status.LastDispatchTime = metav1.Now()
92-
if _, err := r.updateStatus(ctx, appWrapper, mcadv1beta1.Dispatching); err != nil {
93-
return ctrl.Result{}, err
94-
}
95-
if last {
96-
return ctrl.Result{RequeueAfter: dispatchDelay}, nil // retry to dispatch later
97-
}
98-
return ctrl.Result{Requeue: true}, nil // requeue to continue to dispatch queued appWrappers
71+
return r.dispatch(ctx)
9972
}
10073

101-
// normal reconciliation starts here
102-
103-
appWrapper := &mcadv1beta1.AppWrapper{}
104-
10574
// get deep copy of AppWrapper object in reconciler cache
75+
appWrapper := &mcadv1beta1.AppWrapper{}
10676
if err := r.Get(ctx, req.NamespacedName, appWrapper); err != nil {
10777
// no such AppWrapper, nothing to reconcile, not an error
10878
return ctrl.Result{}, nil
@@ -113,7 +83,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
11383
return ctrl.Result{}, err
11484
}
11585

116-
// sync status
86+
// overwrite status with dispatcher status if more recent
11787
if len(appWrapper.Spec.DispatcherStatus.Transitions) > len(appWrapper.Status.Transitions) {
11888
appWrapper.Status = appWrapper.Spec.DispatcherStatus
11989
if err := r.Status().Update(ctx, appWrapper); err != nil {
@@ -123,7 +93,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
12393
return ctrl.Result{}, nil
12494
}
12595

126-
// first handle deletion
96+
// handle deletion
12797
if !appWrapper.DeletionTimestamp.IsZero() && appWrapper.Status.Phase != mcadv1beta1.Deleted {
12898
if r.Mode == "dispatcher" {
12999
return ctrl.Result{}, nil
@@ -136,7 +106,6 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
136106
}
137107

138108
// handle all other phases including the default empty phase
139-
140109
switch appWrapper.Status.Phase {
141110
case mcadv1beta1.Deleted:
142111
if r.Mode == "runner" {
@@ -270,7 +239,7 @@ func (r *AppWrapperReconciler) podMapFunc(ctx context.Context, obj client.Object
270239
return nil
271240
}
272241

273-
// Trigger dispatchNext on cluster capacity change
242+
// Trigger dispatch on cluster capacity change
274243
func (r *AppWrapperReconciler) clusterInfoMapFunc(ctx context.Context, obj client.Object) []reconcile.Request {
275244
r.triggerDispatch()
276245
return nil
@@ -321,3 +290,31 @@ func (r *AppWrapperReconciler) triggerDispatch() {
321290
default:
322291
}
323292
}
293+
294+
// Attempt to select and dispatch one appWrapper
295+
func (r *AppWrapperReconciler) dispatch(ctx context.Context) (ctrl.Result, error) {
296+
appWrapper, last, err := r.selectForDispatch(ctx) // last == is last appWrapper in queue?
297+
if err != nil {
298+
return ctrl.Result{}, err
299+
}
300+
if appWrapper == nil { // no appWrapper eligible for dispatch
301+
return ctrl.Result{RequeueAfter: dispatchDelay}, nil // retry to dispatch later
302+
}
303+
// abort and requeue reconciliation if reconciler cache is stale
304+
if err := r.checkCachedPhase(appWrapper); err != nil {
305+
return ctrl.Result{}, err
306+
}
307+
if appWrapper.Status.Phase != mcadv1beta1.Queued {
308+
// this check should be redundant but better be defensive
309+
return ctrl.Result{}, errors.New("not queued")
310+
}
311+
// set dispatching timestamp and status
312+
appWrapper.Status.LastDispatchTime = metav1.Now()
313+
if _, err := r.updateStatus(ctx, appWrapper, mcadv1beta1.Dispatching); err != nil {
314+
return ctrl.Result{}, err
315+
}
316+
if last {
317+
return ctrl.Result{RequeueAfter: dispatchDelay}, nil // retry to dispatch later
318+
}
319+
return ctrl.Result{Requeue: true}, nil // requeue to continue to dispatch queued appWrappers
320+
}

internal/controller/clusterinfo_controller.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ type ClusterInfoReconciler struct {
3535
client.Client
3636
Scheme *runtime.Scheme
3737
NextSync time.Time // when to refresh cluster capacity
38-
Mode string // default, dispatcher, runner
3938
}
4039

4140
//+kubebuilder:rbac:groups=mcad.codeflare.dev,resources=clusterinfoes,verbs=get;list;watch;create;update;patch;delete
@@ -95,9 +94,6 @@ func (r *ClusterInfoReconciler) Reconcile(ctx context.Context, req ctrl.Request)
9594

9695
// SetupWithManager sets up the controller with the Manager.
9796
func (r *ClusterInfoReconciler) SetupWithManager(mgr ctrl.Manager) error {
98-
if r.Mode == "dispatcher" {
99-
return nil
100-
}
10197
return ctrl.NewControllerManagedBy(mgr).
10298
For(&mcadv1beta1.ClusterInfo{}).
10399
Complete(r)

internal/controller/timings.go

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,9 @@ const (
3030
cacheConflictTimeout = 5 * time.Minute // minimum wait before invalidating the cache
3131
clusterInfoTimeout = time.Minute // how long to cache cluster capacity
3232

33-
// Cluster capacity is only refreshed when trying to dispatch AppWrappers and only after
34-
// the previous measurement has timed out, so it is necessary to call dispatchNext on
35-
// a regular basis (we do) to ensure we detect new capacity (such as new schedulable nodes)
36-
3733
// RequeueAfter delays
3834
runDelay = time.Minute // maximum delay before next reconciliation of a Running AppWrapper
39-
dispatchDelay = time.Minute // maximum delay before next "*/*" reconciliation (dispatchNext)
35+
dispatchDelay = time.Minute // maximum delay before next "*/*" reconciliation (dispatch)
4036

4137
// The RequeueAfter delay is the maximum delay before the next reconciliation event.
4238
// Reconciliation may be triggered earlier due for instance to pod phase changes.

0 commit comments

Comments
 (0)