diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index d3ee9f75a3..918bb967ce 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -215,6 +215,13 @@ type MultiKueue struct { // them if their local counterpart no longer exists. // +optional Origin *string `json:"origin,omitempty"` + + // WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready + // if the connection with its reserving worker cluster is lost. + // + // Defaults to 15 minutes. + // +optional + WorkerLostTimeout *metav1.Duration `json:"workerLostTimeout,omitempty"` } type RequeuingStrategy struct { diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index f0c008ccf5..8f11d2cd87 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -46,6 +46,7 @@ const ( defaultJobFrameworkName = "batch/job" DefaultMultiKueueGCInterval = time.Minute DefaultMultiKueueOrigin = "multikueue" + DefaultMultiKueueWorkerLostTimeout = 15 * time.Minute ) func getOperatorNamespace() string { @@ -175,4 +176,7 @@ func SetDefaults_Configuration(cfg *Configuration) { if ptr.Deref(cfg.MultiKueue.Origin, "") == "" { cfg.MultiKueue.Origin = ptr.To(DefaultMultiKueueOrigin) } + if cfg.MultiKueue.WorkerLostTimeout == nil { + cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout} + } } diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 106d9f4bd1..29cd949c9e 100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -97,8 +97,9 @@ func TestSetDefaults_Configuration(t *testing.T) { } defaultMultiKueue := &MultiKueue{ - GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval}, - Origin: ptr.To(DefaultMultiKueueOrigin), + GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval}, + Origin: ptr.To(DefaultMultiKueueOrigin), + WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}, } podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} @@ -498,8 +499,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Enable: ptr.To(false), }, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{Duration: time.Second}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{Duration: time.Second}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, }, }, want: &Configuration{ @@ -512,8 +514,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{Duration: time.Second}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{Duration: time.Second}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, }, }, }, @@ -537,8 +540,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute}, }, }, }, diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 
f86390a2a9..000f346d73 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -300,6 +300,11 @@ func (in *MultiKueue) DeepCopyInto(out *MultiKueue) { *out = new(string) **out = **in } + if in.WorkerLostTimeout != nil { + in, out := &in.WorkerLostTimeout, &out.WorkerLostTimeout + *out = new(v1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKueue. diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 56e5c59e36..8412bc4c08 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -250,6 +250,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag if err := multikueue.SetupControllers(mgr, *cfg.Namespace, multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration), multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)), + multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration), ); err != nil { setupLog.Error(err, "Could not setup MultiKueue controller") os.Exit(1) diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index 2d289e8187..25d78524a3 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -113,8 +113,9 @@ integrations: }, }, MultiKueue: &config.MultiKueue{ - GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, - Origin: ptr.To(config.DefaultMultiKueueOrigin), + GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, + Origin: ptr.To(config.DefaultMultiKueueOrigin), + WorkerLostTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueWorkerLostTimeout}, }, }, }, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 32567cc7d9..4caa3d90ea 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -264,6 +264,7 @@ namespace: kueue-system multiKueue: gcInterval: 1m30s origin: multikueue-manager1 + workerLostTimeout: 10m `), os.FileMode(0600)); err != nil { t.Fatal(err) } @@ -333,8 +334,9 @@ multiKueue: } defaultMultiKueue := &configapi.MultiKueue{ - GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval}, - Origin: ptr.To(configapi.DefaultMultiKueueOrigin), + GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval}, + Origin: ptr.To(configapi.DefaultMultiKueueOrigin), + WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, } testcases := []struct { @@ -800,8 +802,9 @@ multiKueue: Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &configapi.MultiKueue{ - GCInterval: &metav1.Duration{Duration: 90 * time.Second}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{Duration: 90 * time.Second}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute}, }, }, wantOptions: defaultControlOptions, @@ -914,8 +917,9 @@ func TestEncode(t *testing.T) { "clusterQueues": map[string]any{"maxCount": int64(10)}, }, "multiKueue": map[string]any{ - "gcInterval": "1m0s", - "origin": "multikueue", + "gcInterval": "1m0s", + "origin": "multikueue", + "workerLostTimeout": "15m0s", }, }, }, diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck.go b/pkg/controller/admissionchecks/multikueue/admissioncheck.go index 41d3459f99..d444272662 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck.go @@ -65,11 +65,20 @@ 
func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re return reconcile.Result{}, client.IgnoreNotFound(err) } - inactiveReason := "" - log.V(2).Info("Reconcile AdmissionCheck") + + newCondition := metav1.Condition{ + Type: kueue.AdmissionCheckActive, + Status: metav1.ConditionTrue, + Reason: "Active", + Message: "The admission check is active", + } + if cfg, err := a.helper.ConfigFromRef(ctx, ac.Spec.Parameters); err != nil { - inactiveReason = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error()) + newCondition.Status = metav1.ConditionFalse + newCondition.Reason = "BadConfig" + newCondition.Message = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error()) + } else { var missingClusters []string var inactiveClusters []string @@ -90,28 +99,25 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re } } } + unusableClustersCount := len(missingClusters) + len(inactiveClusters) + if unusableClustersCount > 0 { + if unusableClustersCount < len(cfg.Spec.Clusters) { + // keep it partially active + newCondition.Reason = "SomeActiveClusters" + } else { + newCondition.Status = metav1.ConditionFalse + newCondition.Reason = "NoUsableClusters" + } - var messageParts []string - if len(missingClusters) > 0 { - messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)} - } - if len(inactiveClusters) > 0 { - messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters)) + var messageParts []string + if len(missingClusters) > 0 { + messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)} + } + if len(inactiveClusters) > 0 { + messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters)) + } + newCondition.Message = strings.Join(messageParts, ", ") } - inactiveReason = strings.Join(messageParts, ", ") - } - - newCondition := metav1.Condition{ - Type: kueue.AdmissionCheckActive, - } - if len(inactiveReason) == 0 { - newCondition.Status = metav1.ConditionTrue - newCondition.Reason = "Active" - newCondition.Message = "The admission check is active" - } else { - newCondition.Status = metav1.ConditionFalse - newCondition.Reason = "Inactive" - newCondition.Message = inactiveReason } oldCondition := apimeta.FindStatusCondition(ac.Status.Conditions, kueue.AdmissionCheckActive) diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go index afaf2832fd..2853ea4efa 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go @@ -58,7 +58,7 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "BadConfig", Message: `Cannot load the AdmissionChecks parameters: multikueueconfigs.kueue.x-k8s.io "config1" not found`, }). Obj(), @@ -95,7 +95,7 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: "Missing clusters: [worker1]", }). Obj(), @@ -123,13 +123,13 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: "Inactive clusters: [worker1]", }). 
Obj(), }, }, - "missing and inactive cluster": { + "all clusters missing or inactive": { reconcileFor: "ac1", checks: []kueue.AdmissionCheck{ *utiltesting.MakeAdmissionCheck("ac1"). @@ -143,7 +143,7 @@ func TestReconcile(t *testing.T) { clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(), - *utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(), + *utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(), }, wantChecks: []kueue.AdmissionCheck{ *utiltesting.MakeAdmissionCheck("ac1"). @@ -152,7 +152,36 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", + Message: "Missing clusters: [worker3], Inactive clusters: [worker1 worker2]", + }). + Obj(), + }, + }, + "partially active": { + reconcileFor: "ac1", + checks: []kueue.AdmissionCheck{ + *utiltesting.MakeAdmissionCheck("ac1"). + ControllerName(ControllerName). + Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1"). + Obj(), + }, + configs: []kueuealpha.MultiKueueConfig{ + *utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1", "worker2", "worker3").Obj(), + }, + + clusters: []kueuealpha.MultiKueueCluster{ + *utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(), + *utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(), + }, + wantChecks: []kueue.AdmissionCheck{ + *utiltesting.MakeAdmissionCheck("ac1"). + ControllerName(ControllerName). + Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1"). + Condition(metav1.Condition{ + Type: kueue.AdmissionCheckActive, + Status: metav1.ConditionTrue, + Reason: "SomeActiveClusters", Message: "Missing clusters: [worker3], Inactive clusters: [worker1]", }). Obj(), diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go index 90c14dc5b5..d1657641ec 100644 --- a/pkg/controller/admissionchecks/multikueue/controllers.go +++ b/pkg/controller/admissionchecks/multikueue/controllers.go @@ -23,13 +23,15 @@ import ( ) const ( - defaultGCInterval = time.Minute - defaultOrigin = "multikueue" + defaultGCInterval = time.Minute + defaultOrigin = "multikueue" + defaulWorkerLostTimeout = 5 * time.Minute ) type SetupOptions struct { - gcInterval time.Duration - origin string + gcInterval time.Duration + origin string + workerLostTimeout time.Duration } type SetupOption func(o *SetupOptions) @@ -49,10 +51,20 @@ func WithOrigin(origin string) SetupOption { } } +// WithWorkerLostTimeout - sets the time for which the multikueue +// admission check is kept in Ready state after the connection to +// the admitting worker cluster is lost. 
+func WithWorkerLostTimeout(d time.Duration) SetupOption { + return func(o *SetupOptions) { + o.workerLostTimeout = d + } +} + func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error { options := &SetupOptions{ - gcInterval: defaultGCInterval, - origin: defaultOrigin, + gcInterval: defaultGCInterval, + origin: defaultOrigin, + workerLostTimeout: defaulWorkerLostTimeout, } for _, o := range opts { @@ -76,6 +88,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e return err } - wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin) + wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout) return wlRec.setupWithManager(mgr) } diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go index e34c6da374..2b8c45e1d5 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go @@ -337,7 +337,7 @@ func TestWlReconcileJobset(t *testing.T) { cRec.remoteClients["worker1"] = w1remoteClient helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin) + reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaulWorkerLostTimeout) _, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "wl1", Namespace: TestNamespace}}) if gotErr != nil { diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 94f452139e..5f846fcc9c 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -54,10 +54,11 @@ var ( ) type wlReconciler struct { - client client.Client - helper *multiKueueStoreHelper - clusters *clustersReconciler - origin string + client client.Client + helper *multiKueueStoreHelper + clusters *clustersReconciler + origin string + workerLostTimeout time.Duration } var _ reconcile.Reconciler = (*wlReconciler)(nil) @@ -168,7 +169,7 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re return reconcile.Result{}, nil } - return reconcile.Result{}, a.reconcileGroup(ctx, grp) + return a.reconcileGroup(ctx, grp) } func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) { @@ -241,10 +242,12 @@ func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*w return &grp, nil } -func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error { +func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reconcile.Result, error) { log := ctrl.LoggerFrom(ctx).WithValues("op", "reconcileGroup") log.V(3).Info("Reconcile Workload Group") + acs := workload.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName) + // 1. delete all remote workloads when finished or the local wl has no reservation if group.IsFinished() || !workload.HasQuotaReservation(group.local) { errs := []error{} @@ -254,7 +257,17 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error log.V(2).Error(err, "Deleting remote workload", "workerCluster", rem) } } - return errors.Join(errs...) 
+ + if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry { + acs.State = kueue.CheckStatePending + acs.Message = "Requeued" + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) + workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) + errs = append(errs, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)) + } + + return reconcile.Result{}, errors.Join(errs...) } if remoteFinishedCond, remote := group.RemoteFinishedCondition(); remoteFinishedCond != nil { @@ -265,7 +278,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[remote].client, group.controllerKey, group.local.Name, a.origin); err != nil { log.V(2).Error(err, "copying remote controller status", "workerCluster", remote) // we should retry this - return err + return reconcile.Result{}, err } } else { log.V(3).Info("Group with no adapter, skip owner status copy", "workerCluster", remote) @@ -279,33 +292,39 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error Reason: remoteFinishedCond.Reason, Message: remoteFinishedCond.Message, }) - return a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership) + return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership) } - hasReserving, reservingRemote := group.FirstReserving() - // 2. delete all workloads that are out of sync or are not in the chosen worker for rem, remWl := range group.remotes { - if remWl == nil { - continue - } - outOfSync := group.local == nil || !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) - notReservingRemote := hasReserving && reservingRemote != rem - if outOfSync || notReservingRemote { + if remWl != nil && !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) { if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem) - return err + return reconcile.Result{}, err } + group.remotes[rem] = nil } } // 3. get the first reserving + hasReserving, reservingRemote := group.FirstReserving() if hasReserving { + // remove the non-reserving worker workloads + for rem, remWl := range group.remotes { + if remWl != nil && rem != reservingRemote { + if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { + log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem) + return reconcile.Result{}, err + } + group.remotes[rem] = nil + } + } + acs := workload.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName) if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[reservingRemote].client, group.controllerKey, group.local.Name, a.origin); err != nil { log.V(2).Error(err, "creating remote controller object", "remote", reservingRemote) // We'll retry this in the next reconcile. 
- return err + return reconcile.Result{}, err } if acs.State != kueue.CheckStateRetry && acs.State != kueue.CheckStateRejected { @@ -316,15 +335,32 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error } // update the message acs.Message = fmt.Sprintf("The workload got reservation on %q", reservingRemote) + // update the transition time since it is used to detect the lost worker state. + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) err := a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership) if err != nil { - return err + return reconcile.Result{}, err } } - // drop this if we want to create new remote workloads while holding a reservation - return nil + return reconcile.Result{RequeueAfter: a.workerLostTimeout}, nil + } else if acs.State == kueue.CheckStateReady { + // If there is no reserving remote and the AC is ready, the connection with the reserving remote might + // be lost; keep the workload admitted for workerLostTimeout and put it back in the queue after that. + remainingWaitTime := a.workerLostTimeout - time.Since(acs.LastTransitionTime.Time) + if remainingWaitTime > 0 { + log.V(3).Info("Reserving remote lost, retry", "retryAfter", remainingWaitTime) + return reconcile.Result{RequeueAfter: remainingWaitTime}, nil + } else { + acs.State = kueue.CheckStateRetry + acs.Message = "Reserving remote lost" + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) + workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) + return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership) + } } // finally - create missing workloads @@ -340,15 +376,16 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error } } } - return errors.Join(errs...) + return reconcile.Result{}, errors.Join(errs...) } -func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string) *wlReconciler { +func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, workerLostTimeout time.Duration) *wlReconciler { return &wlReconciler{ - client: c, - helper: helper, - clusters: cRec, - origin: origin, + client: c, + helper: helper, + clusters: cRec, + origin: origin, + workerLostTimeout: workerLostTimeout, } } diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index a5a70d696a..92bddda163 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -423,6 +424,185 @@ func TestWlReconcile(t *testing.T) { Obj(), }, }, + "the local workload's admission check is kept Ready if the WorkerLostTimeout is not exceeded": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + LastTransitionTime: metav1.NewTime(time.Now().Add(-defaulWorkerLostTimeout / 2)), // 50% of the timeout + Message: `The workload got reservation on "worker1"`, + }).
+ ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + }, + "the local workload's admission check is set to Retry if the WorkerLostTimeout is exceeded": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + LastTransitionTime: metav1.NewTime(time.Now().Add(-defaulWorkerLostTimeout * 3 / 2)), // 150% of the timeout + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateRetry, + Message: `Reserving remote lost`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + }, + "the local workload's admission check is set to Pending when it is in Retry without quota": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateRetry, + Message: `Reserving remote lost`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `Requeued`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + }, + "worker reconnects after the local workload is requeued, remote objects are deleted": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `Requeued`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `Requeued`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + }, + "worker reconnects after the local workload is requeued and got reservation on a second worker": { + // the worker with the oldest reservation is kept + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). 
+ AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `The workload got reservation on "worker2"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(time.Now().Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + useSecondWorker: true, + worker2Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(time.Now().Add(-time.Minute)). //one minute ago + Obj(), + }, + worker2Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobBuilder.Clone().Obj(), + }, + wantWorker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantWorker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + }, } for name, tc := range cases { @@ -488,7 +668,7 @@ func TestWlReconcile(t *testing.T) { } helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin) + reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaulWorkerLostTimeout) _, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.reconcileFor, Namespace: TestNamespace}}) if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index e2d7ec01c8..7e802a4c05 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -127,6 +127,13 @@ func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper { return w } +// QuotaReservedTime - sets the LastTransitionTime of the QuotaReserved condition if found. 
+func (w *WorkloadWrapper) QuotaReservedTime(t time.Time) *WorkloadWrapper { + cond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadQuotaReserved) + cond.LastTransitionTime = metav1.NewTime(t) + return w +} + func (w *WorkloadWrapper) Admitted(a bool) *WorkloadWrapper { cond := metav1.Condition{ Type: kueue.WorkloadAdmitted, diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index bf605300bb..a3016ae344 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -512,6 +512,15 @@ remote objects that ware created by this multikueue manager cluster and delete them if their local counterpart no longer exists.

+workerLostTimeout
+k8s.io/apimachinery/pkg/apis/meta/v1.Duration
+
+WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready
+if the connection with its reserving worker cluster is lost.
+
+Defaults to 15 minutes.
+ + diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 8e14b5869e..da6c27bc25 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -17,6 +17,8 @@ limitations under the License. package multikueue import ( + "time" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -187,7 +189,7 @@ var _ = ginkgo.Describe("Multikueue", func() { g.Expect(updatedAc.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "BadConfig", Message: `Cannot load the AdmissionChecks parameters: MultiKueueConfig.kueue.x-k8s.io "testing-config" not found`, }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -213,7 +215,7 @@ var _ = ginkgo.Describe("Multikueue", func() { g.Expect(updatedAc.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: `Missing clusters: [testing-cluster]`, }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -247,7 +249,7 @@ var _ = ginkgo.Describe("Multikueue", func() { g.Expect(updatedAc.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: `Inactive clusters: [testing-cluster]`, }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -560,4 +562,145 @@ var _ = ginkgo.Describe("Multikueue", func() { }) }) + ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() { + jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name). + Queue(managerLq.Name). + ReplicatedJobs( + testingjobset.ReplicatedJobRequirements{ + Name: "replicated-job-1", + Replicas: 1, + Parallelism: 1, + Completions: 1, + }, testingjobset.ReplicatedJobRequirements{ + Name: "replicated-job-2", + Replicas: 3, + Parallelism: 1, + Completions: 1, + }, + ). 
+ Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, jobSet)).Should(gomega.Succeed()) + + createdWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID), Namespace: managerNs.Name} + + admission := utiltesting.MakeAdmission(managerCq.Name).PodSets( + kueue.PodSetAssignment{ + Name: "replicated-job-1", + }, kueue.PodSetAssignment{ + Name: "replicated-job-2", + }, + ).Obj() + + ginkgo.By("setting workload reservation in the management cluster", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + managerWl := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager and worker1 wl is removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name) + g.Expect(acs).NotTo(gomega.BeNil()) + g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady)) + g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker2"`)) + + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"))) + + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + var disconnectedTime time.Time + ginkgo.By("breaking the connection to worker2", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), 
createdCluster)).To(gomega.Succeed()) + createdCluster.Spec.KubeConfig.Location = "bad-secret" + g.Expect(managerTestCluster.client.Update(managerTestCluster.ctx, createdCluster)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + activeCondition := apimeta.FindStatusCondition(createdCluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) + g.Expect(activeCondition).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueuealpha.MultiKueueClusterActive, + Status: metav1.ConditionFalse, + Reason: "BadConfig", + }, cmpopts.IgnoreFields(metav1.Condition{}, "Message", "LastTransitionTime"))) + disconnectedTime = activeCondition.LastTransitionTime.Time + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("waiting for the local workload admission check state to be set to pending and quotaReservation removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name) + g.Expect(acs).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: multikueueAC.Name, + State: kueue.CheckStatePending, + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "Message"))) + + // The transition interval should be close to testingWorkerLostTimeout (taking into account the resolution of the LastTransitionTime field) + g.Expect(acs.LastTransitionTime.Time).To(gomega.BeComparableTo(disconnectedTime.Add(testingWorkerLostTimeout), cmpopts.EquateApproxTime(2*time.Second))) + + g.Expect(apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved)).To(gomega.BeFalse()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("restoring the connection to worker2", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + createdCluster.Spec.KubeConfig.Location = managerMultikueueSecret2.Name + g.Expect(managerTestCluster.client.Update(managerTestCluster.ctx, createdCluster)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + activeCondition := apimeta.FindStatusCondition(createdCluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) + g.Expect(activeCondition).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueuealpha.MultiKueueClusterActive, + Status: metav1.ConditionTrue, + Reason: "Active", + }, cmpopts.IgnoreFields(metav1.Condition{}, "Message", "LastTransitionTime"))) + disconnectedTime = activeCondition.LastTransitionTime.Time + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("the worker2 wl is removed since the local one no longer has a reservation", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdWorkload :=
&kueue.Workload{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + }) }) diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index ed145ed1fd..8a021bd0b6 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -46,6 +46,10 @@ import ( // +kubebuilder:scaffold:imports ) +const ( + testingWorkerLostTimeout = 3 * time.Second +) + type cluster struct { cfg *rest.Config client client.Client @@ -173,6 +177,9 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) { err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, multikueue.WithGCInterval(2*time.Second)) + err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, + multikueue.WithGCInterval(2*time.Second), + multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout), + ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) }
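
Note: a minimal sketch of how the new field is set through the Kueue Configuration, based on the YAML fragment used in pkg/config/config_test.go above. The apiVersion/kind header and the kueue-system namespace are assumptions taken from the usual Configuration manifest, and the 10m value is illustrative; when the key is omitted, the 15-minute default from SetDefaults_Configuration applies.

    apiVersion: config.kueue.x-k8s.io/v1beta1
    kind: Configuration
    namespace: kueue-system
    multiKueue:
      gcInterval: 1m30s
      origin: multikueue-manager1
      # How long a local workload's multikueue admission check stays Ready after
      # the connection to its reserving worker cluster is lost; once exceeded, the
      # check is moved to Retry and the workload is requeued.
      workerLostTimeout: 10m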