diff --git a/cmd/memberagent/main.go b/cmd/memberagent/main.go index 0c8f6f1bb..d8e0e4cdc 100644 --- a/cmd/memberagent/main.go +++ b/cmd/memberagent/main.go @@ -418,7 +418,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb // resource processing. 5, // Use the default worker count (4) for parallelized manifest processing. - parallelizer.DefaultNumOfWorkers, + parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers), time.Minute*time.Duration(*deletionWaitTime), *watchWorkWithPriorityQueue, *watchWorkReconcileAgeMinutes, diff --git a/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go b/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go index 6720e2f35..6176e4926 100644 --- a/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go +++ b/pkg/controllers/internalmembercluster/v1beta1/member_suite_test.go @@ -379,7 +379,7 @@ var _ = BeforeSuite(func() { // This controller is created for testing purposes only; no reconciliation loop is actually // run. - workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, 1, time.Minute, false, 60, nil) + workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil) propertyProvider1 = &manuallyUpdatedProvider{} member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1) @@ -402,7 +402,7 @@ var _ = BeforeSuite(func() { // This controller is created for testing purposes only; no reconciliation loop is actually // run. - workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, 1, time.Minute, false, 60, nil) + workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil) member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/controllers/workapplier/controller.go b/pkg/controllers/workapplier/controller.go index 95bda4311..393867554 100644 --- a/pkg/controllers/workapplier/controller.go +++ b/pkg/controllers/workapplier/controller.go @@ -49,7 +49,7 @@ import ( "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" "github.com/kubefleet-dev/kubefleet/pkg/utils/controller" "github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter" - "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer" + parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer" ) const ( @@ -227,7 +227,7 @@ type Reconciler struct { watchWorkReconcileAgeMinutes int deletionWaitTime time.Duration joined *atomic.Bool - parallelizer *parallelizer.Parallerlizer + parallelizer parallelizerutil.Parallelizer requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter } @@ -237,15 +237,20 @@ func NewReconciler( spokeDynamicClient dynamic.Interface, spokeClient client.Client, restMapper meta.RESTMapper, recorder record.EventRecorder, concurrentReconciles int, - workerCount int, + parallelizer parallelizerutil.Parallelizer, deletionWaitTime time.Duration, watchWorkWithPriorityQueue bool, watchWorkReconcileAgeMinutes int, requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter, ) *Reconciler { if requeueRateLimiter == nil { + klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter") requeueRateLimiter = defaultRequeueRateLimiter } + if parallelizer == nil { + 
klog.V(2).InfoS("parallelizer is not set; using the default parallelizer with a worker count of 1") + parallelizer = parallelizerutil.NewParallelizer(1) + } return &Reconciler{ hubClient: hubClient, @@ -254,7 +259,7 @@ func NewReconciler( restMapper: restMapper, recorder: recorder, concurrentReconciles: concurrentReconciles, - parallelizer: parallelizer.NewParallelizer(workerCount), + parallelizer: parallelizer, watchWorkWithPriorityQueue: watchWorkWithPriorityQueue, watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes, workNameSpace: workNameSpace, diff --git a/pkg/controllers/workapplier/controller_integration_test.go b/pkg/controllers/workapplier/controller_integration_test.go index 7841ad72a..f742f8c02 100644 --- a/pkg/controllers/workapplier/controller_integration_test.go +++ b/pkg/controllers/workapplier/controller_integration_test.go @@ -73,6 +73,7 @@ var ( dummyLabelValue2 = "baz" dummyLabelValue3 = "quz" dummyLabelValue4 = "qux" + dummyLabelValue5 = "quux" ) // createWorkObject creates a new Work object with the given name, manifests, and apply strategy. @@ -470,6 +471,7 @@ func markDeploymentAsAvailable(nsName, deployName string) { } func workStatusUpdated( + memberReservedNSName string, workName string, workConds []metav1.Condition, manifestConds []fleetv1beta1.ManifestCondition, @@ -479,7 +481,7 @@ func workStatusUpdated( return func() error { // Retrieve the Work object. work := &fleetv1beta1.Work{} - if err := hubClient.Get(ctx, client.ObjectKey{Name: workName, Namespace: memberReservedNSName1}, work); err != nil { + if err := hubClient.Get(ctx, client.ObjectKey{Name: workName, Namespace: memberReservedNSName}, work); err != nil { return fmt.Errorf("failed to retrieve the Work object: %w", err) } @@ -878,7 +880,7 @@ var _ = Describe("applying manifests", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -1060,7 +1062,7 @@ var _ = Describe("applying manifests", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -1153,7 +1155,7 @@ var _ = Describe("applying manifests", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -1350,7 +1352,7 @@ var _ = Describe("applying manifests", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -1530,7 +1532,7 @@ var _ = Describe("applying manifests", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, 
nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -1705,7 +1707,7 @@ var _ = Describe("applying manifests", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -1875,7 +1877,7 @@ var _ = Describe("work applier garbage collection", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -2170,7 +2172,7 @@ var _ = Describe("work applier garbage collection", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -2489,7 +2491,7 @@ var _ = Describe("work applier garbage collection", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -2780,7 +2782,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -3062,7 +3064,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -3354,7 +3356,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -3510,7 +3512,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + 
workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -3754,7 +3756,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -3919,7 +3921,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -4285,7 +4287,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -4411,7 +4413,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -4525,7 +4527,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -4651,7 +4653,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -4782,7 +4784,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -4909,7 +4911,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, 
&noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -5012,7 +5014,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -5143,7 +5145,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -5236,7 +5238,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") // Track the timestamp that was just after the drift was first detected. @@ -5310,7 +5312,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &driftObservedMustBeforeTimestamp, &firstDriftedMustBeforeTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &driftObservedMustBeforeTimestamp, &firstDriftedMustBeforeTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -5471,7 +5473,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -5676,7 +5678,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -5940,7 +5942,7 @@ var _ = Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -6071,7 +6073,7 @@ var _ = 
Describe("drift detection and takeover", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -6183,7 +6185,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -6415,7 +6417,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -6505,7 +6507,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -6720,7 +6722,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -6866,7 +6868,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -7312,7 +7314,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -7458,7 +7460,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -7587,7 +7589,7 @@ var _ = Describe("report diff", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + 
workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -7823,7 +7825,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, &noLaterThanTimestamp, &noLaterThanTimestamp) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -7921,7 +7923,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -8135,7 +8137,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -8206,7 +8208,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -8424,7 +8426,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -8579,7 +8581,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -8815,7 +8817,7 @@ var _ = Describe("handling different apply strategies", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") }) @@ -8991,7 +8993,7 @@ var _ = Describe("negative cases", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := 
workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") Consistently(workStatusUpdatedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Work status changed unexpectedly") }) @@ -9154,7 +9156,7 @@ var _ = Describe("negative cases", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") Consistently(workStatusUpdatedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Work status changed unexpectedly") }) @@ -9336,7 +9338,7 @@ var _ = Describe("negative cases", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") Consistently(workStatusUpdatedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Work status changed unexpectedly") }) @@ -9567,7 +9569,7 @@ var _ = Describe("negative cases", func() { }, } - workStatusUpdatedActual := workStatusUpdated(workName, workConds, manifestConds, nil, nil) + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName1, workName, workConds, manifestConds, nil, nil) Eventually(workStatusUpdatedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to update work status") Consistently(workStatusUpdatedActual, consistentlyDuration, consistentlyInterval).Should(Succeed(), "Work status changed unexpectedly") }) diff --git a/pkg/controllers/workapplier/process.go b/pkg/controllers/workapplier/process.go index 73bfe82c1..82dc5ce18 100644 --- a/pkg/controllers/workapplier/process.go +++ b/pkg/controllers/workapplier/process.go @@ -82,7 +82,7 @@ func (r *Reconciler) processManifests( klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundlesInWave[piece].manifestObj), "work", klog.KObj(work)) } - r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, "processingManifests") + r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, fmt.Sprintf("processingManifestsInWave%d", idx)) } } diff --git a/pkg/controllers/workapplier/suite_test.go b/pkg/controllers/workapplier/suite_test.go index afb9b2ed2..ecab1d12f 100644 --- a/pkg/controllers/workapplier/suite_test.go +++ b/pkg/controllers/workapplier/suite_test.go @@ -20,6 +20,7 @@ import ( "context" "flag" "path/filepath" + "strings" "sync" "testing" "time" @@ -32,6 +33,7 @@ import ( "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/rest" + "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" "k8s.io/klog/v2/textlogger" ctrl "sigs.k8s.io/controller-runtime" @@ -46,6 +48,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/predicate" fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" + "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer" testv1alpha1 "github.com/kubefleet-dev/kubefleet/test/apis/v1alpha1" ) @@ -70,6 +73,13 @@ var ( memberDynamicClient2 dynamic.Interface workApplier2 *Reconciler + memberCfg3 *rest.Config + 
memberEnv3 *envtest.Environment + hubMgr3 manager.Manager + memberClient3 client.Client + memberDynamicClient3 dynamic.Interface + workApplier3 *Reconciler + ctx context.Context cancel context.CancelFunc wg sync.WaitGroup @@ -83,8 +93,33 @@ const ( memberReservedNSName1 = "fleet-member-experimental-1" memberReservedNSName2 = "fleet-member-experimental-2" + memberReservedNSName3 = "fleet-member-experimental-3" + + parallelizerFixedDelay = time.Second * 5 ) +// parallelizerWithFixedDelay is a Parallelizer implementation that runs tasks in parallel and waits for a fixed delay after completing each task group. + // + // This is added to help verify the behavior of waved parallel processing in the work applier. +type parallelizerWithFixedDelay struct { + regularParallelizer parallelizer.Parallelizer + delay time.Duration +} + +func (p *parallelizerWithFixedDelay) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) { + p.regularParallelizer.ParallelizeUntil(ctx, pieces, doWork, operation) + klog.V(2).InfoS("Parallelization completed", "operation", operation) + // No need to add a delay for non-waved operations. + if strings.HasPrefix(operation, "processingManifestsInWave") { + // Wait with a fixed delay only after operations that process a wave of manifests. + klog.V(2).InfoS("Waiting with a fixed delay after processing a wave", "operation", operation, "delay", p.delay) + time.Sleep(p.delay) + } +} + +// Verify that parallelizerWithFixedDelay implements the parallelizer.Parallelizer interface. +var _ parallelizer.Parallelizer = &parallelizerWithFixedDelay{} + func TestAPIs(t *testing.T) { RegisterFailHandler(Fail) @@ -105,6 +140,13 @@ func setupResources() { }, } Expect(hubClient.Create(ctx, ns2)).To(Succeed()) + + ns3 := &corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: memberReservedNSName3, + }, + } + Expect(hubClient.Create(ctx, ns3)).To(Succeed()) } var _ = BeforeSuite(func() { @@ -143,6 +185,14 @@ var _ = BeforeSuite(func() { filepath.Join("../../../", "test", "manifests"), }, } + // memberEnv3 is the test environment for verifying the behavior of waved parallel processing in + // the work applier. + memberEnv3 = &envtest.Environment{ + CRDDirectoryPaths: []string{ + filepath.Join("../../../", "config", "crd", "bases"), + filepath.Join("../../../", "test", "manifests"), + }, + } var err error hubCfg, err = hubEnv.Start() @@ -157,6 +207,10 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) Expect(memberCfg2).ToNot(BeNil()) + memberCfg3, err = memberEnv3.Start() + Expect(err).ToNot(HaveOccurred()) + Expect(memberCfg3).ToNot(BeNil()) + + err = batchv1.AddToScheme(scheme.Scheme) Expect(err).NotTo(HaveOccurred()) err = fleetv1beta1.AddToScheme(scheme.Scheme) @@ -177,6 +235,10 @@ var _ = BeforeSuite(func() { Expect(err).ToNot(HaveOccurred()) Expect(memberClient2).ToNot(BeNil()) + memberClient3, err = client.New(memberCfg3, client.Options{Scheme: scheme.Scheme}) + Expect(err).ToNot(HaveOccurred()) + Expect(memberClient3).ToNot(BeNil()) + + // This setup also requires a client-go dynamic client for the member cluster.
memberDynamicClient1, err = dynamic.NewForConfig(memberCfg1) Expect(err).ToNot(HaveOccurred()) @@ -184,6 +246,9 @@ var _ = BeforeSuite(func() { memberDynamicClient2, err = dynamic.NewForConfig(memberCfg2) Expect(err).ToNot(HaveOccurred()) + memberDynamicClient3, err = dynamic.NewForConfig(memberCfg3) + Expect(err).ToNot(HaveOccurred()) + By("Setting up the resources") setupResources() @@ -210,7 +275,7 @@ var _ = BeforeSuite(func() { memberClient1.RESTMapper(), hubMgr1.GetEventRecorderFor("work-applier"), maxConcurrentReconciles, - workerCount, + parallelizer.NewParallelizer(workerCount), 30*time.Second, true, 60, @@ -259,7 +324,7 @@ var _ = BeforeSuite(func() { memberClient2.RESTMapper(), hubMgr2.GetEventRecorderFor("work-applier"), maxConcurrentReconciles, - workerCount, + parallelizer.NewParallelizer(workerCount), 30*time.Second, true, 60, @@ -274,8 +339,52 @@ var _ = BeforeSuite(func() { Complete(workApplier2) Expect(err).NotTo(HaveOccurred()) + By("Setting up the controller and the controller manager for member cluster 3") + hubMgr3, err = ctrl.NewManager(hubCfg, ctrl.Options{ + Scheme: scheme.Scheme, + Metrics: server.Options{ + BindAddress: "0", + }, + Cache: cache.Options{ + DefaultNamespaces: map[string]cache.Config{ + memberReservedNSName3: {}, + }, + }, + Logger: textlogger.NewLogger(textlogger.NewConfig(textlogger.Verbosity(4))), + }) + Expect(err).ToNot(HaveOccurred()) + + pWithDelay := &parallelizerWithFixedDelay{ + regularParallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers), + // To avoid flakiness, use a fixed delay of 5 seconds so that we could reliably verify + // if manifests are actually being processed in waves. + delay: parallelizerFixedDelay, + } + workApplier3 = NewReconciler( + hubClient, + memberReservedNSName3, + memberDynamicClient3, + memberClient3, + memberClient3.RESTMapper(), + hubMgr3.GetEventRecorderFor("work-applier"), + maxConcurrentReconciles, + pWithDelay, + 30*time.Second, + true, + 60, + nil, // Use the default backoff rate limiter. + ) + // Due to name conflicts, the third work applier must be set up manually. + err = ctrl.NewControllerManagedBy(hubMgr3).Named("work-applier-controller-waved-parallel-processing"). + WithOptions(ctrloption.Options{ + MaxConcurrentReconciles: workApplier3.concurrentReconciles, + }). + For(&fleetv1beta1.Work{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). + Complete(workApplier3) + Expect(err).NotTo(HaveOccurred()) + wg = sync.WaitGroup{} - wg.Add(2) + wg.Add(3) go func() { defer GinkgoRecover() defer wg.Done() @@ -289,6 +398,13 @@ var _ = BeforeSuite(func() { Expect(workApplier2.Join(ctx)).To(Succeed()) Expect(hubMgr2.Start(ctx)).To(Succeed()) }() + + go func() { + defer GinkgoRecover() + defer wg.Done() + Expect(workApplier3.Join(ctx)).To(Succeed()) + Expect(hubMgr3.Start(ctx)).To(Succeed()) + }() }) var _ = AfterSuite(func() { @@ -300,4 +416,5 @@ var _ = AfterSuite(func() { Expect(hubEnv.Stop()).To(Succeed()) Expect(memberEnv1.Stop()).To(Succeed()) Expect(memberEnv2.Stop()).To(Succeed()) + Expect(memberEnv3.Stop()).To(Succeed()) }) diff --git a/pkg/controllers/workapplier/waves_integration_test.go b/pkg/controllers/workapplier/waves_integration_test.go new file mode 100644 index 000000000..f9aebc46c --- /dev/null +++ b/pkg/controllers/workapplier/waves_integration_test.go @@ -0,0 +1,1269 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workapplier + +import ( + "fmt" + "math/rand/v2" + "slices" + "time" + + "github.com/google/go-cmp/cmp" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + admissionregistrationv1 "k8s.io/api/admissionregistration/v1" + appsv1 "k8s.io/api/apps/v1" + autoscalingv1 "k8s.io/api/autoscaling/v1" + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + networkingv1 "k8s.io/api/networking/v1" + policyv1 "k8s.io/api/policy/v1" + rbacv1 "k8s.io/api/rbac/v1" + schedulingv1 "k8s.io/api/scheduling/v1" + storagev1 "k8s.io/api/storage/v1" + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/utils/ptr" + "sigs.k8s.io/controller-runtime/pkg/client" + + fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" + "github.com/kubefleet-dev/kubefleet/pkg/utils" + "github.com/kubefleet-dev/kubefleet/pkg/utils/condition" +) + +// Note (chenyu1): all test cases in this file use a separate test environment +// (same hub cluster, different fleet member reserved namespace, different +// work applier instance) from the other integration tests. This is needed +// to (relatively speaking) reliably verify the wave-based parallel processing +// in the work applier. + +var _ = Describe("parallel processing with waves", func() { + Context("single wave", Ordered, func() { + workName := fmt.Sprintf(workNameTemplate, utils.RandStr()) + // The environment prepared by the envtest package does not support namespace + // deletion; each test case would use a new namespace. + nsName := fmt.Sprintf(nsNameTemplate, utils.RandStr()) + + pcName := "priority-class-1" + + BeforeAll(func() { + // Prepare a NS object. + regularNS := ns.DeepCopy() + regularNS.Name = nsName + regularNSJSON := marshalK8sObjJSON(regularNS) + + // Prepare a PriorityClass object. + regularPC := &schedulingv1.PriorityClass{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "scheduling.k8s.io/v1", + Kind: "PriorityClass", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: pcName, + }, + Value: 1000, + GlobalDefault: false, + Description: "Experimental priority class", + } + regularPCJSON := marshalK8sObjJSON(regularPC) + + // Create a new Work object with all the manifest JSONs. + createWorkObject(workName, memberReservedNSName3, nil, regularNSJSON, regularPCJSON) + }) + + // For simplicity reasons, this test case will skip some of the regular apply op result verification + // (finalizer check, AppliedWork object check, etc.), as they have been repeatedly verified in different + // test cases under similar conditions. + + It("should update the Work object status", func() { + // Prepare the status information. 
+ workConds := []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAppliedReason, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAvailableReason, + }, + } + manifestConds := []fleetv1beta1.ManifestCondition{ + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + Group: "", + Version: "v1", + Kind: "Namespace", + Resource: "namespaces", + Name: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + Group: "scheduling.k8s.io", + Version: "v1", + Kind: "PriorityClass", + Resource: "priorityclasses", + Name: pcName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 1, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 1, + }, + }, + }, + } + + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName3, workName, workConds, manifestConds, nil, nil) + // Considering the presence of fixed delay in the parallelizer, the test case here + // uses a longer timeout and interval. + Eventually(workStatusUpdatedActual, eventuallyDuration*2, eventuallyInterval*2).Should(Succeed(), "Failed to update work status") + }) + + It("should create resources in parallel", func() { + // The work applier in use for this environment is set to wait for a fixed delay between each + // parallelizer call. If the parallelization is set up correctly, resources in the same wave + // should have very close creation timestamps, while the creation timestamps between resources + // in different waves should have a consistent gap (roughly the fixed delay). + + placedNS := &corev1.Namespace{} + Eventually(memberClient3.Get(ctx, types.NamespacedName{Name: nsName}, placedNS), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to get the placed Namespace") + + placedPC := &schedulingv1.PriorityClass{} + Eventually(memberClient3.Get(ctx, types.NamespacedName{Name: pcName}, placedPC), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to get the placed PriorityClass") + + gap := placedPC.CreationTimestamp.Sub(placedNS.CreationTimestamp.Time) + // The two objects belong to the same wave; the creation timestamps should be very close. + Expect(gap).To(BeNumerically("<=", time.Second), "The creation time gap between resources in the same wave is greater than or equal to the fixed delay") + }) + + AfterAll(func() { + // Delete the Work object and related resources. + deleteWorkObject(workName, memberReservedNSName3) + + // Remove the PriorityClass object if it still exists. 
+ Eventually(func() error { + pc := &schedulingv1.PriorityClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: pcName, + }, + } + return memberClient3.Delete(ctx, pc) + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the PriorityClass object") + + // Ensure that the AppliedWork object has been removed. + appliedWorkRemovedActual := appliedWorkRemovedActual(workName, nsName) + Eventually(appliedWorkRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the AppliedWork object") + + workRemovedActual := workRemovedActual(workName) + Eventually(workRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the Work object") + // The environment prepared by the envtest package does not support namespace + // deletion; consequently this test suite would not attempt to verify its deletion. + }) + }) + + Context("two consecutive waves", Ordered, func() { + workName := fmt.Sprintf(workNameTemplate, utils.RandStr()) + // The environment prepared by the envtest package does not support namespace + // deletion; each test case would use a new namespace. + nsName := fmt.Sprintf(nsNameTemplate, utils.RandStr()) + + BeforeAll(func() { + // Prepare a NS object. + regularNS := ns.DeepCopy() + regularNS.Name = nsName + regularNSJSON := marshalK8sObjJSON(regularNS) + + // Prepare a ConfigMap object. + regularCM := configMap.DeepCopy() + regularCM.Namespace = nsName + regularCMJSON := marshalK8sObjJSON(regularCM) + + // Create a new Work object with all the manifest JSONs. + createWorkObject(workName, memberReservedNSName3, nil, regularNSJSON, regularCMJSON) + }) + + // For simplicity reasons, this test case will skip some of the regular apply op result verification + // (finalizer check, AppliedWork object check, etc.), as they have been repeatedly verified in different + // test cases under similar conditions. + + It("should update the Work object status", func() { + // Prepare the status information. 
+ workConds := []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAppliedReason, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAvailableReason, + }, + } + manifestConds := []fleetv1beta1.ManifestCondition{ + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + Group: "", + Version: "v1", + Kind: "Namespace", + Resource: "namespaces", + Name: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + Group: "", + Version: "v1", + Kind: "ConfigMap", + Resource: "configmaps", + Name: configMapName, + Namespace: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + } + + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName3, workName, workConds, manifestConds, nil, nil) + // Considering the presence of fixed delay in the parallelizer, the test case here + // uses a longer timeout and interval. + Eventually(workStatusUpdatedActual, eventuallyDuration*2, eventuallyInterval*2).Should(Succeed(), "Failed to update work status") + }) + + It("should create resources in waves", func() { + // The work applier in use for this environment is set to wait for a fixed delay between each + // parallelizer call. If the parallelization is set up correctly, resources in the same wave + // should have very close creation timestamps, while the creation timestamps between resources + // in different waves should have a consistent gap (roughly the fixed delay). + + placedNS := &corev1.Namespace{} + Eventually(memberClient3.Get(ctx, types.NamespacedName{Name: nsName}, placedNS), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to get the placed Namespace") + + placedCM := &corev1.ConfigMap{} + Eventually(memberClient3.Get(ctx, types.NamespacedName{Namespace: nsName, Name: configMapName}, placedCM), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to get the placed ConfigMap") + + gap := placedCM.CreationTimestamp.Sub(placedNS.CreationTimestamp.Time) + Expect(gap).To(BeNumerically(">=", parallelizerFixedDelay), "The creation time gap between resources in different waves is less than the fixed delay") + Expect(gap).To(BeNumerically("<", parallelizerFixedDelay*2), "The creation time gap between resources in different waves is at least twice as large as the fixed delay") + }) + + AfterAll(func() { + // Delete the Work object and related resources. + deleteWorkObject(workName, memberReservedNSName3) + + // Remove the ConfigMap object if it still exists. 
+ cmRemovedActual := regularConfigMapRemovedActual(nsName, configMapName) + Eventually(cmRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the ConfigMap object") + + // Ensure that the AppliedWork object has been removed. + appliedWorkRemovedActual := appliedWorkRemovedActual(workName, nsName) + Eventually(appliedWorkRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the AppliedWork object") + + workRemovedActual := workRemovedActual(workName) + Eventually(workRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the Work object") + // The environment prepared by the envtest package does not support namespace + // deletion; consequently this test suite would not attempt to verify its deletion. + }) + }) + + Context("two non-consecutive waves", Ordered, func() { + workName := fmt.Sprintf(workNameTemplate, utils.RandStr()) + // The environment prepared by the envtest package does not support namespace + // deletion; each test case would use a new namespace. + nsName := fmt.Sprintf(nsNameTemplate, utils.RandStr()) + + roleName := "role-1" + + BeforeAll(func() { + // Prepare a NS object. + regularNS := ns.DeepCopy() + regularNS.Name = nsName + regularNSJSON := marshalK8sObjJSON(regularNS) + + // Prepare a Role object. + regularRole := &rbacv1.Role{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rbac.authorization.k8s.io/v1", + Kind: "Role", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: roleName, + Namespace: nsName, + }, + Rules: []rbacv1.PolicyRule{}, + } + regularRoleJSON := marshalK8sObjJSON(regularRole) + + // Create a new Work object with all the manifest JSONs. + createWorkObject(workName, memberReservedNSName3, nil, regularNSJSON, regularRoleJSON) + }) + + // For simplicity reasons, this test case will skip some of the regular apply op result verification + // (finalizer check, AppliedWork object check, etc.), as they have been repeatedly verified in different + // test cases under similar conditions. + + It("should update the Work object status", func() { + // Prepare the status information. 
+ workConds := []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAppliedReason, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAvailableReason, + }, + } + manifestConds := []fleetv1beta1.ManifestCondition{ + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + Group: "", + Version: "v1", + Kind: "Namespace", + Resource: "namespaces", + Name: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + { + Identifier: fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + Group: "rbac.authorization.k8s.io", + Version: "v1", + Kind: "Role", + Resource: "roles", + Name: roleName, + Namespace: nsName, + }, + Conditions: []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: string(ApplyOrReportDiffResTypeApplied), + ObservedGeneration: 0, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionTrue, + Reason: string(AvailabilityResultTypeAvailable), + ObservedGeneration: 0, + }, + }, + }, + } + + workStatusUpdatedActual := workStatusUpdated(memberReservedNSName3, workName, workConds, manifestConds, nil, nil) + // Considering the presence of fixed delay in the parallelizer, the test case here + // uses a longer timeout and interval. + Eventually(workStatusUpdatedActual, eventuallyDuration*2, eventuallyInterval*2).Should(Succeed(), "Failed to update work status") + }) + + It("should create resources in waves", func() { + // The work applier in use for this environment is set to wait for a fixed delay between each + // parallelizer call. If the parallelization is set up correctly, resources in the same wave + // should have very close creation timestamps, while the creation timestamps between resources + // in different waves should have a consistent gap (roughly the fixed delay). + + placedNS := &corev1.Namespace{} + Eventually(memberClient3.Get(ctx, types.NamespacedName{Name: nsName}, placedNS), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to get the placed Namespace") + + placedRole := &rbacv1.Role{} + Eventually(memberClient3.Get(ctx, types.NamespacedName{Namespace: nsName, Name: roleName}, placedRole), eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to get the placed Role") + + gap := placedRole.CreationTimestamp.Sub(placedNS.CreationTimestamp.Time) + Expect(gap).To(BeNumerically(">=", parallelizerFixedDelay), "The creation time gap between resources in different waves is less than the fixed delay") + Expect(gap).To(BeNumerically("<", parallelizerFixedDelay*2), "The creation time gap between resources in different waves is at least twice as large as the fixed delay") + }) + + AfterAll(func() { + // Delete the Work object and related resources. + deleteWorkObject(workName, memberReservedNSName3) + + // Remove the Role object if it still exists. 
+ Eventually(func() error { + cr := &rbacv1.Role{ + ObjectMeta: metav1.ObjectMeta{ + Name: roleName, + Namespace: nsName, + }, + } + if err := memberClient3.Delete(ctx, cr); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete the role object: %w", err) + } + + // Check that the Role object has been deleted. + if err := memberClient3.Get(ctx, types.NamespacedName{Namespace: nsName, Name: roleName}, cr); !errors.IsNotFound(err) { + return fmt.Errorf("role object still exists or an unexpected error occurred: %w", err) + } + return nil + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the Role object") + + // Ensure that the AppliedWork object has been removed. + appliedWorkRemovedActual := appliedWorkRemovedActual(workName, nsName) + Eventually(appliedWorkRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the AppliedWork object") + + workRemovedActual := workRemovedActual(workName) + Eventually(workRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the Work object") + // The environment prepared by the envtest package does not support namespace + // deletion; consequently this test suite would not attempt to verify its deletion. + }) + }) + + Context("all waves", Ordered, func() { + //workName := fmt.Sprintf(workNameTemplate, utils.RandStr()) + // The environment prepared by the envtest package does not support namespace + // deletion; each test case would use a new namespace. + nsName := fmt.Sprintf(nsNameTemplate, utils.RandStr()) + + // The array below includes objects of all known resource types for waved + // processing, plus a few objects of unknown resource types. + objectsOfVariousResourceTypes := []client.Object{ + // Wave 0 objects. + // Namespace object is created separately. + &schedulingv1.PriorityClass{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "scheduling.k8s.io/v1", + Kind: "PriorityClass", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pc-%s", utils.RandStr()), + }, + Value: 1000, + }, + // Wave 1 objects.
+ &networkingv1.NetworkPolicy{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "networking.k8s.io/v1", + Kind: "NetworkPolicy", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("np-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: networkingv1.NetworkPolicySpec{ + PodSelector: metav1.LabelSelector{}, + }, + }, + &corev1.ResourceQuota{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ResourceQuota", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("rq-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: corev1.ResourceQuotaSpec{}, + }, + &corev1.LimitRange{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "LimitRange", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("lr-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: corev1.LimitRangeSpec{}, + }, + &policyv1.PodDisruptionBudget{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "policy/v1", + Kind: "PodDisruptionBudget", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pdb-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: policyv1.PodDisruptionBudgetSpec{ + Selector: &metav1.LabelSelector{}, + }, + }, + &corev1.ServiceAccount{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ServiceAccount", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("sa-%s", utils.RandStr()), + Namespace: nsName, + }, + }, + &corev1.Secret{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Secret", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("secret-%s", utils.RandStr()), + Namespace: nsName, + }, + Type: corev1.SecretTypeOpaque, + }, + &corev1.ConfigMap{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ConfigMap", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("cm-%s", utils.RandStr()), + Namespace: nsName, + }, + }, + &storagev1.StorageClass{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "storage.k8s.io/v1", + Kind: "StorageClass", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("sc-%s", utils.RandStr()), + }, + Provisioner: "kubernetes.io/no-provisioner", + }, + &corev1.PersistentVolume{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "PersistentVolume", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pv-%s", utils.RandStr()), + }, + Spec: corev1.PersistentVolumeSpec{ + Capacity: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse("1Gi"), + }, + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + PersistentVolumeSource: corev1.PersistentVolumeSource{ + HostPath: &corev1.HostPathVolumeSource{ + Path: "/mnt/data", + }, + }, + }, + }, + &corev1.PersistentVolumeClaim{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "PersistentVolumeClaim", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pvc-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: corev1.PersistentVolumeClaimSpec{ + AccessModes: []corev1.PersistentVolumeAccessMode{ + corev1.ReadWriteOnce, + }, + Resources: corev1.VolumeResourceRequirements{ + Requests: corev1.ResourceList{ + corev1.ResourceStorage: resource.MustParse("1Gi"), + }, + }, + }, + }, + &apiextensionsv1.CustomResourceDefinition{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apiextensions.k8s.io/v1", + Kind: "CustomResourceDefinition", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "bars.example.com", + }, + Spec: apiextensionsv1.CustomResourceDefinitionSpec{ + Group: "example.com", + Names: apiextensionsv1.CustomResourceDefinitionNames{ + Plural: "bars", + Kind: "Bar", + Singular: "bar", + }, + Scope: 
apiextensionsv1.NamespaceScoped, + Versions: []apiextensionsv1.CustomResourceDefinitionVersion{{ + Name: "v1", + Served: true, + Storage: true, + Schema: &apiextensionsv1.CustomResourceValidation{ + OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{ + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "spec": { + Type: "object", + Properties: map[string]apiextensionsv1.JSONSchemaProps{ + "placeholder": { + Type: "string", + }, + }, + }, + }, + }, + }, + }}, + }, + }, + &networkingv1.IngressClass{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "networking.k8s.io/v1", + Kind: "IngressClass", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("ic-%s", utils.RandStr()), + }, + Spec: networkingv1.IngressClassSpec{ + Controller: "example.com/ingress-controller", + }, + }, + // Wave 2 objects. + &rbacv1.ClusterRole{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rbac.authorization.k8s.io/v1", + Kind: "ClusterRole", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("cr-%s", utils.RandStr()), + }, + Rules: []rbacv1.PolicyRule{}, + }, + &rbacv1.Role{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rbac.authorization.k8s.io/v1", + Kind: "Role", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("role-%s", utils.RandStr()), + Namespace: nsName, + }, + Rules: []rbacv1.PolicyRule{}, + }, + // Wave 3 objects. + &rbacv1.ClusterRoleBinding{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rbac.authorization.k8s.io/v1", + Kind: "ClusterRoleBinding", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("crb-%s", utils.RandStr()), + }, + Subjects: []rbacv1.Subject{}, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "ClusterRole", + Name: "dummy", + }, + }, + &rbacv1.RoleBinding{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "rbac.authorization.k8s.io/v1", + Kind: "RoleBinding", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("rb-%s", utils.RandStr()), + Namespace: nsName, + }, + Subjects: []rbacv1.Subject{}, + RoleRef: rbacv1.RoleRef{ + APIGroup: "rbac.authorization.k8s.io", + Kind: "Role", + Name: "dummy", + }, + }, + // Wave 4 objects. 
+ &corev1.Service{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Service", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("svc-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: corev1.ServiceSpec{ + Selector: map[string]string{}, + Ports: []corev1.ServicePort{ + { + Name: "http", + Port: 80, + }, + }, + }, + }, + &appsv1.DaemonSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "DaemonSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("ds-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: appsv1.DaemonSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + dummyLabelKey: dummyLabelValue1, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + dummyLabelKey: dummyLabelValue1, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + }, + }, + }, + }, + &corev1.Pod{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Pod", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("pod-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + }, + }, + &corev1.ReplicationController{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "ReplicationController", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("rc-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: corev1.ReplicationControllerSpec{ + Selector: map[string]string{ + dummyLabelKey: dummyLabelValue2, + }, + Template: &corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + dummyLabelKey: dummyLabelValue2, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + }, + }, + }, + }, + &appsv1.ReplicaSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "ReplicaSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("rs-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: appsv1.ReplicaSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + dummyLabelKey: dummyLabelValue3, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + dummyLabelKey: dummyLabelValue3, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + }, + }, + }, + }, + &appsv1.Deployment{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "Deployment", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("deploy-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: appsv1.DeploymentSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + dummyLabelKey: dummyLabelValue4, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + dummyLabelKey: dummyLabelValue4, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + }, + }, + }, + }, + &autoscalingv1.HorizontalPodAutoscaler{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "autoscaling/v1", + Kind: "HorizontalPodAutoscaler", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("hpa-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: autoscalingv1.HorizontalPodAutoscalerSpec{ + ScaleTargetRef: autoscalingv1.CrossVersionObjectReference{ + Kind: "Deployment", + Name: "dummy", 
+ }, + MaxReplicas: 10, + MinReplicas: ptr.To(int32(1)), + }, + }, + &appsv1.StatefulSet{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "apps/v1", + Kind: "StatefulSet", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("sts-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: appsv1.StatefulSetSpec{ + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{ + dummyLabelKey: dummyLabelValue5, + }, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + dummyLabelKey: dummyLabelValue5, + }, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + }, + }, + }, + }, + &batchv1.Job{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "batch/v1", + Kind: "Job", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("job-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + &batchv1.CronJob{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "batch/v1", + Kind: "CronJob", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("cj-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: batchv1.CronJobSpec{ + JobTemplate: batchv1.JobTemplateSpec{ + Spec: batchv1.JobSpec{ + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{}, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "busybox", + Image: "busybox", + }, + }, + RestartPolicy: corev1.RestartPolicyNever, + }, + }, + }, + }, + Schedule: "*/1 * * * *", + }, + }, + &networkingv1.Ingress{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "networking.k8s.io/v1", + Kind: "Ingress", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("ing-%s", utils.RandStr()), + Namespace: nsName, + }, + Spec: networkingv1.IngressSpec{ + Rules: []networkingv1.IngressRule{ + { + Host: "example.com", + IngressRuleValue: networkingv1.IngressRuleValue{ + HTTP: &networkingv1.HTTPIngressRuleValue{ + Paths: []networkingv1.HTTPIngressPath{ + { + Path: "/", + PathType: ptr.To(networkingv1.PathTypePrefix), + Backend: networkingv1.IngressBackend{ + Service: &networkingv1.IngressServiceBackend{ + Name: "placeholder", + Port: networkingv1.ServiceBackendPort{ + Number: 80, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + // Wave 5 objects. + // The APIService object is not included due to setup complications. + &admissionregistrationv1.ValidatingWebhookConfiguration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admissionregistration.k8s.io/v1", + Kind: "ValidatingWebhookConfiguration", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("vwc-%s", utils.RandStr()), + }, + Webhooks: []admissionregistrationv1.ValidatingWebhook{}, + }, + &admissionregistrationv1.MutatingWebhookConfiguration{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "admissionregistration.k8s.io/v1", + Kind: "MutatingWebhookConfiguration", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("mwc-%s", utils.RandStr()), + }, + Webhooks: []admissionregistrationv1.MutatingWebhook{}, + }, + // Unknown resource types (no wave assigned by default); should always get processed at last. 
+ &discoveryv1.EndpointSlice{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "discovery.k8s.io/v1", + Kind: "EndpointSlice", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("eps-%s", utils.RandStr()), + Namespace: nsName, + }, + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{}, + }, + } + + BeforeAll(func() { + allManifestJSONByteArrs := make([][]byte, 0, len(objectsOfVariousResourceTypes)+1) + + // Prepare a NS object. + regularNS := ns.DeepCopy() + regularNS.Name = nsName + regularNSJSON := marshalK8sObjJSON(regularNS) + allManifestJSONByteArrs = append(allManifestJSONByteArrs, regularNSJSON) + + // Prepare all other objects. + for idx := range objectsOfVariousResourceTypes { + obj := objectsOfVariousResourceTypes[idx] + allManifestJSONByteArrs = append(allManifestJSONByteArrs, marshalK8sObjJSON(obj)) + } + // Shuffle the manifest JSONs. + rand.Shuffle(len(allManifestJSONByteArrs), func(i, j int) { + allManifestJSONByteArrs[i], allManifestJSONByteArrs[j] = allManifestJSONByteArrs[j], allManifestJSONByteArrs[i] + }) + + // Create a new Work object with all the manifest JSONs. + createWorkObject(workName, memberReservedNSName3, nil, allManifestJSONByteArrs...) + }) + + // For simplicity reasons, this test case will skip some of the regular apply op result verification + // (finalizer check, AppliedWork object check, etc.), as they have been repeatedly verified in different + // test cases under similar conditions. + + It("should update the Work object status", func() { + Eventually(func() error { + work := &fleetv1beta1.Work{} + if err := hubClient.Get(ctx, client.ObjectKey{Name: workName, Namespace: memberReservedNSName3}, work); err != nil { + return fmt.Errorf("failed to retrieve the Work object: %w", err) + } + + // Compare only the work conditions for simplicity reasons. + wantWorkConds := []metav1.Condition{ + { + Type: fleetv1beta1.WorkConditionTypeApplied, + Status: metav1.ConditionTrue, + Reason: condition.WorkAllManifestsAppliedReason, + }, + { + Type: fleetv1beta1.WorkConditionTypeAvailable, + Status: metav1.ConditionFalse, + // In the current test environment some API objects will never become available. + Reason: condition.WorkNotAllManifestsAvailableReason, + }, + } + for idx := range wantWorkConds { + wantWorkConds[idx].ObservedGeneration = work.Generation + } + if diff := cmp.Diff( + work.Status.Conditions, wantWorkConds, + ignoreFieldConditionLTTMsg, + ); diff != "" { + return fmt.Errorf("Work status conditions mismatch (-got, +want):\n%s", diff) + } + return nil + // As each wave starts with a fixed delay, and this specific test case involves all the waves, + // the test spec here uses a longer timeout and interval. 
+ }, eventuallyDuration*10, eventuallyInterval*5).Should(Succeed(), "Failed to update work status") + }) + + It("should process manifests in waves", func() { + creationTimestampsPerWave := make(map[waveNumber][]metav1.Time, len(defaultWaveNumberByResourceType)) + for idx := range objectsOfVariousResourceTypes { + obj := objectsOfVariousResourceTypes[idx] + objGK := obj.GetObjectKind().GroupVersionKind().GroupKind() + objVer := obj.GetObjectKind().GroupVersionKind().Version + objResTyp, err := memberClient3.RESTMapper().RESTMapping(objGK, objVer) + Expect(err).NotTo(HaveOccurred(), "Failed to get the resource type of an object") + + processedObj := obj.DeepCopyObject().(client.Object) + Expect(memberClient3.Get(ctx, client.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()}, processedObj)).To(Succeed(), "Failed to get a placed object") + + waveNum, ok := defaultWaveNumberByResourceType[objResTyp.Resource.Resource] + if !ok { + waveNum = lastWave + } + + timestamps := creationTimestampsPerWave[waveNum] + if timestamps == nil { + timestamps = make([]metav1.Time, 0, 1) + } + timestamps = append(timestamps, processedObj.GetCreationTimestamp()) + creationTimestampsPerWave[waveNum] = timestamps + } + + expectedWaveNums := []waveNumber{0, 1, 2, 3, 4, 5, lastWave} + // Do a sanity check. + Expect(len(creationTimestampsPerWave)).To(Equal(len(expectedWaveNums)), "The number of waves does not match the expectation") + var observedLatestCreationTimestampInLastWave *metav1.Time + for _, waveNum := range expectedWaveNums { + By(fmt.Sprintf("checking wave %d", waveNum)) + + timestamps := creationTimestampsPerWave[waveNum] + // Do a sanity check. + Expect(timestamps).NotTo(BeEmpty(), "No creation timestamps recorded for a wave") + + // Check that timestamps in the same wave are close enough. + slices.SortFunc(timestamps, func(a, b metav1.Time) int { + return a.Time.Compare(b.Time) + }) + + earliest := timestamps[0] + latest := timestamps[len(timestamps)-1] + gapWithinWave := latest.Sub(earliest.Time) + // Normally all resources in the same wave should be created within a very short time window, + // usually within a few tens of milliseconds; here the test spec uses a more forgiving threshold + // of 2 seconds to avoid flakiness. + Expect(gapWithinWave).To(BeNumerically("<", time.Second*2), "The creation time gap between resources in the same wave is larger than expected") + + if observedLatestCreationTimestampInLastWave != nil { + // Check that the current wave is processed after the previous wave, separated by the fixed delay. + gapBetweenWaves := earliest.Sub(observedLatestCreationTimestampInLastWave.Time) + Expect(gapBetweenWaves).To(BeNumerically(">=", parallelizerFixedDelay), "The creation time gap between resources in different waves is less than the fixed delay") + Expect(gapBetweenWaves).To(BeNumerically("<", parallelizerFixedDelay*2), "The creation time gap between resources in different waves is at least twice as large as the fixed delay") + } + + observedLatestCreationTimestampInLastWave = &timestamps[len(timestamps)-1] + } + }) + + AfterAll(func() { + // Delete the Work object and related resources. + deleteWorkObject(workName, memberReservedNSName3) + + // Remove all the placed objects if they still exist.
+ for idx := range objectsOfVariousResourceTypes { + objCopy := objectsOfVariousResourceTypes[idx].DeepCopyObject().(client.Object) + gvk := objCopy.GetObjectKind().GroupVersionKind() + switch { + case gvk.Group == "" && gvk.Kind == "PersistentVolume": + // Skip the PV resources as their deletion might get stuck in the test environment. + // This in most cases should have no side effect as the tests do not reuse namespaces and + // the resources have random suffixes in their names. + continue + case gvk.Group == "" && gvk.Kind == "PersistentVolumeClaim": + // For the same reason as above, skip the PVC resources. + continue + case gvk.Group == "" && gvk.Kind == "ReplicationController": + // For the same reason as above, skip the RC resources. + continue + case gvk.Group == "batch" && gvk.Kind == "Job": + // For the same reason as above, skip the Job resources. + continue + } + placeholder := objCopy.DeepCopyObject().(client.Object) + + Eventually(func() error { + if err := memberClient3.Delete(ctx, objCopy); err != nil && !errors.IsNotFound(err) { + return fmt.Errorf("failed to delete the object: %w", err) + } + + // Check that the object has been deleted. + if err := memberClient3.Get(ctx, client.ObjectKey{Namespace: objCopy.GetNamespace(), Name: objCopy.GetName()}, placeholder); !errors.IsNotFound(err) { + return fmt.Errorf("object still exists or an unexpected error occurred: %w", err) + } + return nil + }, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove a placed object (idx: %d)", idx) + } + + // Ensure that the AppliedWork object has been removed. + appliedWorkRemovedActual := appliedWorkRemovedActual(workName, nsName) + Eventually(appliedWorkRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the AppliedWork object") + + workRemovedActual := workRemovedActual(workName) + Eventually(workRemovedActual, eventuallyDuration, eventuallyInterval).Should(Succeed(), "Failed to remove the Work object") + // The environment prepared by the envtest package does not support namespace + // deletion; consequently this test suite would not attempt to verify its deletion. + }) + }) +}) diff --git a/pkg/scheduler/framework/framework.go b/pkg/scheduler/framework/framework.go index 2b317a37e..f276d6f00 100644 --- a/pkg/scheduler/framework/framework.go +++ b/pkg/scheduler/framework/framework.go @@ -121,7 +121,7 @@ type framework struct { eventRecorder record.EventRecorder // parallelizer is a utility which helps run tasks in parallel. - parallelizer *parallelizer.Parallerlizer + parallelizer parallelizer.Parallelizer // eligibilityChecker is a utility which helps determine if a cluster is eligible for resource placement. clusterEligibilityChecker *clustereligibilitychecker.ClusterEligibilityChecker diff --git a/pkg/utils/parallelizer/parallelizer.go b/pkg/utils/parallelizer/parallelizer.go index f6fd331d0..4c26fb43d 100644 --- a/pkg/utils/parallelizer/parallelizer.go +++ b/pkg/utils/parallelizer/parallelizer.go @@ -30,19 +30,25 @@ const ( ) // Parallelizer helps run tasks in parallel. -type Parallerlizer struct { +type Parallelizer interface { + // ParallelizeUntil runs tasks in parallel, wrapping workqueue.ParallelizeUntil. + ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) +} + +// parallelizer is a default implementation of the Parallelizer interface. +type parallelizer struct { numOfWorkers int } -// NewParallelizer returns a Parallelizer for running tasks in parallel.
-func NewParallelizer(workers int) *Parallerlizer { - return &Parallerlizer{ +// NewParallelizer returns a parallelizer for running tasks in parallel. +func NewParallelizer(workers int) *parallelizer { + return &parallelizer{ + numOfWorkers: workers, + } } // ParallelizeUntil wraps workqueue.ParallelizeUntil for running tasks in parallel. -func (p *Parallerlizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) { +func (p *parallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) { doWorkWithLogs := func(piece int) { klog.V(4).Infof("run piece %d for operation %s", piece, operation) doWork(piece)
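Because the work applier and the scheduler framework now depend on the Parallelizer interface rather than the old concrete struct, callers can inject their own implementation instead of only choosing a worker count. The sketch below is a minimal, hypothetical illustration (not part of this change) of a wrapper that sleeps for a fixed delay before dispatching each batch of pieces, the kind of test double that would produce the per-wave creation-timestamp gaps checked in the integration tests above; the fixedDelayParallelizer name and its wiring are assumptions, and the parallelizerFixedDelay constant referenced by those tests is assumed to be defined elsewhere in the suite.

package paralleltest // hypothetical package for this sketch

import (
	"context"
	"time"

	"k8s.io/client-go/util/workqueue"

	parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
)

// fixedDelayParallelizer is an illustrative test double: it satisfies the new
// Parallelizer interface and sleeps for a fixed duration before delegating to
// a real parallelizer, making waved manifest processing observable through
// object creation timestamps.
type fixedDelayParallelizer struct {
	delegate parallelizerutil.Parallelizer
	delay    time.Duration
}

// ParallelizeUntil pauses for the configured delay and then hands the pieces
// to the wrapped parallelizer; the signature mirrors the interface above.
func (p *fixedDelayParallelizer) ParallelizeUntil(ctx context.Context, pieces int, doWork workqueue.DoWorkPieceFunc, operation string) {
	time.Sleep(p.delay)
	p.delegate.ParallelizeUntil(ctx, pieces, doWork, operation)
}

A double like this could be handed to workapplier.NewReconciler in the suite setup in place of the default parallelizer; the helper actually used by the tests above may be implemented differently.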