Skip to content

Commit

Permalink
[multikueue] Manage worker cluster unavailability (kubernetes-sigs#1681)
Browse files Browse the repository at this point in the history
* [multikueue] Partially active admission check.

* [multikueue] Keep ready timeout

* Review Remarks

* Refactor reconcileGroup.

* Review Remarks.

* Fix integration test after rebase.
  • Loading branch information
trasc authored and vsoch committed Apr 18, 2024
1 parent 898e39c commit a6f15f2
Show file tree
Hide file tree
Showing 17 changed files with 542 additions and 86 deletions.
7 changes: 7 additions & 0 deletions apis/config/v1beta1/configuration_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,13 @@ type MultiKueue struct {
// them if their local counterpart no longer exists.
// +optional
Origin *string `json:"origin,omitempty"`

// WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready
// if the connection with its reserving worker cluster is lost.
//
// Defaults to 15 minutes.
// +optional
WorkerLostTimeout *metav1.Duration `json:"workerLostTimeout,omitempty"`
}

type RequeuingStrategy struct {
Expand Down
4 changes: 4 additions & 0 deletions apis/config/v1beta1/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ const (
defaultJobFrameworkName = "batch/job"
DefaultMultiKueueGCInterval = time.Minute
DefaultMultiKueueOrigin = "multikueue"
DefaultMultiKueueWorkerLostTimeout = 15 * time.Minute
)

func getOperatorNamespace() string {
Expand Down Expand Up @@ -175,4 +176,7 @@ func SetDefaults_Configuration(cfg *Configuration) {
if ptr.Deref(cfg.MultiKueue.Origin, "") == "" {
cfg.MultiKueue.Origin = ptr.To(DefaultMultiKueueOrigin)
}
if cfg.MultiKueue.WorkerLostTimeout == nil {
cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}
}
}
20 changes: 12 additions & 8 deletions apis/config/v1beta1/defaults_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
}

defaultMultiKueue := &MultiKueue{
GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval},
Origin: ptr.To(DefaultMultiKueueOrigin),
GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval},
Origin: ptr.To(DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout},
}

podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout}
Expand Down Expand Up @@ -498,8 +499,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Enable: ptr.To(false),
},
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
},
},
want: &Configuration{
Expand All @@ -512,8 +514,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{Duration: time.Second},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: time.Minute},
},
},
},
Expand All @@ -537,8 +540,9 @@ func TestSetDefaults_Configuration(t *testing.T) {
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &MultiKueue{
GCInterval: &metav1.Duration{},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute},
},
},
},
Expand Down
5 changes: 5 additions & 0 deletions apis/config/v1beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cmd/kueue/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag
if err := multikueue.SetupControllers(mgr, *cfg.Namespace,
multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration),
multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)),
multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration),
); err != nil {
setupLog.Error(err, "Could not setup MultiKueue controller")
os.Exit(1)
Expand Down
5 changes: 3 additions & 2 deletions cmd/kueue/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,9 @@ integrations:
},
},
MultiKueue: &config.MultiKueue{
GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval},
Origin: ptr.To(config.DefaultMultiKueueOrigin),
GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval},
Origin: ptr.To(config.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueWorkerLostTimeout},
},
},
},
Expand Down
16 changes: 10 additions & 6 deletions pkg/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ namespace: kueue-system
multiKueue:
gcInterval: 1m30s
origin: multikueue-manager1
workerLostTimeout: 10m
`), os.FileMode(0600)); err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -333,8 +334,9 @@ multiKueue:
}

defaultMultiKueue := &configapi.MultiKueue{
GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval},
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval},
Origin: ptr.To(configapi.DefaultMultiKueueOrigin),
WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout},
}

testcases := []struct {
Expand Down Expand Up @@ -800,8 +802,9 @@ multiKueue:
Integrations: defaultIntegrations,
QueueVisibility: defaultQueueVisibility,
MultiKueue: &configapi.MultiKueue{
GCInterval: &metav1.Duration{Duration: 90 * time.Second},
Origin: ptr.To("multikueue-manager1"),
GCInterval: &metav1.Duration{Duration: 90 * time.Second},
Origin: ptr.To("multikueue-manager1"),
WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute},
},
},
wantOptions: defaultControlOptions,
Expand Down Expand Up @@ -914,8 +917,9 @@ func TestEncode(t *testing.T) {
"clusterQueues": map[string]any{"maxCount": int64(10)},
},
"multiKueue": map[string]any{
"gcInterval": "1m0s",
"origin": "multikueue",
"gcInterval": "1m0s",
"origin": "multikueue",
"workerLostTimeout": "15m0s",
},
},
},
Expand Down
52 changes: 29 additions & 23 deletions pkg/controller/admissionchecks/multikueue/admissioncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,20 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
return reconcile.Result{}, client.IgnoreNotFound(err)
}

inactiveReason := ""

log.V(2).Info("Reconcile AdmissionCheck")

newCondition := metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "Active",
Message: "The admission check is active",
}

if cfg, err := a.helper.ConfigFromRef(ctx, ac.Spec.Parameters); err != nil {
inactiveReason = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error())
newCondition.Status = metav1.ConditionFalse
newCondition.Reason = "BadConfig"
newCondition.Message = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error())

} else {
var missingClusters []string
var inactiveClusters []string
Expand All @@ -90,28 +99,25 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re
}
}
}
unusableClustersCount := len(missingClusters) + len(inactiveClusters)
if unusableClustersCount > 0 {
if unusableClustersCount < len(cfg.Spec.Clusters) {
// keep it partially active
newCondition.Reason = "SomeActiveClusters"
} else {
newCondition.Status = metav1.ConditionFalse
newCondition.Reason = "NoUsableClusters"
}

var messageParts []string
if len(missingClusters) > 0 {
messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)}
}
if len(inactiveClusters) > 0 {
messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters))
var messageParts []string
if len(missingClusters) > 0 {
messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)}
}
if len(inactiveClusters) > 0 {
messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters))
}
newCondition.Message = strings.Join(messageParts, ", ")
}
inactiveReason = strings.Join(messageParts, ", ")
}

newCondition := metav1.Condition{
Type: kueue.AdmissionCheckActive,
}
if len(inactiveReason) == 0 {
newCondition.Status = metav1.ConditionTrue
newCondition.Reason = "Active"
newCondition.Message = "The admission check is active"
} else {
newCondition.Status = metav1.ConditionFalse
newCondition.Reason = "Inactive"
newCondition.Message = inactiveReason
}

oldCondition := apimeta.FindStatusCondition(ac.Status.Conditions, kueue.AdmissionCheckActive)
Expand Down
41 changes: 35 additions & 6 deletions pkg/controller/admissionchecks/multikueue/admissioncheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "BadConfig",
Message: `Cannot load the AdmissionChecks parameters: multikueueconfigs.kueue.x-k8s.io "config1" not found`,
}).
Obj(),
Expand Down Expand Up @@ -95,7 +95,7 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "NoUsableClusters",
Message: "Missing clusters: [worker1]",
}).
Obj(),
Expand Down Expand Up @@ -123,13 +123,13 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "NoUsableClusters",
Message: "Inactive clusters: [worker1]",
}).
Obj(),
},
},
"missing and inactive cluster": {
"all clusters missing or inactive": {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
Expand All @@ -143,7 +143,7 @@ func TestReconcile(t *testing.T) {

clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
Expand All @@ -152,7 +152,36 @@ func TestReconcile(t *testing.T) {
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionFalse,
Reason: "Inactive",
Reason: "NoUsableClusters",
Message: "Missing clusters: [worker3], Inactive clusters: [worker1 worker2]",
}).
Obj(),
},
},
"partially active": {
reconcileFor: "ac1",
checks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Obj(),
},
configs: []kueuealpha.MultiKueueConfig{
*utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1", "worker2", "worker3").Obj(),
},

clusters: []kueuealpha.MultiKueueCluster{
*utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(),
*utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(),
},
wantChecks: []kueue.AdmissionCheck{
*utiltesting.MakeAdmissionCheck("ac1").
ControllerName(ControllerName).
Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1").
Condition(metav1.Condition{
Type: kueue.AdmissionCheckActive,
Status: metav1.ConditionTrue,
Reason: "SomeActiveClusters",
Message: "Missing clusters: [worker3], Inactive clusters: [worker1]",
}).
Obj(),
Expand Down
26 changes: 19 additions & 7 deletions pkg/controller/admissionchecks/multikueue/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ import (
)

const (
defaultGCInterval = time.Minute
defaultOrigin = "multikueue"
defaultGCInterval = time.Minute
defaultOrigin = "multikueue"
defaulWorkerLostTimeout = 5 * time.Minute
)

type SetupOptions struct {
gcInterval time.Duration
origin string
gcInterval time.Duration
origin string
workerLostTimeout time.Duration
}

type SetupOption func(o *SetupOptions)
Expand All @@ -49,10 +51,20 @@ func WithOrigin(origin string) SetupOption {
}
}

// WithWorkerLostTimeout returns a SetupOption that sets how long the
// multikueue admission check is kept in the Ready state after the
// connection to the admitting worker cluster is lost.
func WithWorkerLostTimeout(d time.Duration) SetupOption {
	// The returned option only records the timeout on the options struct;
	// it is applied when the caller invokes it on a *SetupOptions.
	applyTimeout := func(opts *SetupOptions) {
		opts.workerLostTimeout = d
	}
	return applyTimeout
}

func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error {
options := &SetupOptions{
gcInterval: defaultGCInterval,
origin: defaultOrigin,
gcInterval: defaultGCInterval,
origin: defaultOrigin,
workerLostTimeout: defaulWorkerLostTimeout,
}

for _, o := range opts {
Expand All @@ -76,6 +88,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e
return err
}

wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin)
wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout)
return wlRec.setupWithManager(mgr)
}
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ func TestWlReconcileJobset(t *testing.T) {
cRec.remoteClients["worker1"] = w1remoteClient

helper, _ := newMultiKueueStoreHelper(managerClient)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin)
reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaulWorkerLostTimeout)

_, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "wl1", Namespace: TestNamespace}})
if gotErr != nil {
Expand Down

0 comments on commit a6f15f2

Please sign in to comment.