From c0b82c35ded0073a6220036d28a0597174d75615 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 30 Jan 2024 13:30:52 +0200 Subject: [PATCH 1/6] [multikueue] Partially active admission check. --- .../multikueue/admissioncheck.go | 52 +++++++++++-------- .../multikueue/admissioncheck_test.go | 41 ++++++++++++--- .../integration/multikueue/multikueue_test.go | 6 +-- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck.go b/pkg/controller/admissionchecks/multikueue/admissioncheck.go index 41d3459f99..46b88bad27 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck.go @@ -65,11 +65,20 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re return reconcile.Result{}, client.IgnoreNotFound(err) } - inactiveReason := "" - log.V(2).Info("Reconcile AdmissionCheck") + + newCondition := metav1.Condition{ + Type: kueue.AdmissionCheckActive, + Status: metav1.ConditionTrue, + Reason: "Active", + Message: "The admission check is active", + } + if cfg, err := a.helper.ConfigFromRef(ctx, ac.Spec.Parameters); err != nil { - inactiveReason = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error()) + newCondition.Status = metav1.ConditionFalse + newCondition.Reason = "BadConfig" + newCondition.Message = fmt.Sprintf("Cannot load the AdmissionChecks parameters: %s", err.Error()) + } else { var missingClusters []string var inactiveClusters []string @@ -90,28 +99,25 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re } } } + unusableClustersCount := len(missingClusters) + len(inactiveClusters) + if unusableClustersCount > 0 { + if unusableClustersCount < len(cfg.Spec.Clusters) { + // keep it partially active + newCondition.Reason = "PartiallyActive" + } else { + newCondition.Status = metav1.ConditionFalse + newCondition.Reason = "NoUsableClusters" + } - var 
messageParts []string - if len(missingClusters) > 0 { - messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)} - } - if len(inactiveClusters) > 0 { - messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters)) + var messageParts []string + if len(missingClusters) > 0 { + messageParts = []string{fmt.Sprintf("Missing clusters: %v", missingClusters)} + } + if len(inactiveClusters) > 0 { + messageParts = append(messageParts, fmt.Sprintf("Inactive clusters: %v", inactiveClusters)) + } + newCondition.Message = strings.Join(messageParts, ", ") } - inactiveReason = strings.Join(messageParts, ", ") - } - - newCondition := metav1.Condition{ - Type: kueue.AdmissionCheckActive, - } - if len(inactiveReason) == 0 { - newCondition.Status = metav1.ConditionTrue - newCondition.Reason = "Active" - newCondition.Message = "The admission check is active" - } else { - newCondition.Status = metav1.ConditionFalse - newCondition.Reason = "Inactive" - newCondition.Message = inactiveReason } oldCondition := apimeta.FindStatusCondition(ac.Status.Conditions, kueue.AdmissionCheckActive) diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go index afaf2832fd..80762c0d7c 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go @@ -58,7 +58,7 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "BadConfig", Message: `Cannot load the AdmissionChecks parameters: multikueueconfigs.kueue.x-k8s.io "config1" not found`, }). 
Obj(), @@ -95,7 +95,7 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: "Missing clusters: [worker1]", }). Obj(), @@ -123,13 +123,13 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: "Inactive clusters: [worker1]", }). Obj(), }, }, - "missing and inactive cluster": { + "all clusters missing or inactive": { reconcileFor: "ac1", checks: []kueue.AdmissionCheck{ *utiltesting.MakeAdmissionCheck("ac1"). @@ -143,7 +143,7 @@ func TestReconcile(t *testing.T) { clusters: []kueuealpha.MultiKueueCluster{ *utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(), - *utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(), + *utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(), }, wantChecks: []kueue.AdmissionCheck{ *utiltesting.MakeAdmissionCheck("ac1"). @@ -152,7 +152,36 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", + Message: "Missing clusters: [worker3], Inactive clusters: [worker1 worker2]", + }). + Obj(), + }, + }, + "partially active": { + reconcileFor: "ac1", + checks: []kueue.AdmissionCheck{ + *utiltesting.MakeAdmissionCheck("ac1"). + ControllerName(ControllerName). + Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1"). 
+ Obj(), + }, + configs: []kueuealpha.MultiKueueConfig{ + *utiltesting.MakeMultiKueueConfig("config1").Clusters("worker1", "worker2", "worker3").Obj(), + }, + + clusters: []kueuealpha.MultiKueueCluster{ + *utiltesting.MakeMultiKueueCluster("worker1").Active(metav1.ConditionFalse, "ByTest", "by test").Obj(), + *utiltesting.MakeMultiKueueCluster("worker2").Active(metav1.ConditionTrue, "ByTest", "by test").Obj(), + }, + wantChecks: []kueue.AdmissionCheck{ + *utiltesting.MakeAdmissionCheck("ac1"). + ControllerName(ControllerName). + Parameters(kueuealpha.GroupVersion.Group, "MultiKueueConfig", "config1"). + Condition(metav1.Condition{ + Type: kueue.AdmissionCheckActive, + Status: metav1.ConditionTrue, + Reason: "PartiallyActive", Message: "Missing clusters: [worker3], Inactive clusters: [worker1]", }). Obj(), diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 8e14b5869e..8aedf08e27 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -187,7 +187,7 @@ var _ = ginkgo.Describe("Multikueue", func() { g.Expect(updatedAc.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "BadConfig", Message: `Cannot load the AdmissionChecks parameters: MultiKueueConfig.kueue.x-k8s.io "testing-config" not found`, }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) }, util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -213,7 +213,7 @@ var _ = ginkgo.Describe("Multikueue", func() { g.Expect(updatedAc.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: `Missing clusters: [testing-cluster]`, }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) }, 
util.Timeout, util.Interval).Should(gomega.Succeed()) @@ -247,7 +247,7 @@ var _ = ginkgo.Describe("Multikueue", func() { g.Expect(updatedAc.Status.Conditions).To(gomega.ContainElement(gomega.BeComparableTo(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionFalse, - Reason: "Inactive", + Reason: "NoUsableClusters", Message: `Inactive clusters: [testing-cluster]`, }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime")))) }, util.Timeout, util.Interval).Should(gomega.Succeed()) From 06f9bf11d481af6066ef69c46b99e5302802b93b Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Tue, 30 Jan 2024 14:04:18 +0200 Subject: [PATCH 2/6] [multikueue] Keep ready timeout --- apis/config/v1beta1/configuration_types.go | 5 + apis/config/v1beta1/defaults.go | 4 + apis/config/v1beta1/defaults_test.go | 20 +- apis/config/v1beta1/zz_generated.deepcopy.go | 5 + cmd/kueue/main.go | 1 + cmd/kueue/main_test.go | 5 +- pkg/config/config_test.go | 16 +- .../admissionchecks/multikueue/controllers.go | 26 ++- .../multikueue/jobset_adapter_test.go | 2 +- .../admissionchecks/multikueue/workload.go | 91 ++++++-- .../multikueue/workload_test.go | 182 +++++++++++++++- pkg/util/testing/wrappers.go | 7 + .../en/docs/reference/kueue-config.v1beta1.md | 8 + .../integration/multikueue/multikueue_test.go | 203 ++++++++++++++++++ test/integration/multikueue/suite_test.go | 9 +- 15 files changed, 538 insertions(+), 46 deletions(-) diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index d3ee9f75a3..3e8dc037c7 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -215,6 +215,11 @@ type MultiKueue struct { // them if their local counterpart no longer exists. 
// +optional Origin *string `json:"origin,omitempty"` + + // KeepReadyTimeout defines the time a local workload's multikueue admission check state is kept Ready + // if the connection with its reserving worker cluster is lost. + // +optional + KeepReadyTimeout *metav1.Duration `json:"keepReadyTimeout,omitempty"` } type RequeuingStrategy struct { diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index f0c008ccf5..bda9ab7319 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -46,6 +46,7 @@ const ( defaultJobFrameworkName = "batch/job" DefaultMultiKueueGCInterval = time.Minute DefaultMultiKueueOrigin = "multikueue" + DefaultMultiKueueKeepReadyTimeout = 5 * time.Minute ) func getOperatorNamespace() string { @@ -175,4 +176,7 @@ func SetDefaults_Configuration(cfg *Configuration) { if ptr.Deref(cfg.MultiKueue.Origin, "") == "" { cfg.MultiKueue.Origin = ptr.To(DefaultMultiKueueOrigin) } + if cfg.MultiKueue.KeepReadyTimeout == nil { + cfg.MultiKueue.KeepReadyTimeout = &metav1.Duration{Duration: DefaultMultiKueueKeepReadyTimeout} + } } diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 106d9f4bd1..7cddcc7e24 100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -97,8 +97,9 @@ func TestSetDefaults_Configuration(t *testing.T) { } defaultMultiKueue := &MultiKueue{ - GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval}, - Origin: ptr.To(DefaultMultiKueueOrigin), + GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval}, + Origin: ptr.To(DefaultMultiKueueOrigin), + KeepReadyTimeout: &metav1.Duration{Duration: DefaultMultiKueueKeepReadyTimeout}, } podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} @@ -498,8 +499,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Enable: ptr.To(false), }, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{Duration: time.Second}, - Origin: 
ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{Duration: time.Second}, + Origin: ptr.To("multikueue-manager1"), + KeepReadyTimeout: &metav1.Duration{Duration: time.Minute}, }, }, want: &Configuration{ @@ -512,8 +514,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{Duration: time.Second}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{Duration: time.Second}, + Origin: ptr.To("multikueue-manager1"), + KeepReadyTimeout: &metav1.Duration{Duration: time.Minute}, }, }, }, @@ -537,8 +540,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{}, + Origin: ptr.To("multikueue-manager1"), + KeepReadyTimeout: &metav1.Duration{Duration: 5 * time.Minute}, }, }, }, diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index f86390a2a9..696f478b0f 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -300,6 +300,11 @@ func (in *MultiKueue) DeepCopyInto(out *MultiKueue) { *out = new(string) **out = **in } + if in.KeepReadyTimeout != nil { + in, out := &in.KeepReadyTimeout, &out.KeepReadyTimeout + *out = new(v1.Duration) + **out = **in + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new MultiKueue. 
diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index 56e5c59e36..e6b87a3739 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -250,6 +250,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag if err := multikueue.SetupControllers(mgr, *cfg.Namespace, multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration), multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)), + multikueue.WithKeepReadyTimeout(cfg.MultiKueue.KeepReadyTimeout.Duration), ); err != nil { setupLog.Error(err, "Could not setup MultiKueue controller") os.Exit(1) diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index 2d289e8187..94507c205f 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -113,8 +113,9 @@ integrations: }, }, MultiKueue: &config.MultiKueue{ - GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, - Origin: ptr.To(config.DefaultMultiKueueOrigin), + GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, + Origin: ptr.To(config.DefaultMultiKueueOrigin), + KeepReadyTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueKeepReadyTimeout}, }, }, }, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 32567cc7d9..7e58742f50 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -264,6 +264,7 @@ namespace: kueue-system multiKueue: gcInterval: 1m30s origin: multikueue-manager1 + keepReadyTimeout: 10m `), os.FileMode(0600)); err != nil { t.Fatal(err) } @@ -333,8 +334,9 @@ multiKueue: } defaultMultiKueue := &configapi.MultiKueue{ - GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval}, - Origin: ptr.To(configapi.DefaultMultiKueueOrigin), + GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval}, + Origin: ptr.To(configapi.DefaultMultiKueueOrigin), + KeepReadyTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueKeepReadyTimeout}, } testcases := 
[]struct { @@ -800,8 +802,9 @@ multiKueue: Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &configapi.MultiKueue{ - GCInterval: &metav1.Duration{Duration: 90 * time.Second}, - Origin: ptr.To("multikueue-manager1"), + GCInterval: &metav1.Duration{Duration: 90 * time.Second}, + Origin: ptr.To("multikueue-manager1"), + KeepReadyTimeout: &metav1.Duration{Duration: 10 * time.Minute}, }, }, wantOptions: defaultControlOptions, @@ -914,8 +917,9 @@ func TestEncode(t *testing.T) { "clusterQueues": map[string]any{"maxCount": int64(10)}, }, "multiKueue": map[string]any{ - "gcInterval": "1m0s", - "origin": "multikueue", + "gcInterval": "1m0s", + "origin": "multikueue", + "keepReadyTimeout": "5m0s", }, }, }, diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go index 90c14dc5b5..27efe82010 100644 --- a/pkg/controller/admissionchecks/multikueue/controllers.go +++ b/pkg/controller/admissionchecks/multikueue/controllers.go @@ -23,13 +23,15 @@ import ( ) const ( - defaultGCInterval = time.Minute - defaultOrigin = "multikueue" + defaultGCInterval = time.Minute + defaultOrigin = "multikueue" + defaultKeepReadyTimeout = 5 * time.Minute ) type SetupOptions struct { - gcInterval time.Duration - origin string + gcInterval time.Duration + origin string + keepReadyTimeout time.Duration } type SetupOption func(o *SetupOptions) @@ -49,10 +51,20 @@ func WithOrigin(origin string) SetupOption { } } +// WithKeepReadyTimeout - sets the time for which the multikueue +// admission check is kept in Ready state after the connection to +// the admitting worker cluster is lost. 
+func WithKeepReadyTimeout(d time.Duration) SetupOption { + return func(o *SetupOptions) { + o.keepReadyTimeout = d + } +} + func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error { options := &SetupOptions{ - gcInterval: defaultGCInterval, - origin: defaultOrigin, + gcInterval: defaultGCInterval, + origin: defaultOrigin, + keepReadyTimeout: defaultKeepReadyTimeout, } for _, o := range opts { @@ -76,6 +88,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e return err } - wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin) + wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.keepReadyTimeout) return wlRec.setupWithManager(mgr) } diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go index e34c6da374..df8db2cf8c 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go @@ -337,7 +337,7 @@ func TestWlReconcileJobset(t *testing.T) { cRec.remoteClients["worker1"] = w1remoteClient helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin) + reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultKeepReadyTimeout) _, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: "wl1", Namespace: TestNamespace}}) if gotErr != nil { diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 94f452139e..2c79b29f89 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -54,10 +54,11 @@ var ( ) type wlReconciler struct { - client client.Client - helper *multiKueueStoreHelper - clusters *clustersReconciler - origin 
string + client client.Client + helper *multiKueueStoreHelper + clusters *clustersReconciler + origin string + keepReadyTimeout time.Duration } var _ reconcile.Reconciler = (*wlReconciler)(nil) @@ -168,7 +169,7 @@ func (a *wlReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re return reconcile.Result{}, nil } - return reconcile.Result{}, a.reconcileGroup(ctx, grp) + return a.reconcileGroup(ctx, grp) } func (w *wlReconciler) remoteClientsForAC(ctx context.Context, acName string) (map[string]*remoteClient, error) { @@ -241,10 +242,12 @@ func (a *wlReconciler) readGroup(ctx context.Context, local *kueue.Workload) (*w return &grp, nil } -func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error { +func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reconcile.Result, error) { log := ctrl.LoggerFrom(ctx).WithValues("op", "reconcileGroup") log.V(3).Info("Reconcile Workload Group") + acs := workload.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName) + // 1. delete all remote workloads when finished or the local wl has no reservation if group.IsFinished() || !workload.HasQuotaReservation(group.local) { errs := []error{} @@ -254,7 +257,17 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error log.V(2).Error(err, "Deleting remote workload", "workerCluster", rem) } } - return errors.Join(errs...) + + if !workload.HasQuotaReservation(group.local) && acs.State == kueue.CheckStateRetry { + acs.State = kueue.CheckStatePending + acs.Message = "Requeued" + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) + workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) + errs = append(errs, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership)) + } + + return reconcile.Result{}, errors.Join(errs...) 
} if remoteFinishedCond, remote := group.RemoteFinishedCondition(); remoteFinishedCond != nil { @@ -265,7 +278,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[remote].client, group.controllerKey, group.local.Name, a.origin); err != nil { log.V(2).Error(err, "copying remote controller status", "workerCluster", remote) // we should retry this - return err + return reconcile.Result{}, err } } else { log.V(3).Info("Group with no adapter, skip owner status copy", "workerCluster", remote) @@ -279,11 +292,43 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error Reason: remoteFinishedCond.Reason, Message: remoteFinishedCond.Message, }) - return a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership) + return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership) } hasReserving, reservingRemote := group.FirstReserving() + // if the reserving workload is out of sync, remove it + if hasReserving { + outOfSync := group.local == nil || !equality.Semantic.DeepEqual(group.local.Spec, group.remotes[reservingRemote].Spec) + if outOfSync { + if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, reservingRemote)); err != nil { + log.V(2).Error(err, "Deleting out of sync remote objects", "remote", reservingRemote) + return reconcile.Result{}, err + } + group.remotes[reservingRemote] = nil + + // check if another one exists + hasReserving, reservingRemote = group.FirstReserving() + } + } + + // If there is no reserving and the AC is ready, the connection with the reserving remote might + // be lost, keep the workload admitted for keepReadyTimeout and put it back in the queue after that. 
+ if !hasReserving && acs.State == kueue.CheckStateReady { + if time.Now().Before(acs.LastTransitionTime.Add(a.keepReadyTimeout)) { + retryAfter := a.keepReadyTimeout - time.Since(acs.LastTransitionTime.Time) + log.V(3).Info("Reserving remote lost, retry", "retryAfter", retryAfter) + return reconcile.Result{RequeueAfter: retryAfter}, nil + } else { + acs.State = kueue.CheckStateRetry + acs.Message = "Reserving remote lost" + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) + workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) + return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership) + } + } + // 2. delete all workloads that are out of sync or are not in the chosen worker for rem, remWl := range group.remotes { if remWl == nil { @@ -294,8 +339,9 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error if outOfSync || notReservingRemote { if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem) - return err + return reconcile.Result{}, err } + group.remotes[rem] = nil } } @@ -305,7 +351,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[reservingRemote].client, group.controllerKey, group.local.Name, a.origin); err != nil { log.V(2).Error(err, "creating remote controller object", "remote", reservingRemote) // We'll retry this in the next reconcile. 
- return err + return reconcile.Result{}, err } if acs.State != kueue.CheckStateRetry && acs.State != kueue.CheckStateRejected { @@ -316,15 +362,19 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error } // update the message acs.Message = fmt.Sprintf("The workload got reservation on %q", reservingRemote) + // update the transition time + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) err := a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership) if err != nil { - return err + return reconcile.Result{}, err } } - // drop this if we want to create new remote workloads while holding a reservation - return nil + // drop this if we want to create new remote workloads while holding a reservation. + // check again the connection to the remote in half the keepReadyTimeout. + return reconcile.Result{RequeueAfter: a.keepReadyTimeout / 2}, nil } // finally - create missing workloads @@ -340,15 +390,16 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) error } } } - return errors.Join(errs...) + return reconcile.Result{}, errors.Join(errs...) 
} -func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string) *wlReconciler { +func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, keepReadyTimeout time.Duration) *wlReconciler { return &wlReconciler{ - client: c, - helper: helper, - clusters: cRec, - origin: origin, + client: c, + helper: helper, + clusters: cRec, + origin: origin, + keepReadyTimeout: keepReadyTimeout, } } diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index a5a70d696a..5cb8543920 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -20,6 +20,7 @@ import ( "context" "errors" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" @@ -423,6 +424,185 @@ func TestWlReconcile(t *testing.T) { Obj(), }, }, + "the local workload is kept Ready if the keepReadyTimeout did not pass since last successful sync": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + LastTransitionTime: metav1.NewTime(time.Now().Add(-defaultKeepReadyTimeout / 2)), //50% of the timeout + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). 
+ Obj(), + }, + }, + "the local workload is set for Retry if the keepReadyTimeout did pass since last successful sync": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateReady, + LastTransitionTime: metav1.NewTime(time.Now().Add(-defaultKeepReadyTimeout * 3 / 2)), // 150% of the timeout + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateRetry, + Message: `Reserving remote lost`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + }, + "the local workload is set to pending when it is in Retry without quota": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStateRetry, + Message: `Reserving remote lost`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `Requeued`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + }, + "worker reconnects after the local workload is requeued, remote objects are deleted": { + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). 
+ AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `Requeued`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `Requeued`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + Obj(), + }, + + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + }, + "worker reconnects after the local workload is requeued and got reservation on a second worker": { + // the worker with the oldest reservation is kept + reconcileFor: "wl1", + managersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `The workload got reservation on "worker2"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + managersJobs: []batchv1.Job{ + *baseJobBuilder.Clone().Obj(), + }, + worker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(time.Now().Add(-time.Hour)). // one hour ago + Obj(), + }, + worker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + useSecondWorker: true, + worker2Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). 
+ ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + QuotaReservedTime(time.Now().Add(-time.Minute)). //one minute ago + Obj(), + }, + worker2Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + wantManagersWorkloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + AdmissionCheck(kueue.AdmissionCheckState{ + Name: "ac1", + State: kueue.CheckStatePending, + Message: `The workload got reservation on "worker1"`, + }). + ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantManagersJobs: []batchv1.Job{ + *baseJobBuilder.Clone().Obj(), + }, + wantWorker1Workloads: []kueue.Workload{ + *baseWorkloadBuilder.Clone(). + Label(kueuealpha.MultiKueueOriginLabel, defaultOrigin). + ReserveQuota(utiltesting.MakeAdmission("q1").Obj()). + Obj(), + }, + wantWorker1Jobs: []batchv1.Job{ + *baseJobBuilder.Clone(). + Label(constants.PrebuiltWorkloadLabel, "wl1"). + Obj(), + }, + }, } for name, tc := range cases { @@ -488,7 +668,7 @@ func TestWlReconcile(t *testing.T) { } helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin) + reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultKeepReadyTimeout) _, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.reconcileFor, Namespace: TestNamespace}}) if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { diff --git a/pkg/util/testing/wrappers.go b/pkg/util/testing/wrappers.go index e2d7ec01c8..7e802a4c05 100644 --- a/pkg/util/testing/wrappers.go +++ b/pkg/util/testing/wrappers.go @@ -127,6 +127,13 @@ func (w *WorkloadWrapper) ReserveQuota(a *kueue.Admission) *WorkloadWrapper { return w } +// QuotaReservedTime - sets the LastTransitionTime of the QuotaReserved condition if found. 
+func (w *WorkloadWrapper) QuotaReservedTime(t time.Time) *WorkloadWrapper { + cond := apimeta.FindStatusCondition(w.Status.Conditions, kueue.WorkloadQuotaReserved) + cond.LastTransitionTime = metav1.NewTime(t) + return w +} + func (w *WorkloadWrapper) Admitted(a bool) *WorkloadWrapper { cond := metav1.Condition{ Type: kueue.WorkloadAdmitted, diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index bf605300bb..9566d0ade6 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -512,6 +512,14 @@ remote objects that ware created by this multikueue manager cluster and delete them if their local counterpart no longer exists.

+keepReadyTimeout
+k8s.io/apimachinery/pkg/apis/meta/v1.Duration + + +

KeepReadyTimeout defines the time a local workload's multikueue admission check state is kept Ready +if the connection with its reserving worker cluster is lost.

+ + diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 8aedf08e27..526acd6ef1 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -17,6 +17,8 @@ limitations under the License. package multikueue import ( + "time" + "github.com/google/go-cmp/cmp/cmpopts" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -560,4 +562,205 @@ var _ = ginkgo.Describe("Multikueue", func() { }) }) + ginkgo.It("Should requeue the workload with a delay when the connection to the admitting worker is lost", func() { + jobSet := testingjobset.MakeJobSet("job-set", managerNs.Name). + Queue(managerLq.Name). + ReplicatedJobs( + testingjobset.ReplicatedJobRequirements{ + Name: "replicated-job-1", + Replicas: 1, + Parallelism: 1, + Completions: 1, + }, testingjobset.ReplicatedJobRequirements{ + Name: "replicated-job-2", + Replicas: 3, + Parallelism: 1, + Completions: 1, + }, + ). + Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, jobSet)).Should(gomega.Succeed()) + + createdWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name), Namespace: managerNs.Name} + + admission := utiltesting.MakeAdmission(managerCq.Name).PodSets( + kueue.PodSetAssignment{ + Name: "replicated-job-1", + }, kueue.PodSetAssignment{ + Name: "replicated-job-2", + }, + ).Obj() + + ginkgo.By("setting workload reservation in the management cluster", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + 
managerWl := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager amd worker1 wl is removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name) + g.Expect(acs).NotTo(gomega.BeNil()) + g.Expect(acs.State).To(gomega.Equal(kueue.CheckStateReady)) + g.Expect(acs.Message).To(gomega.Equal(`The workload got reservation on "worker2"`)) + + g.Expect(apimeta.FindStatusCondition(createdWorkload.Status.Conditions, kueue.WorkloadAdmitted)).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueue.WorkloadAdmitted, + Status: metav1.ConditionTrue, + Reason: "Admitted", + Message: "The workload is admitted", + }, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime"))) + + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g 
gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + var disconnectedTime time.Time + ginkgo.By("breaking the connection to worker2", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + createdCluster.Spec.KubeConfig.Location = "bad-secret" + g.Expect(managerTestCluster.client.Update(managerTestCluster.ctx, createdCluster)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + activeCondition := apimeta.FindStatusCondition(createdCluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) + g.Expect(activeCondition).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueuealpha.MultiKueueClusterActive, + Status: metav1.ConditionFalse, + Reason: "BadConfig", + }, cmpopts.IgnoreFields(metav1.Condition{}, "Message", "LastTransitionTime"))) + disconnectedTime = activeCondition.LastTransitionTime.Time + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("waiting for the local workload admission check state to be set to pending and quotaReservatio removed", func() { + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + acs := workload.FindAdmissionCheck(createdWorkload.Status.AdmissionChecks, multikueueAC.Name) + g.Expect(acs).To(gomega.BeComparableTo(&kueue.AdmissionCheckState{ + Name: multikueueAC.Name, + 
State: kueue.CheckStatePending, + }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "Message"))) + + // The transition interval should be close to testingKeepReadyTimeout (taking into account the resolution of the LastTransitionTime field) + g.Expect(acs.LastTransitionTime.Time.Sub(disconnectedTime)).To(gomega.BeComparableTo(testingKeepReadyTimeout, + cmpopts.EquateApproxTime(time.Second))) + + g.Expect(apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved)).To(gomega.BeFalse()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("restoring the connection to worker2", func() { + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + createdCluster.Spec.KubeConfig.Location = managerMultikueueSecret2.Name + g.Expect(managerTestCluster.client.Update(managerTestCluster.ctx, createdCluster)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdCluster := &kueuealpha.MultiKueueCluster{} + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, client.ObjectKeyFromObject(workerCluster2), createdCluster)).To(gomega.Succeed()) + activeCondition := apimeta.FindStatusCondition(createdCluster.Status.Conditions, kueuealpha.MultiKueueClusterActive) + g.Expect(activeCondition).To(gomega.BeComparableTo(&metav1.Condition{ + Type: kueuealpha.MultiKueueClusterActive, + Status: metav1.ConditionTrue, + Reason: "Active", + }, cmpopts.IgnoreFields(metav1.Condition{}, "Message", "LastTransitionTime"))) + disconnectedTime = activeCondition.LastTransitionTime.Time + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("the worker2 wl is removed since the local one no longer has a reservation", func() { + 
gomega.Eventually(func(g gomega.Gomega) { + createdWorkload := &kueue.Workload{} + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + }) + + ginkgo.It("Should remove the worker's workload and job when managers job is deleted", func() { + job := testingjob.MakeJob("job", managerNs.Name). + Queue(managerLq.Name). + Obj() + gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed()) + + createdWorkload := &kueue.Workload{} + wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: managerNs.Name} + + ginkgo.By("setting workload reservation in the management cluster", func() { + admission := utiltesting.MakeAdmission(managerCq.Name).Obj() + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("checking the workload creation in the worker clusters", func() { + managerWl := &kueue.Workload{} + gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("setting workload reservation 
in worker1, the job is created in worker1", func() { + admission := utiltesting.MakeAdmission(managerCq.Name).Obj() + + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + }, util.Timeout, util.Interval).Should(gomega.Succeed()) + }) + + ginkgo.By("removing the managers job and workload, the workload and job in worker1 are removed", func() { + gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, job)).Should(gomega.Succeed()) + gomega.Eventually(func(g gomega.Gomega) { + g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) + g.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, createdWorkload)).To(gomega.Succeed()) + + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + + gomega.Eventually(func(g gomega.Gomega) { + createdJob := batchv1.Job{} + g.Expect(worker1TestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) + g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) + }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) + }) + }) + }) diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index ed145ed1fd..46ccc63861 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -46,6 +46,10 @@ import ( // 
+kubebuilder:scaffold:imports ) +const ( + testingKeepReadyTimeout = 3 * time.Second +) + type cluster struct { cfg *rest.Config client client.Client @@ -173,6 +177,9 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) { err := multikueue.SetupIndexer(ctx, mgr.GetFieldIndexer(), managersConfigNamespace.Name) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, multikueue.WithGCInterval(2*time.Second)) + err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, + multikueue.WithGCInterval(2*time.Second), + multikueue.WithKeepReadyTimeout(testingKeepReadyTimeout), + ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } From 536aef859633cfa578dcbeb042a28bafc6d48a57 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Mon, 19 Feb 2024 15:36:15 +0200 Subject: [PATCH 3/6] Review Remarks --- apis/config/v1beta1/configuration_types.go | 4 +- apis/config/v1beta1/defaults.go | 6 +- apis/config/v1beta1/defaults_test.go | 24 ++++---- apis/config/v1beta1/zz_generated.deepcopy.go | 4 +- cmd/kueue/main.go | 2 +- cmd/kueue/main_test.go | 6 +- pkg/config/config_test.go | 20 +++---- .../admissionchecks/multikueue/controllers.go | 22 +++---- .../multikueue/jobset_adapter_test.go | 2 +- .../admissionchecks/multikueue/workload.go | 34 +++++------ .../multikueue/workload_test.go | 6 +- .../en/docs/reference/kueue-config.v1beta1.md | 4 +- .../integration/multikueue/multikueue_test.go | 59 ------------------- test/integration/multikueue/suite_test.go | 2 +- 14 files changed, 68 insertions(+), 127 deletions(-) diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index 3e8dc037c7..f149264157 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -216,10 +216,10 @@ type MultiKueue struct { // +optional Origin *string `json:"origin,omitempty"` - // KeepReadyTimeout defines the time a local 
workload's multikueue admission check state is kept Ready + // WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready // if the connection with its reserving worker cluster is lost. // +optional - KeepReadyTimeout *metav1.Duration `json:"keepReadyTimeout,omitempty"` + WorkerLostTimeout *metav1.Duration `json:"workerLostTimeout,omitempty"` } type RequeuingStrategy struct { diff --git a/apis/config/v1beta1/defaults.go b/apis/config/v1beta1/defaults.go index bda9ab7319..8f11d2cd87 100644 --- a/apis/config/v1beta1/defaults.go +++ b/apis/config/v1beta1/defaults.go @@ -46,7 +46,7 @@ const ( defaultJobFrameworkName = "batch/job" DefaultMultiKueueGCInterval = time.Minute DefaultMultiKueueOrigin = "multikueue" - DefaultMultiKueueKeepReadyTimeout = 5 * time.Minute + DefaultMultiKueueWorkerLostTimeout = 15 * time.Minute ) func getOperatorNamespace() string { @@ -176,7 +176,7 @@ func SetDefaults_Configuration(cfg *Configuration) { if ptr.Deref(cfg.MultiKueue.Origin, "") == "" { cfg.MultiKueue.Origin = ptr.To(DefaultMultiKueueOrigin) } - if cfg.MultiKueue.KeepReadyTimeout == nil { - cfg.MultiKueue.KeepReadyTimeout = &metav1.Duration{Duration: DefaultMultiKueueKeepReadyTimeout} + if cfg.MultiKueue.WorkerLostTimeout == nil { + cfg.MultiKueue.WorkerLostTimeout = &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout} } } diff --git a/apis/config/v1beta1/defaults_test.go b/apis/config/v1beta1/defaults_test.go index 7cddcc7e24..29cd949c9e 100644 --- a/apis/config/v1beta1/defaults_test.go +++ b/apis/config/v1beta1/defaults_test.go @@ -97,9 +97,9 @@ func TestSetDefaults_Configuration(t *testing.T) { } defaultMultiKueue := &MultiKueue{ - GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval}, - Origin: ptr.To(DefaultMultiKueueOrigin), - KeepReadyTimeout: &metav1.Duration{Duration: DefaultMultiKueueKeepReadyTimeout}, + GCInterval: &metav1.Duration{Duration: DefaultMultiKueueGCInterval}, + Origin: 
ptr.To(DefaultMultiKueueOrigin), + WorkerLostTimeout: &metav1.Duration{Duration: DefaultMultiKueueWorkerLostTimeout}, } podsReadyTimeoutTimeout := metav1.Duration{Duration: defaultPodsReadyTimeout} @@ -499,9 +499,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Enable: ptr.To(false), }, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{Duration: time.Second}, - Origin: ptr.To("multikueue-manager1"), - KeepReadyTimeout: &metav1.Duration{Duration: time.Minute}, + GCInterval: &metav1.Duration{Duration: time.Second}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, }, }, want: &Configuration{ @@ -514,9 +514,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{Duration: time.Second}, - Origin: ptr.To("multikueue-manager1"), - KeepReadyTimeout: &metav1.Duration{Duration: time.Minute}, + GCInterval: &metav1.Duration{Duration: time.Second}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: time.Minute}, }, }, }, @@ -540,9 +540,9 @@ func TestSetDefaults_Configuration(t *testing.T) { Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &MultiKueue{ - GCInterval: &metav1.Duration{}, - Origin: ptr.To("multikueue-manager1"), - KeepReadyTimeout: &metav1.Duration{Duration: 5 * time.Minute}, + GCInterval: &metav1.Duration{}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: 15 * time.Minute}, }, }, }, diff --git a/apis/config/v1beta1/zz_generated.deepcopy.go b/apis/config/v1beta1/zz_generated.deepcopy.go index 696f478b0f..000f346d73 100644 --- a/apis/config/v1beta1/zz_generated.deepcopy.go +++ b/apis/config/v1beta1/zz_generated.deepcopy.go @@ -300,8 +300,8 @@ func (in *MultiKueue) DeepCopyInto(out *MultiKueue) { *out = new(string) **out = **in } - if in.KeepReadyTimeout 
!= nil { - in, out := &in.KeepReadyTimeout, &out.KeepReadyTimeout + if in.WorkerLostTimeout != nil { + in, out := &in.WorkerLostTimeout, &out.WorkerLostTimeout *out = new(v1.Duration) **out = **in } diff --git a/cmd/kueue/main.go b/cmd/kueue/main.go index e6b87a3739..8412bc4c08 100644 --- a/cmd/kueue/main.go +++ b/cmd/kueue/main.go @@ -250,7 +250,7 @@ func setupControllers(mgr ctrl.Manager, cCache *cache.Cache, queues *queue.Manag if err := multikueue.SetupControllers(mgr, *cfg.Namespace, multikueue.WithGCInterval(cfg.MultiKueue.GCInterval.Duration), multikueue.WithOrigin(ptr.Deref(cfg.MultiKueue.Origin, configapi.DefaultMultiKueueOrigin)), - multikueue.WithKeepReadyTimeout(cfg.MultiKueue.KeepReadyTimeout.Duration), + multikueue.WithWorkerLostTimeout(cfg.MultiKueue.WorkerLostTimeout.Duration), ); err != nil { setupLog.Error(err, "Could not setup MultiKueue controller") os.Exit(1) diff --git a/cmd/kueue/main_test.go b/cmd/kueue/main_test.go index 94507c205f..25d78524a3 100644 --- a/cmd/kueue/main_test.go +++ b/cmd/kueue/main_test.go @@ -113,9 +113,9 @@ integrations: }, }, MultiKueue: &config.MultiKueue{ - GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, - Origin: ptr.To(config.DefaultMultiKueueOrigin), - KeepReadyTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueKeepReadyTimeout}, + GCInterval: &metav1.Duration{Duration: config.DefaultMultiKueueGCInterval}, + Origin: ptr.To(config.DefaultMultiKueueOrigin), + WorkerLostTimeout: &metav1.Duration{Duration: config.DefaultMultiKueueWorkerLostTimeout}, }, }, }, diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 7e58742f50..4caa3d90ea 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -264,7 +264,7 @@ namespace: kueue-system multiKueue: gcInterval: 1m30s origin: multikueue-manager1 - keepReadyTimeout: 10m + workerLostTimeout: 10m `), os.FileMode(0600)); err != nil { t.Fatal(err) } @@ -334,9 +334,9 @@ multiKueue: } defaultMultiKueue := 
&configapi.MultiKueue{ - GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval}, - Origin: ptr.To(configapi.DefaultMultiKueueOrigin), - KeepReadyTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueKeepReadyTimeout}, + GCInterval: &metav1.Duration{Duration: configapi.DefaultMultiKueueGCInterval}, + Origin: ptr.To(configapi.DefaultMultiKueueOrigin), + WorkerLostTimeout: &metav1.Duration{Duration: configapi.DefaultMultiKueueWorkerLostTimeout}, } testcases := []struct { @@ -802,9 +802,9 @@ multiKueue: Integrations: defaultIntegrations, QueueVisibility: defaultQueueVisibility, MultiKueue: &configapi.MultiKueue{ - GCInterval: &metav1.Duration{Duration: 90 * time.Second}, - Origin: ptr.To("multikueue-manager1"), - KeepReadyTimeout: &metav1.Duration{Duration: 10 * time.Minute}, + GCInterval: &metav1.Duration{Duration: 90 * time.Second}, + Origin: ptr.To("multikueue-manager1"), + WorkerLostTimeout: &metav1.Duration{Duration: 10 * time.Minute}, }, }, wantOptions: defaultControlOptions, @@ -917,9 +917,9 @@ func TestEncode(t *testing.T) { "clusterQueues": map[string]any{"maxCount": int64(10)}, }, "multiKueue": map[string]any{ - "gcInterval": "1m0s", - "origin": "multikueue", - "keepReadyTimeout": "5m0s", + "gcInterval": "1m0s", + "origin": "multikueue", + "workerLostTimeout": "15m0s", }, }, }, diff --git a/pkg/controller/admissionchecks/multikueue/controllers.go b/pkg/controller/admissionchecks/multikueue/controllers.go index 27efe82010..d1657641ec 100644 --- a/pkg/controller/admissionchecks/multikueue/controllers.go +++ b/pkg/controller/admissionchecks/multikueue/controllers.go @@ -25,13 +25,13 @@ import ( const ( defaultGCInterval = time.Minute defaultOrigin = "multikueue" - defaultKeepReadyTimeout = 5 * time.Minute + defaulWorkerLostTimeout = 5 * time.Minute ) type SetupOptions struct { - gcInterval time.Duration - origin string - keepReadyTimeout time.Duration + gcInterval time.Duration + origin string + workerLostTimeout time.Duration } 
type SetupOption func(o *SetupOptions) @@ -51,20 +51,20 @@ func WithOrigin(origin string) SetupOption { } } -// WithKeepReadyTimeout - sets the time for which the multikueue +// WithWorkerLostTimeout - sets the time for which the multikueue // admission check is kept in Ready state after the connection to // the admitting worker cluster is lost. -func WithKeepReadyTimeout(d time.Duration) SetupOption { +func WithWorkerLostTimeout(d time.Duration) SetupOption { return func(o *SetupOptions) { - o.keepReadyTimeout = d + o.workerLostTimeout = d } } func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) error { options := &SetupOptions{ - gcInterval: defaultGCInterval, - origin: defaultOrigin, - keepReadyTimeout: defaultKeepReadyTimeout, + gcInterval: defaultGCInterval, + origin: defaultOrigin, + workerLostTimeout: defaulWorkerLostTimeout, } for _, o := range opts { @@ -88,6 +88,6 @@ func SetupControllers(mgr ctrl.Manager, namespace string, opts ...SetupOption) e return err } - wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.keepReadyTimeout) + wlRec := newWlReconciler(mgr.GetClient(), helper, cRec, options.origin, options.workerLostTimeout) return wlRec.setupWithManager(mgr) } diff --git a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go index df8db2cf8c..2b8c45e1d5 100644 --- a/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go +++ b/pkg/controller/admissionchecks/multikueue/jobset_adapter_test.go @@ -337,7 +337,7 @@ func TestWlReconcileJobset(t *testing.T) { cRec.remoteClients["worker1"] = w1remoteClient helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultKeepReadyTimeout) + reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaulWorkerLostTimeout) _, gotErr := reconciler.Reconcile(ctx, 
reconcile.Request{NamespacedName: types.NamespacedName{Name: "wl1", Namespace: TestNamespace}}) if gotErr != nil { diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 2c79b29f89..135527341e 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -54,11 +54,11 @@ var ( ) type wlReconciler struct { - client client.Client - helper *multiKueueStoreHelper - clusters *clustersReconciler - origin string - keepReadyTimeout time.Duration + client client.Client + helper *multiKueueStoreHelper + clusters *clustersReconciler + origin string + workerLostTimeout time.Duration } var _ reconcile.Reconciler = (*wlReconciler)(nil) @@ -315,10 +315,10 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco // If there is no reserving and the AC is ready, the connection with the reserving remote might // be lost, keep the workload admitted for keepReadyTimeout and put it back in the queue after that. if !hasReserving && acs.State == kueue.CheckStateReady { - if time.Now().Before(acs.LastTransitionTime.Add(a.keepReadyTimeout)) { - retryAfter := a.keepReadyTimeout - time.Since(acs.LastTransitionTime.Time) - log.V(3).Info("Reserving remote lost, retry", "retryAfter", retryAfter) - return reconcile.Result{RequeueAfter: retryAfter}, nil + remainingWaitTime := a.workerLostTimeout - time.Since(acs.LastTransitionTime.Time) + if remainingWaitTime > 0 { + log.V(3).Info("Reserving remote lost, retry", "retryAfter", remainingWaitTime) + return reconcile.Result{RequeueAfter: remainingWaitTime}, nil } else { acs.State = kueue.CheckStateRetry acs.Message = "Reserving remote lost" @@ -373,8 +373,8 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco } } // drop this if we want to create new remote workloads while holding a reservation. 
- // check again the connection to the remote in half the keepReadyTimeout. - return reconcile.Result{RequeueAfter: a.keepReadyTimeout / 2}, nil + // check again the connection to the remote in workerLostTimeout. + return reconcile.Result{RequeueAfter: a.workerLostTimeout}, nil } // finally - create missing workloads @@ -393,13 +393,13 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco return reconcile.Result{}, errors.Join(errs...) } -func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, keepReadyTimeout time.Duration) *wlReconciler { +func newWlReconciler(c client.Client, helper *multiKueueStoreHelper, cRec *clustersReconciler, origin string, workerLostTimeout time.Duration) *wlReconciler { return &wlReconciler{ - client: c, - helper: helper, - clusters: cRec, - origin: origin, - keepReadyTimeout: keepReadyTimeout, + client: c, + helper: helper, + clusters: cRec, + origin: origin, + workerLostTimeout: workerLostTimeout, } } diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index 5cb8543920..e424f64eff 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -431,7 +431,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{ Name: "ac1", State: kueue.CheckStateReady, - LastTransitionTime: metav1.NewTime(time.Now().Add(-defaultKeepReadyTimeout / 2)), //50% of the timeout + LastTransitionTime: metav1.NewTime(time.Now().Add(-defaulWorkerLostTimeout / 2)), //50% of the timeout Message: `The workload got reservation on "worker1"`, }). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). 
@@ -457,7 +457,7 @@ func TestWlReconcile(t *testing.T) { AdmissionCheck(kueue.AdmissionCheckState{ Name: "ac1", State: kueue.CheckStateReady, - LastTransitionTime: metav1.NewTime(time.Now().Add(-defaultKeepReadyTimeout * 3 / 2)), // 150% of the timeout + LastTransitionTime: metav1.NewTime(time.Now().Add(-defaulWorkerLostTimeout * 3 / 2)), // 150% of the timeout Message: `The workload got reservation on "worker1"`, }). ControllerReference(batchv1.SchemeGroupVersion.WithKind("Job"), "job1", "uid1"). @@ -668,7 +668,7 @@ func TestWlReconcile(t *testing.T) { } helper, _ := newMultiKueueStoreHelper(managerClient) - reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaultKeepReadyTimeout) + reconciler := newWlReconciler(managerClient, helper, cRec, defaultOrigin, defaulWorkerLostTimeout) _, gotErr := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: types.NamespacedName{Name: tc.reconcileFor, Namespace: TestNamespace}}) if diff := cmp.Diff(tc.wantError, gotErr, cmpopts.EquateErrors()); diff != "" { diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index 9566d0ade6..77bb918010 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -512,11 +512,11 @@ remote objects that ware created by this multikueue manager cluster and delete them if their local counterpart no longer exists.

-keepReadyTimeout
+workerLostTimeout
k8s.io/apimachinery/pkg/apis/meta/v1.Duration -

KeepReadyTimeout defines the time a local workload's multikueue admission check state is kept Ready +

WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready if the connection with its reserving worker cluster is lost.

diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 526acd6ef1..728f768bb8 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -704,63 +704,4 @@ var _ = ginkgo.Describe("Multikueue", func() { }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) }) }) - - ginkgo.It("Should remove the worker's workload and job when managers job is deleted", func() { - job := testingjob.MakeJob("job", managerNs.Name). - Queue(managerLq.Name). - Obj() - gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, job)).Should(gomega.Succeed()) - - createdWorkload := &kueue.Workload{} - wlLookupKey := types.NamespacedName{Name: workloadjob.GetWorkloadNameForJob(job.Name), Namespace: managerNs.Name} - - ginkgo.By("setting workload reservation in the management cluster", func() { - admission := utiltesting.MakeAdmission(managerCq.Name).Obj() - gomega.Eventually(func(g gomega.Gomega) { - g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(util.SetQuotaReservation(managerTestCluster.ctx, managerTestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) - - ginkgo.By("checking the workload creation in the worker clusters", func() { - managerWl := &kueue.Workload{} - gomega.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, managerWl)).To(gomega.Succeed()) - gomega.Eventually(func(g gomega.Gomega) { - g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) - g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(createdWorkload.Spec).To(gomega.BeComparableTo(managerWl.Spec)) - }, 
util.Timeout, util.Interval).Should(gomega.Succeed()) - }) - - ginkgo.By("setting workload reservation in worker1, the job is created in worker1", func() { - admission := utiltesting.MakeAdmission(managerCq.Name).Obj() - - gomega.Eventually(func(g gomega.Gomega) { - g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(util.SetQuotaReservation(worker1TestCluster.ctx, worker1TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - - gomega.Eventually(func(g gomega.Gomega) { - createdJob := batchv1.Job{} - g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) - }, util.Timeout, util.Interval).Should(gomega.Succeed()) - }) - - ginkgo.By("removing the managers job and workload, the workload and job in worker1 are removed", func() { - gomega.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, job)).Should(gomega.Succeed()) - gomega.Eventually(func(g gomega.Gomega) { - g.Expect(managerTestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) - g.Expect(managerTestCluster.client.Delete(managerTestCluster.ctx, createdWorkload)).To(gomega.Succeed()) - - }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) - - gomega.Eventually(func(g gomega.Gomega) { - createdJob := batchv1.Job{} - g.Expect(worker1TestCluster.client.Get(managerTestCluster.ctx, wlLookupKey, createdWorkload)).To(utiltesting.BeNotFoundError()) - g.Expect(worker1TestCluster.client.Get(worker1TestCluster.ctx, client.ObjectKeyFromObject(job), &createdJob)).To(gomega.Succeed()) - }, util.LongTimeout, util.Interval).Should(gomega.Succeed()) - }) - }) - }) diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index 46ccc63861..09a84c3232 100644 --- a/test/integration/multikueue/suite_test.go 
+++ b/test/integration/multikueue/suite_test.go @@ -179,7 +179,7 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) { err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, multikueue.WithGCInterval(2*time.Second), - multikueue.WithKeepReadyTimeout(testingKeepReadyTimeout), + multikueue.WithWorkerLostTimeout(testingKeepReadyTimeout), ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } From ee847474cf2b7511ef1eaf8195c858492b1af269 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Wed, 21 Feb 2024 21:53:11 +0200 Subject: [PATCH 4/6] Refactor reconcileGroup. --- .../admissionchecks/multikueue/workload.go | 70 ++++++++----------- .../multikueue/workload_test.go | 6 +- .../integration/multikueue/multikueue_test.go | 5 +- test/integration/multikueue/suite_test.go | 4 +- 4 files changed, 35 insertions(+), 50 deletions(-) diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index 135527341e..b1293321e1 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -295,48 +295,9 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName+"-finish"), client.ForceOwnership) } - hasReserving, reservingRemote := group.FirstReserving() - - // if the reserving workload is out of sync, remove it - if hasReserving { - outOfSync := group.local == nil || !equality.Semantic.DeepEqual(group.local.Spec, group.remotes[reservingRemote].Spec) - if outOfSync { - if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, reservingRemote)); err != nil { - log.V(2).Error(err, "Deleting out of sync remote objects", "remote", reservingRemote) - return reconcile.Result{}, err - } - group.remotes[reservingRemote] = nil - - // check if another one exists - hasReserving, reservingRemote 
= group.FirstReserving() - } - } - - // If there is no reserving and the AC is ready, the connection with the reserving remote might - // be lost, keep the workload admitted for keepReadyTimeout and put it back in the queue after that. - if !hasReserving && acs.State == kueue.CheckStateReady { - remainingWaitTime := a.workerLostTimeout - time.Since(acs.LastTransitionTime.Time) - if remainingWaitTime > 0 { - log.V(3).Info("Reserving remote lost, retry", "retryAfter", remainingWaitTime) - return reconcile.Result{RequeueAfter: remainingWaitTime}, nil - } else { - acs.State = kueue.CheckStateRetry - acs.Message = "Reserving remote lost" - acs.LastTransitionTime = metav1.NewTime(time.Now()) - wlPatch := workload.BaseSSAWorkload(group.local) - workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) - return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership) - } - } - // 2. delete all workloads that are out of sync or are not in the chosen worker for rem, remWl := range group.remotes { - if remWl == nil { - continue - } - outOfSync := group.local == nil || !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) - notReservingRemote := hasReserving && reservingRemote != rem - if outOfSync || notReservingRemote { + if remWl != nil && !equality.Semantic.DeepEqual(group.local.Spec, remWl.Spec) { if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem) return reconcile.Result{}, err @@ -346,7 +307,19 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco } // 3. 
get the first reserving + hasReserving, reservingRemote := group.FirstReserving() if hasReserving { + // remove the non-reserving worker workloads + for rem, remWl := range group.remotes { + if remWl != nil && rem != reservingRemote { + if err := client.IgnoreNotFound(group.RemoveRemoteObjects(ctx, rem)); err != nil { + log.V(2).Error(err, "Deleting out of sync remote objects", "remote", rem) + return reconcile.Result{}, err + } + group.remotes[rem] = nil + } + } + acs := workload.FindAdmissionCheck(group.local.Status.AdmissionChecks, group.acName) if err := group.jobAdapter.SyncJob(ctx, a.client, group.remoteClients[reservingRemote].client, group.controllerKey, group.local.Name, a.origin); err != nil { log.V(2).Error(err, "creating remote controller object", "remote", reservingRemote) @@ -372,9 +345,22 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco return reconcile.Result{}, err } } - // drop this if we want to create new remote workloads while holding a reservation. - // check again the connection to the remote in workerLostTimeout. return reconcile.Result{RequeueAfter: a.workerLostTimeout}, nil + } else if acs.State == kueue.CheckStateReady { + // If there is no reserving and the AC is ready, the connection with the reserving remote might + // be lost, keep the workload admitted for workerLostTimeout and put it back in the queue after that. 
+ remainingWaitTime := a.workerLostTimeout - time.Since(acs.LastTransitionTime.Time) + if remainingWaitTime > 0 { + log.V(3).Info("Reserving remote lost, retry", "retryAfter", remainingWaitTime) + return reconcile.Result{RequeueAfter: remainingWaitTime}, nil + } else { + acs.State = kueue.CheckStateRetry + acs.Message = "Reserving remote lost" + acs.LastTransitionTime = metav1.NewTime(time.Now()) + wlPatch := workload.BaseSSAWorkload(group.local) + workload.SetAdmissionCheckState(&wlPatch.Status.AdmissionChecks, *acs) + return reconcile.Result{}, a.client.Status().Patch(ctx, wlPatch, client.Apply, client.FieldOwner(ControllerName), client.ForceOwnership) + } } // finally - create missing workloads diff --git a/pkg/controller/admissionchecks/multikueue/workload_test.go b/pkg/controller/admissionchecks/multikueue/workload_test.go index e424f64eff..92bddda163 100644 --- a/pkg/controller/admissionchecks/multikueue/workload_test.go +++ b/pkg/controller/admissionchecks/multikueue/workload_test.go @@ -424,7 +424,7 @@ func TestWlReconcile(t *testing.T) { Obj(), }, }, - "the local workload is kept Ready if the keepReadyTimeout did not pass since last successful sync": { + "the local workload admission check Ready if the remote WorkerLostTimeout is not exceeded": { reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). @@ -450,7 +450,7 @@ func TestWlReconcile(t *testing.T) { Obj(), }, }, - "the local workload is set for Retry if the keepReadyTimeout did pass since last successful sync": { + "the local workload's admission check is set to Retry if the WorkerLostTimeout is exceeded": { reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). 
@@ -476,7 +476,7 @@ func TestWlReconcile(t *testing.T) { Obj(), }, }, - "the local workload is set to pendin when it is in Retry without quota": { + "the local workload's admission check is set to Pending when it is in Retry without quota": { reconcileFor: "wl1", managersWorkloads: []kueue.Workload{ *baseWorkloadBuilder.Clone(). diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 728f768bb8..670538f48e 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -610,7 +610,7 @@ var _ = ginkgo.Describe("Multikueue", func() { }, util.Timeout, util.Interval).Should(gomega.Succeed()) }) - ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager amd worker1 wl is removed", func() { + ginkgo.By("setting workload reservation in worker2, the workload is admitted in manager and worker1 wl is removed", func() { gomega.Eventually(func(g gomega.Gomega) { g.Expect(worker2TestCluster.client.Get(worker2TestCluster.ctx, wlLookupKey, createdWorkload)).To(gomega.Succeed()) g.Expect(util.SetQuotaReservation(worker2TestCluster.ctx, worker2TestCluster.client, createdWorkload, admission)).To(gomega.Succeed()) @@ -669,8 +669,7 @@ var _ = ginkgo.Describe("Multikueue", func() { }, cmpopts.IgnoreFields(kueue.AdmissionCheckState{}, "LastTransitionTime", "Message"))) // The transition interval should be close to testingKeepReadyTimeout (taking into account the resolution of the LastTransitionTime field) - g.Expect(acs.LastTransitionTime.Time.Sub(disconnectedTime)).To(gomega.BeComparableTo(testingKeepReadyTimeout, - cmpopts.EquateApproxTime(time.Second))) + g.Expect(acs.LastTransitionTime.Time).To(gomega.BeComparableTo(disconnectedTime.Add(testingWorkerLostTimeout), cmpopts.EquateApproxTime(2*time.Second))) g.Expect(apimeta.IsStatusConditionTrue(createdWorkload.Status.Conditions, kueue.WorkloadQuotaReserved)).To(gomega.BeFalse()) }, 
util.LongTimeout, util.Interval).Should(gomega.Succeed()) diff --git a/test/integration/multikueue/suite_test.go b/test/integration/multikueue/suite_test.go index 09a84c3232..8a021bd0b6 100644 --- a/test/integration/multikueue/suite_test.go +++ b/test/integration/multikueue/suite_test.go @@ -47,7 +47,7 @@ import ( ) const ( - testingKeepReadyTimeout = 3 * time.Second + testingWorkerLostTimeout = 3 * time.Second ) type cluster struct { @@ -179,7 +179,7 @@ func managerAndMultiKueueSetup(mgr manager.Manager, ctx context.Context) { err = multikueue.SetupControllers(mgr, managersConfigNamespace.Name, multikueue.WithGCInterval(2*time.Second), - multikueue.WithWorkerLostTimeout(testingKeepReadyTimeout), + multikueue.WithWorkerLostTimeout(testingWorkerLostTimeout), ) gomega.Expect(err).NotTo(gomega.HaveOccurred()) } From ce1bc97aa0e10005352ee0d4aa971f3e6070a2f0 Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Thu, 22 Feb 2024 22:55:25 +0200 Subject: [PATCH 5/6] Review Remarks. --- apis/config/v1beta1/configuration_types.go | 2 ++ pkg/controller/admissionchecks/multikueue/admissioncheck.go | 2 +- .../admissionchecks/multikueue/admissioncheck_test.go | 2 +- pkg/controller/admissionchecks/multikueue/workload.go | 2 +- site/content/en/docs/reference/kueue-config.v1beta1.md | 1 + 5 files changed, 6 insertions(+), 3 deletions(-) diff --git a/apis/config/v1beta1/configuration_types.go b/apis/config/v1beta1/configuration_types.go index f149264157..918bb967ce 100644 --- a/apis/config/v1beta1/configuration_types.go +++ b/apis/config/v1beta1/configuration_types.go @@ -218,6 +218,8 @@ type MultiKueue struct { // WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready // if the connection with its reserving worker cluster is lost. + // + // Defaults to 15 minutes. 
// +optional WorkerLostTimeout *metav1.Duration `json:"workerLostTimeout,omitempty"` } diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck.go b/pkg/controller/admissionchecks/multikueue/admissioncheck.go index 46b88bad27..d444272662 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck.go @@ -103,7 +103,7 @@ func (a *ACReconciler) Reconcile(ctx context.Context, req reconcile.Request) (re if unusableClustersCount > 0 { if unusableClustersCount < len(cfg.Spec.Clusters) { // keep it partially active - newCondition.Reason = "PartiallyActive" + newCondition.Reason = "SomeActiveClusters" } else { newCondition.Status = metav1.ConditionFalse newCondition.Reason = "NoUsableClusters" diff --git a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go index 80762c0d7c..2853ea4efa 100644 --- a/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go +++ b/pkg/controller/admissionchecks/multikueue/admissioncheck_test.go @@ -181,7 +181,7 @@ func TestReconcile(t *testing.T) { Condition(metav1.Condition{ Type: kueue.AdmissionCheckActive, Status: metav1.ConditionTrue, - Reason: "PartiallyActive", + Reason: "SomeActiveClusters", Message: "Missing clusters: [worker3], Inactive clusters: [worker1]", }). Obj(), diff --git a/pkg/controller/admissionchecks/multikueue/workload.go b/pkg/controller/admissionchecks/multikueue/workload.go index b1293321e1..5f846fcc9c 100644 --- a/pkg/controller/admissionchecks/multikueue/workload.go +++ b/pkg/controller/admissionchecks/multikueue/workload.go @@ -335,7 +335,7 @@ func (a *wlReconciler) reconcileGroup(ctx context.Context, group *wlGroup) (reco } // update the message acs.Message = fmt.Sprintf("The workload got reservation on %q", reservingRemote) - // update the transition time + // update the transition time since it is used to detect the lost worker state. 
acs.LastTransitionTime = metav1.NewTime(time.Now()) wlPatch := workload.BaseSSAWorkload(group.local) diff --git a/site/content/en/docs/reference/kueue-config.v1beta1.md b/site/content/en/docs/reference/kueue-config.v1beta1.md index 77bb918010..a3016ae344 100644 --- a/site/content/en/docs/reference/kueue-config.v1beta1.md +++ b/site/content/en/docs/reference/kueue-config.v1beta1.md @@ -518,6 +518,7 @@ them if their local counterpart no longer exists.

WorkerLostTimeout defines the time a local workload's multikueue admission check state is kept Ready if the connection with its reserving worker cluster is lost.

+

Defaults to 15 minutes.

From c9a4984c0c2577ab3f0c23c5f81f67e5438dfa4b Mon Sep 17 00:00:00 2001 From: Traian Schiau Date: Wed, 6 Mar 2024 18:56:30 +0200 Subject: [PATCH 6/6] Fix int test after rebase. --- test/integration/multikueue/multikueue_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/integration/multikueue/multikueue_test.go b/test/integration/multikueue/multikueue_test.go index 670538f48e..da6c27bc25 100644 --- a/test/integration/multikueue/multikueue_test.go +++ b/test/integration/multikueue/multikueue_test.go @@ -582,7 +582,7 @@ var _ = ginkgo.Describe("Multikueue", func() { gomega.Expect(managerTestCluster.client.Create(managerTestCluster.ctx, jobSet)).Should(gomega.Succeed()) createdWorkload := &kueue.Workload{} - wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name), Namespace: managerNs.Name} + wlLookupKey := types.NamespacedName{Name: workloadjobset.GetWorkloadNameForJobSet(jobSet.Name, jobSet.UID), Namespace: managerNs.Name} admission := utiltesting.MakeAdmission(managerCq.Name).PodSets( kueue.PodSetAssignment{