Skip to content
This repository was archived by the owner on Feb 18, 2025. It is now read-only.

Commit 18d2ba1

Browse files
authored
Implement Queued condition on AppWrapper Status (#101)
1 parent 5453333 commit 18d2ba1

File tree

12 files changed

+217
-41
lines changed

12 files changed

+217
-41
lines changed

.golangci.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ linters:
3131
- errcheck
3232
- exportloopref
3333
- goconst
34-
- gocyclo
3534
- gofmt
3635
- goimports
3736
- gosimple

api/v1beta1/appwrapper_types.go

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,9 @@ type AppWrapperStatus struct {
115115

116116
// Number of transitions
117117
TransitionCount int32 `json:"transitionCount,omitempty"`
118+
119+
// Conditions
120+
Conditions []metav1.Condition `json:"conditions,omitempty"`
118121
}
119122

120123
// AppWrapperState is the label for the AppWrapper status
@@ -123,6 +126,9 @@ type AppWrapperState string
123126
// AppWrapperState is the status of wrapped resources
124127
type AppWrapperStep string
125128

129+
// AppWrapperQueuedReason is the Type for the Queued Condition
130+
type AppWrapperQueuedReason string
131+
126132
const (
127133
// Initial state upon creation of the AppWrapper object
128134
Empty AppWrapperState = ""
@@ -150,6 +156,18 @@ const (
150156

151157
// MCAD is in the process of deleting the wrapped resources
152158
Deleting AppWrapperStep = "deleting"
159+
160+
// Queued because of insufficient available resources
161+
QueuedInsufficientResources = "InsufficientResources"
162+
163+
// Queued because of insufficient available quota
164+
QueuedInsufficientQuota = "InsufficientQuota"
165+
166+
// Queued because it was Requeued
167+
QueuedRequeue = "Requeued"
168+
169+
// Not Queued because it was Dispatched
170+
QueuedDispatch = "Dispatched"
153171
)
154172

155173
// AppWrapperService

api/v1beta1/zz_generated.deepcopy.go

Lines changed: 7 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/main.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,10 +95,11 @@ func main() {
9595
}
9696

9797
if err = (&controller.AppWrapperReconciler{
98-
Client: mgr.GetClient(),
99-
Scheme: mgr.GetScheme(),
100-
Cache: map[types.UID]*controller.CachedAppWrapper{}, // AppWrapper cache
101-
Events: make(chan event.GenericEvent, 1), // channel to trigger dispatch
98+
Client: mgr.GetClient(),
99+
Scheme: mgr.GetScheme(),
100+
Cache: map[types.UID]*controller.CachedAppWrapper{}, // AppWrapper cache
101+
Events: make(chan event.GenericEvent, 1), // channel to trigger dispatch
102+
Decisions: map[types.UID]*controller.QueuingDecision{}, // cache of recent queuing decisions
102103
}).SetupWithManager(mgr); err != nil {
103104
setupLog.Error(err, "unable to create controller", "controller", "AppWrapper")
104105
os.Exit(1)

config/crd/bases/workload.codeflare.dev_appwrappers.yaml

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -596,6 +596,75 @@ spec:
596596
status:
597597
description: AppWrapperStatus defines the observed state of AppWrapper
598598
properties:
599+
conditions:
600+
description: Conditions
601+
items:
602+
description: "Condition contains details for one aspect of the current
603+
state of this API Resource. --- This struct is intended for direct
604+
use as an array at the field path .status.conditions. For example,
605+
\n type FooStatus struct{ // Represents the observations of a
606+
foo's current state. // Known .status.conditions.type are: \"Available\",
607+
\"Progressing\", and \"Degraded\" // +patchMergeKey=type // +patchStrategy=merge
608+
// +listType=map // +listMapKey=type Conditions []metav1.Condition
609+
`json:\"conditions,omitempty\" patchStrategy:\"merge\" patchMergeKey:\"type\"
610+
protobuf:\"bytes,1,rep,name=conditions\"` \n // other fields }"
611+
properties:
612+
lastTransitionTime:
613+
description: lastTransitionTime is the last time the condition
614+
transitioned from one status to another. This should be when
615+
the underlying condition changed. If that is not known, then
616+
using the time when the API field changed is acceptable.
617+
format: date-time
618+
type: string
619+
message:
620+
description: message is a human readable message indicating
621+
details about the transition. This may be an empty string.
622+
maxLength: 32768
623+
type: string
624+
observedGeneration:
625+
description: observedGeneration represents the .metadata.generation
626+
that the condition was set based upon. For instance, if .metadata.generation
627+
is currently 12, but the .status.conditions[x].observedGeneration
628+
is 9, the condition is out of date with respect to the current
629+
state of the instance.
630+
format: int64
631+
minimum: 0
632+
type: integer
633+
reason:
634+
description: reason contains a programmatic identifier indicating
635+
the reason for the condition's last transition. Producers
636+
of specific condition types may define expected values and
637+
meanings for this field, and whether the values are considered
638+
a guaranteed API. The value should be a CamelCase string.
639+
This field may not be empty.
640+
maxLength: 1024
641+
minLength: 1
642+
pattern: ^[A-Za-z]([A-Za-z0-9_,:]*[A-Za-z0-9_])?$
643+
type: string
644+
status:
645+
description: status of the condition, one of True, False, Unknown.
646+
enum:
647+
- "True"
648+
- "False"
649+
- Unknown
650+
type: string
651+
type:
652+
description: type of condition in CamelCase or in foo.example.com/CamelCase.
653+
--- Many .condition.type values are consistent across resources
654+
like Available, but because arbitrary conditions can be useful
655+
(see .node.status.conditions), the ability to deconflict is
656+
important. The regex it matches is (dns1123SubdomainFmt/)?(qualifiedNameFmt)
657+
maxLength: 316
658+
pattern: ^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*/)?(([A-Za-z0-9][-A-Za-z0-9_.]*)?[A-Za-z0-9])$
659+
type: string
660+
required:
661+
- lastTransitionTime
662+
- message
663+
- reason
664+
- status
665+
- type
666+
type: object
667+
type: array
599668
dispatchTimestamp:
600669
description: When last dispatched
601670
format: date-time

internal/controller/appwrapper_controller.go

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ package controller
1919
import (
2020
"context"
2121
"errors"
22+
"fmt"
2223
"strconv"
2324
"time"
2425

2526
batchv1 "k8s.io/api/batch/v1"
2627
v1 "k8s.io/api/core/v1"
28+
"k8s.io/apimachinery/pkg/api/meta"
2729
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2830
"k8s.io/apimachinery/pkg/runtime"
2931
"k8s.io/apimachinery/pkg/types"
@@ -47,6 +49,7 @@ type AppWrapperReconciler struct {
4749
Events chan event.GenericEvent // event channel to trigger dispatch
4850
ClusterCapacity Weights // cluster capacity available to MCAD
4951
NextSync time.Time // when to refresh cluster capacity
52+
Decisions map[types.UID]*QueuingDecision // transient log of queuing decisions to enable recording in AppWrapper Status
5053
}
5154

5255
const (
@@ -124,6 +127,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
124127
}
125128
// remove AppWrapper from cache
126129
r.deleteCachedAW(appWrapper)
130+
delete(r.Decisions, appWrapper.UID)
127131
log.FromContext(ctx).Info("Deleted")
128132
return ctrl.Result{}, nil
129133
}
@@ -141,8 +145,27 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
141145
return r.updateStatus(ctx, appWrapper, mcadv1beta1.Queued, mcadv1beta1.Idle)
142146

143147
case mcadv1beta1.Queued:
144-
r.triggerDispatch()
145-
return ctrl.Result{}, nil
148+
// Propagate most recent queuing decision to AppWrapper's Queued Condition
149+
if decision, ok := r.Decisions[appWrapper.UID]; ok {
150+
meta.SetStatusCondition(&appWrapper.Status.Conditions, metav1.Condition{
151+
Type: string(mcadv1beta1.Queued),
152+
Status: metav1.ConditionTrue,
153+
Reason: string(decision.reason),
154+
Message: decision.message,
155+
})
156+
if r.Status().Update(ctx, appWrapper) == nil {
157+
// If successfully propagated, remove from in memory map
158+
delete(r.Decisions, appWrapper.UID)
159+
}
160+
}
161+
162+
if meta.FindStatusCondition(appWrapper.Status.Conditions, string(mcadv1beta1.Queued)) == nil {
163+
// Absence of Queued Condition strongly suggests AppWrapper is new; trigger dispatch and a short requeue
164+
r.triggerDispatch()
165+
return ctrl.Result{RequeueAfter: deletionDelay}, nil
166+
} else {
167+
return ctrl.Result{RequeueAfter: queuedDelay}, nil
168+
}
146169

147170
case mcadv1beta1.Running:
148171
switch appWrapper.Status.Step {
@@ -193,7 +216,21 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
193216
// reset status to queued/idle
194217
appWrapper.Status.Restarts += 1
195218
appWrapper.Status.RequeueTimestamp = metav1.Now() // overwrite requeue decision time with completion time
196-
return r.updateStatus(ctx, appWrapper, mcadv1beta1.Queued, mcadv1beta1.Idle)
219+
msg := "Requeued by MCAD"
220+
if decision, ok := r.Decisions[appWrapper.UID]; ok && decision.reason == mcadv1beta1.QueuedRequeue {
221+
msg = fmt.Sprintf("Requeued because %s", decision.message)
222+
}
223+
meta.SetStatusCondition(&appWrapper.Status.Conditions, metav1.Condition{
224+
Type: string(mcadv1beta1.Queued),
225+
Status: metav1.ConditionTrue,
226+
Reason: string(mcadv1beta1.QueuedRequeue),
227+
Message: msg,
228+
})
229+
res, err := r.updateStatus(ctx, appWrapper, mcadv1beta1.Queued, mcadv1beta1.Idle)
230+
if err == nil {
231+
delete(r.Decisions, appWrapper.UID)
232+
}
233+
return res, err
197234
}
198235

199236
case mcadv1beta1.Failed:
@@ -295,6 +332,7 @@ func (r *AppWrapperReconciler) requeueOrFail(ctx context.Context, appWrapper *mc
295332
}
296333
// requeue AppWrapper
297334
appWrapper.Status.RequeueTimestamp = metav1.Now()
335+
r.Decisions[appWrapper.UID] = &QueuingDecision{reason: mcadv1beta1.QueuedRequeue, message: reason}
298336
return r.updateStatus(ctx, appWrapper, mcadv1beta1.Running, mcadv1beta1.Deleting, reason)
299337
}
300338

@@ -332,6 +370,12 @@ func (r *AppWrapperReconciler) dispatch(ctx context.Context) (ctrl.Result, error
332370
}
333371
// set dispatching time and status
334372
appWrapper.Status.DispatchTimestamp = metav1.Now()
373+
meta.SetStatusCondition(&appWrapper.Status.Conditions, metav1.Condition{
374+
Type: string(mcadv1beta1.Queued),
375+
Status: metav1.ConditionFalse,
376+
Reason: string(mcadv1beta1.QueuedDispatch),
377+
Message: "Selected for dispatch",
378+
})
335379
if _, err := r.updateStatus(ctx, appWrapper, mcadv1beta1.Running, mcadv1beta1.Creating); err != nil {
336380
return ctrl.Result{}, err
337381
}

internal/controller/dispatch_logic.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"fmt"
2122
"sort"
2223
"strconv"
2324
"time"
@@ -31,6 +32,11 @@ import (
3132
"github.com/prometheus/client_golang/prometheus"
3233
)
3334

35+
type QueuingDecision struct {
36+
reason mcadv1beta1.AppWrapperQueuedReason
37+
message string
38+
}
39+
3440
// Compute available cluster capacity
3541
func (r *AppWrapperReconciler) computeCapacity(ctx context.Context) (Weights, error) {
3642
capacity := Weights{}
@@ -251,8 +257,16 @@ func (r *AppWrapperReconciler) selectForDispatch(ctx context.Context) (*mcadv1be
251257
// return first AppWrapper that fits if any
252258
for _, appWrapper := range queue {
253259
request := aggregateRequests(appWrapper)
254-
if request.Fits(available[int(appWrapper.Spec.Priority)]) {
260+
fits, gaps := request.Fits(available[int(appWrapper.Spec.Priority)])
261+
if fits {
255262
return appWrapper.DeepCopy(), nil // deep copy AppWrapper
263+
} else {
264+
msg := ""
265+
for _, resource := range gaps {
266+
msg += fmt.Sprintf("Insufficient %v; requested %v but only %v available. ", resource, request[resource], available[int(appWrapper.Spec.Priority)][resource])
267+
268+
}
269+
r.Decisions[appWrapper.UID] = &QueuingDecision{reason: mcadv1beta1.QueuedInsufficientResources, message: msg}
256270
}
257271
}
258272
// no queued AppWrapper fits

internal/controller/timings.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ const (
2929

3030
// RequeueAfter delays
3131
runDelay = time.Minute // how often to force check running AppWrapper health
32+
queuedDelay = 2 * time.Minute // how often to update condition information on Queued AppWrappes
3233
dispatchDelay = time.Minute // how often to force dispatch
3334
deletionDelay = 5 * time.Second // how often to check deleted resources
3435
)

internal/controller/weights.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,8 @@ func (w Weights) Max(r Weights) {
100100

101101
// Compare receiver to argument
102102
// True if receiver is less than or equal to argument in every dimension
103-
func (w Weights) Fits(r Weights) bool {
103+
func (w Weights) Fits(r Weights) (bool, []v1.ResourceName) {
104+
insufficient := []v1.ResourceName{}
104105
zero := &inf.Dec{} // shared zero, never mutated
105106
for k, v := range w { // range over receiver not argument
106107
// ignore 0 requests in case r does not contain k
@@ -109,10 +110,14 @@ func (w Weights) Fits(r Weights) bool {
109110
}
110111
// v > 0 so r[k] must be defined and no less than v
111112
if r[k] == nil || v.Cmp(r[k]) == 1 {
112-
return false
113+
insufficient = append(insufficient, k)
113114
}
114115
}
115-
return true
116+
if len(insufficient) == 0 {
117+
return true, nil
118+
} else {
119+
return false, insufficient
120+
}
116121
}
117122

118123
// Converts Weights to a ResourceList

test/e2e/aw_fixtures.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ func createGenericAWTimeoutWithStatus(ctx context.Context, name string) *arbv1.A
108108
return aw
109109
}
110110

111-
func createJobAWWithInitContainer(ctx context.Context, name string, requeuingTimeInSeconds int, requeuingGrowthType string, requeuingMaxNumRequeuings int) *arbv1.AppWrapper {
111+
func createJobAWWithStuckInitContainer(ctx context.Context, name string, requeuingSpec arbv1.RequeuingSpec) *arbv1.AppWrapper {
112112
rb := []byte(`{"apiVersion": "batch/v1",
113113
"kind": "Job",
114114
"metadata": {
@@ -133,7 +133,7 @@ func createJobAWWithInitContainer(ctx context.Context, name string, requeuingTim
133133
{
134134
"name": "job-init-container",
135135
"image": "quay.io/project-codeflare/busybox:latest",
136-
"command": ["sleep", "200"],
136+
"command": ["sleep", "infinity"],
137137
"resources": {
138138
"requests": {
139139
"cpu": "500m"
@@ -167,11 +167,7 @@ func createJobAWWithInitContainer(ctx context.Context, name string, requeuingTim
167167
Spec: arbv1.AppWrapperSpec{
168168
Scheduling: arbv1.SchedulingSpec{
169169
MinAvailable: minAvailable,
170-
Requeuing: arbv1.RequeuingSpec{
171-
TimeInSeconds: int64(requeuingTimeInSeconds),
172-
NotImplemented_GrowthType: requeuingGrowthType,
173-
MaxNumRequeuings: int32(requeuingMaxNumRequeuings),
174-
},
170+
Requeuing: requeuingSpec,
175171
},
176172
Resources: arbv1.AppWrapperResources{
177173
GenericItems: []arbv1.GenericItem{

0 commit comments

Comments
 (0)