diff --git a/pkg/controllers/workapplier/process.go b/pkg/controllers/workapplier/process.go index 646433b1f..d58c183c2 100644 --- a/pkg/controllers/workapplier/process.go +++ b/pkg/controllers/workapplier/process.go @@ -37,15 +37,52 @@ func (r *Reconciler) processManifests( work *fleetv1beta1.Work, expectedAppliedWorkOwnerRef *metav1.OwnerReference, ) { - // TODO: We have to apply the namespace/crd/secret/configmap/pvc first - // then we can process some of the manifests in parallel. - for _, bundle := range bundles { - if bundle.applyOrReportDiffErr != nil { - // Skip a manifest if it has failed pre-processing. - continue + // Process all manifests in parallel. + // + // There are cases where certain groups of manifests should not be processed in parallel with + // each other (e.g., a config map must be applied after its owner namespace is applied); + // to address this situation, manifests are processed in waves: manifests in the same wave are + // processed in parallel, while different waves are processed sequentially. + + // As a special case, if the ReportDiff mode is on, all manifests are processed in parallel in + // one wave. + if work.Spec.ApplyStrategy != nil && work.Spec.ApplyStrategy.Type == fleetv1beta1.ApplyStrategyTypeReportDiff { + doWork := func(piece int) { + if bundles[piece].applyOrReportDiffErr != nil { + // Skip a manifest if it has failed pre-processing. + return + } + + r.processOneManifest(ctx, bundles[piece], work, expectedAppliedWorkOwnerRef) + klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundles[piece].manifestObj), "work", klog.KObj(work)) } - r.processOneManifest(ctx, bundle, work, expectedAppliedWorkOwnerRef) - klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundle.manifestObj), "work", klog.KObj(work)) + + r.parallelizer.ParallelizeUntil(ctx, len(bundles), doWork, "processingManifestsInReportDiffMode") + return + } + + // Organize the bundles into different waves of bundles for parallel processing based on their + // GVR information. + processingWaves := organizeBundlesIntoProcessingWaves(bundles, klog.KObj(work)) + for idx := range processingWaves { + bundlesInWave := processingWaves[idx].bundles + + // TO-DO (chenyu1): evaluate if there is a need to avoid repeated closure + // assignment just for capturing variables. + doWork := func(piece int) { + if bundlesInWave[piece].applyOrReportDiffErr != nil { + // Skip a manifest if it has failed pre-processing. + // + // This added as a sanity check as the organization step normally + // would have already skipped all the manifests with processing failures. + return + } + + r.processOneManifest(ctx, bundlesInWave[piece], work, expectedAppliedWorkOwnerRef) + klog.V(2).InfoS("Processed a manifest", "manifestObj", klog.KObj(bundlesInWave[piece].manifestObj), "work", klog.KObj(work)) + } + + r.parallelizer.ParallelizeUntil(ctx, len(bundlesInWave), doWork, "processingManifests") } } diff --git a/pkg/controllers/workapplier/waves.go b/pkg/controllers/workapplier/waves.go new file mode 100644 index 000000000..6433584be --- /dev/null +++ b/pkg/controllers/workapplier/waves.go @@ -0,0 +1,180 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workapplier + +import ( + "slices" + + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/klog/v2" +) + +type waveNumber int + +const ( + lastWave waveNumber = 999 +) + +var ( + // The default wave number for all known Kubernetes resource type. + // + // Note (chenyu1): the waves below are based on the Helm resource installation + // order (see also the Helm source code). Similar objects are grouped together + // to achieve best performance. + defaultWaveNumberByResourceType = map[string]waveNumber{ + // Apply namespaces and priority classes first. + "namespaces": 0, + "priorityclasses": 0, + // Apply policies, configuration data, and other static resources second. + "networkpolicies": 1, + "resourcequotas": 1, + "limitranges": 1, + "podsecuritypolicies": 1, + "poddisruptionbudgets": 1, + "serviceaccounts": 1, + "secrets": 1, + "configmaps": 1, + "storageclasses": 1, + "persistentvolumes": 1, + "persistentvolumeclaims": 1, + "customresourcedefinitions": 1, + "ingressclasses": 1, + // Apply RBAC resources (cluster roles and roles). + "clusterroles": 2, + "roles": 2, + // Apply RBAC resources (cluster role bindings and role bindings). + "clusterrolebindings": 3, + "rolebindings": 3, + // Apply workloads and services. + "services": 4, + "daemonsets": 4, + "pods": 4, + "replicationcontrollers": 4, + "replicasets": 4, + "deployments": 4, + "horizontalpodautoscalers": 4, + "statefulsets": 4, + "jobs": 4, + "cronjobs": 4, + "ingresses": 4, + // Apply API services and webhooks. + "apiservices": 5, + "validatingwebhookconfigurations": 5, + "mutatingwebhookconfigurations": 5, + } + + // The API groups for all known Kubernetes resource types. + knownAPIGroups = sets.New( + // The core API group. + "", + // The networking API group (`networking.k8s.io`). + "networking.k8s.io", + // The scheduling API group (`scheduling.k8s.io`). + "scheduling.k8s.io", + // The policy API group (`policy.k8s.io`). + "policy", + // The storage API group (`storage.k8s.io`). + "storage.k8s.io", + // The API extensions API group (`apiextensions.k8s.io`). + "apiextensions.k8s.io", + // The RBAC authorization API group (`rbac.authorization.k8s.io`). + "rbac.authorization.k8s.io", + // The apps API group (`apps`). + "apps", + // The autoscaling API group (`autoscaling`). + "autoscaling", + // The API registration API group (`apiregistration.k8s.io`). + "apiregistration.k8s.io", + // The admission registration API group (`admissionregistration.k8s.io`). + "admissionregistration.k8s.io", + // The batch API group (`batch`). + "batch", + ) +) + +// bundleProcessingWave is a wave of bundles that can be processed in parallel. +type bundleProcessingWave struct { + num waveNumber + bundles []*manifestProcessingBundle +} + +// organizeBundlesIntoProcessingWaves organizes the list of bundles into different +// waves of bundles for parallel processing based on their GVR information. +func organizeBundlesIntoProcessingWaves(bundles []*manifestProcessingBundle, workRef klog.ObjectRef) []*bundleProcessingWave { + // Pre-allocate the map; 7 is the total count of default wave numbers, though + // not all wave numbers might be used. + waveByNum := make(map[waveNumber]*bundleProcessingWave, 7) + + getOrAddWave := func(num waveNumber) *bundleProcessingWave { + wave, ok := waveByNum[num] + if !ok { + wave = &bundleProcessingWave{ + num: num, + // Pre-allocate with a reasonable size. + bundles: make([]*manifestProcessingBundle, 0, 5), + } + waveByNum[num] = wave + } + return wave + } + + // For simplicity reasons, the organization itself runs in sequential order. + // Considering that the categorization itself is quick and the total number of bundles + // should be limited in most cases, this should not introduce significant overhead. + for idx := range bundles { + bundle := bundles[idx] + if bundle.gvr == nil { + // For manifest data that cannot be decoded, there might not be any available GVR + // information. Skip such processing bundles; this is not considered as an error. + klog.V(2).InfoS("Skipping a bundle with no GVR; no wave is assigned", + "ordinal", idx, "work", workRef) + continue + } + + if bundle.applyOrReportDiffErr != nil { + // An error has occurred before this step; such bundles need no further processing, + // skip them. This is not considered as an error. + klog.V(2).InfoS("Skipping a bundle with prior processing error; no wave is assigned", + "manifestObj", klog.KObj(bundle.manifestObj), "GVR", *bundle.gvr, "work", workRef) + continue + } + + waveNum := lastWave + defaultWaveNum, foundInDefaultWaveNumber := defaultWaveNumberByResourceType[bundle.gvr.Resource] + if foundInDefaultWaveNumber && knownAPIGroups.Has(bundle.gvr.Group) { + // The resource is a known one; assign the bundle to its default wave. + waveNum = defaultWaveNum + } + + wave := getOrAddWave(waveNum) + wave.bundles = append(wave.bundles, bundle) + klog.V(2).InfoS("Assigned manifest to a wave", + "waveNumber", waveNum, + "manifestObj", klog.KObj(bundle.manifestObj), "GVR", *bundle.gvr, "work", workRef) + } + + // Retrieve all the waves and sort them by their wave number. + waves := make([]*bundleProcessingWave, 0, len(waveByNum)) + for _, w := range waveByNum { + waves = append(waves, w) + } + // Sort the waves in ascending order. + slices.SortFunc(waves, func(a, b *bundleProcessingWave) int { + return int(a.num) - int(b.num) + }) + return waves +} diff --git a/pkg/controllers/workapplier/waves_test.go b/pkg/controllers/workapplier/waves_test.go new file mode 100644 index 000000000..fcd67977b --- /dev/null +++ b/pkg/controllers/workapplier/waves_test.go @@ -0,0 +1,1120 @@ +/* +Copyright 2025 The KubeFleet Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workapplier + +import ( + "fmt" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + + "github.com/google/go-cmp/cmp" + fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1" +) + +// TestOrganizeBundlesIntoProcessingWaves tests the organizeBundlesIntoProcessingWaves function. +func TestOrganizeBundlesIntoProcessingWaves(t *testing.T) { + work := &fleetv1beta1.Work{ + ObjectMeta: metav1.ObjectMeta{ + Name: workName, + }, + } + workRef := klog.KObj(work) + + testCases := []struct { + name string + bundles []*manifestProcessingBundle + wantWaves []*bundleProcessingWave + }{ + { + name: "single bundle (known resource) into single wave", + bundles: []*manifestProcessingBundle{ + { + // Note: the IDs are added here purely for identification reasons; they are + // not consistent with other parts of the bundle, and do not reflect + // how the work applier actually sees bundles in an actual processing run. + // + // The same applies to other test cases in this spec. + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + // Note: the version part does not matter, as Kubernetes requires that one + // resource must be uniquely identified by the combo of its API group, resource type, + // namespace (if applicable), and name, but not the version. The information + // here is added for completeness reasons. + // + // The same applies to other tests cases in this spec. + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + }, + }, + { + // Normally this test case will never occur; it is added for completeness reasons. + name: "bundle with decoding errors (no GVR)", + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 2, + }, + gvr: nil, + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + }, + }, + { + name: "bundle with decoding errors (invalid JSON)", + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{}, + applyOrReportDiffErr: fmt.Errorf("failed to unmarshal JSON"), + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + }, + }, + { + name: "bundle with decoding errors (unregistered API)", + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{}, + applyOrReportDiffErr: fmt.Errorf("failed to find GVR from member cluster client REST mapping"), + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + }, + }, + { + name: "bundle with unknown resource type", + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "placeholders", + }, + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + { + num: lastWave, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "placeholders", + }, + }, + }, + }, + }, + }, + { + name: "bundle with known resource type but different API group", + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "dummy", + Version: "v1", + Resource: "deployments", + }, + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + { + num: lastWave, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "dummy", + Version: "v1", + Resource: "deployments", + }, + }, + }, + }, + }, + }, + { + name: "bundle with unknown resource type from unknown API group", + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "dummy", + Version: "v10", + Resource: "placeholders", + }, + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + { + num: lastWave, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "dummy", + Version: "v10", + Resource: "placeholders", + }, + }, + }, + }, + }, + }, + { + name: "mixed", + // The bundles below feature all known resource types from known API groups + // (in reverse order by wave number), plus unknown resources and bundles with errors. + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "admissionregistration.k8s.io", + Version: "v1", + Resource: "mutatingwebhookconfigurations", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "admissionregistration.k8s.io", + Version: "v1", + Resource: "validatingwebhookconfigurations", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 2, + }, + gvr: &schema.GroupVersionResource{ + Group: "apiregistration.k8s.io", + Version: "v1", + Resource: "apiservices", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 3, + }, + gvr: &schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "ingresses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 4, + }, + gvr: &schema.GroupVersionResource{ + Group: "batch", + Version: "v1", + Resource: "cronjobs", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 5, + }, + gvr: &schema.GroupVersionResource{ + Group: "batch", + Version: "v1", + Resource: "jobs", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 6, + }, + gvr: &schema.GroupVersionResource{ + Group: "autoscaling", + Version: "v1", + Resource: "horizontalpodautoscalers", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 7, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "statefulsets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 8, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 9, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 10, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "replicationcontrollers", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 11, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 12, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "daemonsets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 13, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 14, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "rolebindings", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 15, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "clusterrolebindings", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 16, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "roles", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 17, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "clusterroles", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 18, + }, + gvr: &schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "ingressclasses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 19, + }, + gvr: &schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 20, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "persistentvolumeclaims", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 21, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "persistentvolumes", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 22, + }, + gvr: &schema.GroupVersionResource{ + Group: "storage.k8s.io", + Version: "v1", + Resource: "storageclasses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 23, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "configmaps", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 24, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "secrets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 25, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "serviceaccounts", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 26, + }, + gvr: &schema.GroupVersionResource{ + Group: "policy", + Version: "v1", + Resource: "poddisruptionbudgets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 27, + }, + gvr: &schema.GroupVersionResource{ + Group: "policy", + Version: "v1", + Resource: "podsecuritypolicies", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 28, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "limitranges", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 29, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "resourcequotas", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 30, + }, + gvr: &schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "networkpolicies", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 31, + }, + gvr: &schema.GroupVersionResource{ + Group: "scheduling.k8s.io", + Version: "v1", + Resource: "priorityclasses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 32, + }, + gvr: &schema.GroupVersionResource{ + Group: "dummy", + Version: "v10", + Resource: "placeholders", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 33, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 34, + }, + gvr: &schema.GroupVersionResource{}, + applyOrReportDiffErr: fmt.Errorf("failed to find GVR from member cluster client REST mapping"), + }, + }, + wantWaves: []*bundleProcessingWave{ + { + num: 0, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 31, + }, + gvr: &schema.GroupVersionResource{ + Group: "scheduling.k8s.io", + Version: "v1", + Resource: "priorityclasses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 33, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "namespaces", + }, + }, + }, + }, + { + num: 1, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 18, + }, + gvr: &schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "ingressclasses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 19, + }, + gvr: &schema.GroupVersionResource{ + Group: "apiextensions.k8s.io", + Version: "v1", + Resource: "customresourcedefinitions", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 20, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "persistentvolumeclaims", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 21, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "persistentvolumes", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 22, + }, + gvr: &schema.GroupVersionResource{ + Group: "storage.k8s.io", + Version: "v1", + Resource: "storageclasses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 23, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "configmaps", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 24, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "secrets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 25, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "serviceaccounts", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 26, + }, + gvr: &schema.GroupVersionResource{ + Group: "policy", + Version: "v1", + Resource: "poddisruptionbudgets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 27, + }, + gvr: &schema.GroupVersionResource{ + Group: "policy", + Version: "v1", + Resource: "podsecuritypolicies", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 28, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "limitranges", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 29, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "resourcequotas", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 30, + }, + gvr: &schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "networkpolicies", + }, + }, + }, + }, + { + num: 2, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 16, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "roles", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 17, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "clusterroles", + }, + }, + }, + }, + { + num: 3, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 14, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "rolebindings", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 15, + }, + gvr: &schema.GroupVersionResource{ + Group: "rbac.authorization.k8s.io", + Version: "v1", + Resource: "clusterrolebindings", + }, + }, + }, + }, + { + num: 4, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 3, + }, + gvr: &schema.GroupVersionResource{ + Group: "networking.k8s.io", + Version: "v1", + Resource: "ingresses", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 4, + }, + gvr: &schema.GroupVersionResource{ + Group: "batch", + Version: "v1", + Resource: "cronjobs", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 5, + }, + gvr: &schema.GroupVersionResource{ + Group: "batch", + Version: "v1", + Resource: "jobs", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 6, + }, + gvr: &schema.GroupVersionResource{ + Group: "autoscaling", + Version: "v1", + Resource: "horizontalpodautoscalers", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 7, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "statefulsets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 8, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "deployments", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 9, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "replicasets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 10, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "replicationcontrollers", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 11, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "pods", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 12, + }, + gvr: &schema.GroupVersionResource{ + Group: "apps", + Version: "v1", + Resource: "daemonsets", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 13, + }, + gvr: &schema.GroupVersionResource{ + Group: "", + Version: "v1", + Resource: "services", + }, + }, + }, + }, + { + num: 5, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 0, + }, + gvr: &schema.GroupVersionResource{ + Group: "admissionregistration.k8s.io", + Version: "v1", + Resource: "mutatingwebhookconfigurations", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 1, + }, + gvr: &schema.GroupVersionResource{ + Group: "admissionregistration.k8s.io", + Version: "v1", + Resource: "validatingwebhookconfigurations", + }, + }, + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 2, + }, + gvr: &schema.GroupVersionResource{ + Group: "apiregistration.k8s.io", + Version: "v1", + Resource: "apiservices", + }, + }, + }, + }, + { + num: lastWave, + bundles: []*manifestProcessingBundle{ + { + id: &fleetv1beta1.WorkResourceIdentifier{ + Ordinal: 32, + }, + gvr: &schema.GroupVersionResource{ + Group: "dummy", + Version: "v10", + Resource: "placeholders", + }, + }, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + waves := organizeBundlesIntoProcessingWaves(tc.bundles, workRef) + if diff := cmp.Diff( + waves, tc.wantWaves, + cmp.AllowUnexported(manifestProcessingBundle{}, bundleProcessingWave{}), + ); diff != "" { + t.Errorf("organized waves mismatch (-got, +want):\n%s", diff) + } + }) + } +}